引言:为什么需要Python数据分析进阶?
在当今数据驱动的时代,Python已经成为数据分析领域的首选语言。从初学者到专家,掌握Python数据分析技能不仅能帮助你理解数据,更能让你在业务决策中发挥关键作用。本课程将带你从基础入门到高级实战,通过真实业务场景的案例,让你掌握解决复杂问题的能力。
课程目标
- 系统掌握:从数据清洗、探索性分析到建模可视化的全流程
- 实战导向:通过真实业务案例(如电商销售分析、用户行为预测等)提升技能
- 进阶技巧:学习高级数据处理、性能优化和自动化分析方法
- 解决难题:掌握处理大规模数据、复杂业务逻辑的实用技巧
第一部分:Python数据分析基础回顾
1.1 核心库概览
在进阶之前,我们需要确保基础扎实。以下是Python数据分析的核心库:
# 导入常用库
import pandas as pd # 数据处理和分析
import numpy as np # 数值计算
import matplotlib.pyplot as plt # 可视化
import seaborn as sns # 高级可视化
from sklearn import preprocessing # 数据预处理
from sklearn.model_selection import train_test_split # 数据分割
1.2 数据结构与操作
DataFrame是数据分析的核心数据结构,让我们通过一个电商销售数据的例子来回顾:
# 创建示例数据
data = {
'订单ID': [1001, 1002, 1003, 1004, 1005],
'产品名称': ['笔记本电脑', '手机', '平板', '耳机', '充电器'],
'价格': [5999, 2999, 1999, 399, 99],
'数量': [2, 3, 1, 5, 10],
'日期': ['2023-01-15', '2023-01-16', '2023-01-17', '2023-01-18', '2023-01-19']
}
df = pd.DataFrame(data)
df['总价'] = df['价格'] * df['数量'] # 计算总价
print(df)
输出结果:
订单ID 产品名称 价格 数量 日期 总价
0 1001 笔记本电脑 5999 2 2023-01-15 11998
1 1002 手机 2999 3 2023-01-16 8997
2 1003 平板 1999 1 2023-01-17 1999
3 1004 耳机 399 5 2023-01-18 1995
4 1005 充电器 99 10 2023-01-19 990
1.3 基础数据操作
# 数据筛选
high_price = df[df['价格'] > 2000] # 筛选高价商品
print("高价商品:")
print(high_price)
# 分组聚合
category_sales = df.groupby('产品名称')['总价'].sum()
print("\n各产品总销售额:")
print(category_sales)
# 数据排序
sorted_df = df.sort_values('总价', ascending=False)
print("\n按总价降序排列:")
print(sorted_df)
第二部分:数据清洗与预处理进阶
2.1 处理缺失值的高级技巧
在实际业务中,数据缺失是常见问题。除了简单的删除或填充,我们需要更智能的方法。
# 创建包含缺失值的示例数据
sales_data = pd.DataFrame({
'用户ID': [1, 2, 3, 4, 5, 6],
'年龄': [25, np.nan, 30, 22, np.nan, 28],
'消费金额': [100, 200, np.nan, 150, 300, 250],
'地区': ['北京', '上海', '北京', np.nan, '上海', '广州']
})
print("原始数据:")
print(sales_data)
# 方法1:基于业务逻辑的填充
# 年龄缺失值用同地区平均年龄填充
sales_data['年龄'] = sales_data.groupby('地区')['年龄'].transform(
lambda x: x.fillna(x.mean())
)
print("\n用地区平均年龄填充后:")
print(sales_data)
# 方法2:使用机器学习预测缺失值(KNN)
from sklearn.impute import KNNImputer
imputer = KNNImputer(n_neighbors=2)
sales_data_filled = pd.DataFrame(
imputer.fit_transform(sales_data[['年龄', '消费金额']]),
columns=['年龄', '消费金额']
)
print("\nKNN填充结果:")
print(sales_data_filled)
2.2 异常值检测与处理
异常值可能影响分析结果,需要谨慎处理。
# 使用IQR方法检测异常值
def detect_outliers_iqr(data, column):
Q1 = data[column].quantile(0.25)
Q3 = data[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = data[(data[column] < lower_bound) | (data[column] > upper_bound)]
return outliers
# 检测消费金额异常值
outliers = detect_outliers_iqr(sales_data, '消费金额')
print("消费金额异常值:")
print(outliers)
# 处理异常值:Winsorization(缩尾处理)
def winsorize(data, column, limits=[0.05, 0.95]):
lower = data[column].quantile(limits[0])
upper = data[column].quantile(limits[1])
data[column] = np.where(data[column] < lower, lower, data[column])
data[column] = np.where(data[column] > upper, upper, data[column])
return data
sales_data_winsorized = winsorize(sales_data.copy(), '消费金额')
print("\n缩尾处理后的消费金额:")
print(sales_data_winsorized['消费金额'])
2.3 特征工程与编码
# 创建包含分类变量的数据
customer_data = pd.DataFrame({
'用户ID': [1, 2, 3, 4, 5],
'性别': ['男', '女', '男', '女', '男'],
'城市': ['北京', '上海', '广州', '深圳', '上海'],
'会员等级': ['普通', '黄金', '白金', '黄金', '普通'],
'消费金额': [1000, 2500, 5000, 3000, 800]
})
# 1. 标签编码(适用于有序分类变量)
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
customer_data['会员等级编码'] = le.fit_transform(customer_data['会员等级'])
print("标签编码结果:")
print(customer_data[['会员等级', '会员等级编码']])
# 2. 独热编码(适用于无序分类变量)
customer_data_encoded = pd.get_dummies(customer_data, columns=['性别', '城市'])
print("\n独热编码结果:")
print(customer_data_encoded)
# 3. 目标编码(适用于高基数分类变量)
def target_encode(df, cat_col, target_col):
# 计算每个类别的目标均值
target_means = df.groupby(cat_col)[target_col].mean()
# 映射到原数据
df[cat_col + '_target_encoded'] = df[cat_col].map(target_means)
return df
customer_data = target_encode(customer_data, '城市', '消费金额')
print("\n目标编码结果:")
print(customer_data[['城市', '城市_target_encoded']])
第三部分:探索性数据分析(EDA)进阶
3.1 多维度分析
# 加载真实数据集(示例:泰坦尼克号数据集)
import seaborn as sns
titanic = sns.load_dataset('titanic')
# 1. 生存率分析
survival_rate = titanic.groupby('class')['survived'].mean()
print("各舱位生存率:")
print(survival_rate)
# 2. 多维度交叉分析
# 使用pivot_table进行多维度分析
pivot = titanic.pivot_table(
values='survived',
index='class',
columns='sex',
aggfunc='mean'
)
print("\n各舱位和性别的生存率:")
print(pivot)
# 3. 使用crosstab进行交叉分析
cross_tab = pd.crosstab(
titanic['class'],
titanic['sex'],
values=titanic['survived'],
aggfunc='mean'
)
print("\n交叉表分析:")
print(cross_tab)
3.2 高级可视化技巧
# 设置中文字体(如果需要)
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# 1. 多子图组合
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
# 子图1:箱线图
sns.boxplot(data=titanic, x='class', y='age', ax=axes[0,0])
axes[0,0].set_title('各舱位年龄分布')
# 子图2:小提琴图
sns.violinplot(data=titanic, x='class', y='fare', ax=axes[0,1])
axes[0,1].set_title('各舱位票价分布')
# 子图3:热力图
corr_matrix = titanic[['age', 'fare', 'survived']].corr()
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', ax=axes[1,0])
axes[1,0].set_title('相关性热力图')
# 子图4:散点图矩阵
sns.scatterplot(data=titanic, x='age', y='fare', hue='survived', ax=axes[1,1])
axes[1,1].set_title('年龄与票价关系(按生存状态)')
plt.tight_layout()
plt.show()
3.3 时间序列分析基础
# 创建时间序列数据
dates = pd.date_range('2023-01-01', periods=100, freq='D')
sales = np.random.normal(1000, 200, 100).cumsum() # 累积和模拟销售趋势
ts_data = pd.DataFrame({'日期': dates, '销售额': sales})
ts_data.set_index('日期', inplace=True)
# 1. 基础时间序列分析
print("时间序列数据描述:")
print(ts_data.describe())
# 2. 移动平均分析
ts_data['7日移动平均'] = ts_data['销售额'].rolling(window=7).mean()
ts_data['30日移动平均'] = ts_data['销售额'].rolling(window=30).mean()
# 3. 季节性分解(需要statsmodels库)
try:
from statsmodels.tsa.seasonal import seasonal_decompose
decomposition = seasonal_decompose(ts_data['销售额'], model='additive', period=30)
fig, axes = plt.subplots(4, 1, figsize=(12, 8))
decomposition.observed.plot(ax=axes[0], legend=False)
axes[0].set_title('原始数据')
decomposition.trend.plot(ax=axes[1], legend=False)
axes[1].set_title('趋势')
decomposition.seasonal.plot(ax=axes[2], legend=False)
axes[2].set_title('季节性')
decomposition.resid.plot(ax=axes[3], legend=False)
axes[3].set_title('残差')
plt.tight_layout()
plt.show()
except ImportError:
print("需要安装statsmodels库: pip install statsmodels")
第四部分:统计分析与假设检验
4.1 描述性统计进阶
# 使用pandas的高级统计功能
data = pd.DataFrame({
'产品A': np.random.normal(100, 15, 100),
'产品B': np.random.normal(110, 20, 100),
'产品C': np.random.normal(95, 10, 100)
})
# 1. 多组数据描述统计
stats = data.describe()
print("多组数据描述统计:")
print(stats)
# 2. 计算偏度和峰度
skewness = data.skew()
kurtosis = data.kurtosis()
print("\n偏度:")
print(skewness)
print("\n峰度:")
print(kurtosis)
# 3. 计算置信区间
from scipy import stats
confidence_intervals = {}
for col in data.columns:
ci = stats.t.interval(0.95, len(data[col])-1,
loc=np.mean(data[col]),
scale=stats.sem(data[col]))
confidence_intervals[col] = ci
print("\n95%置信区间:")
for col, ci in confidence_intervals.items():
print(f"{col}: [{ci[0]:.2f}, {ci[1]:.2f}]")
4.2 假设检验实战
# 案例:A/B测试分析
# 假设我们有两个版本的网页设计,测试哪个版本的转化率更高
# 生成模拟数据
np.random.seed(42)
n = 1000
# 版本A:转化率10%
conversions_A = np.random.binomial(1, 0.10, n)
# 版本B:转化率12%
conversions_B = np.random.binomial(1, 0.12, n)
# 1. 比例检验(Z检验)
from statsmodels.stats.proportion import proportions_ztest
count = [conversions_A.sum(), conversions_B.sum()]
nobs = [n, n]
z_stat, p_value = proportions_ztest(count, nobs)
print(f"Z统计量: {z_stat:.4f}")
print(f"P值: {p_value:.4f}")
print(f"结论: {'拒绝原假设' if p_value < 0.05 else '不拒绝原假设'}")
# 2. 卡方检验(独立性检验)
from scipy.stats import chi2_contingency
# 构建列联表
contingency_table = pd.crosstab(
pd.Series(conversions_A, name='版本A'),
pd.Series(conversions_B, name='版本B')
)
chi2, p, dof, expected = chi2_contingency(contingency_table)
print(f"\n卡方检验结果:")
print(f"卡方值: {chi2:.4f}")
print(f"P值: {p:.4f}")
# 3. t检验(均值比较)
# 比较两个版本的平均访问时长
time_A = np.random.normal(120, 30, n) # 版本A平均120秒
time_B = np.random.normal(135, 35, n) # 版本B平均135秒
from scipy.stats import ttest_ind
t_stat, p_val = ttest_ind(time_A, time_B)
print(f"\nt检验结果:")
print(f"t统计量: {t_stat:.4f}")
print(f"P值: {p_val:.4f}")
第五部分:机器学习基础与应用
5.1 特征选择与降维
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.decomposition import PCA
# 创建示例数据集
X = np.random.rand(100, 10) # 100个样本,10个特征
y = np.random.randint(0, 2, 100) # 二分类标签
# 1. 基于统计的特征选择
selector = SelectKBest(score_func=f_classif, k=5)
X_selected = selector.fit_transform(X, y)
print("选择的特征数量:", X_selected.shape[1])
# 2. 主成分分析(PCA)
pca = PCA(n_components=3)
X_pca = pca.fit_transform(X)
print("PCA降维后的方差解释率:", pca.explained_variance_ratio_)
# 3. 特征重要性(随机森林)
from sklearn.ensemble import RandomForestClassifier
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X, y)
importances = rf.feature_importances_
print("\n特征重要性:")
for i, imp in enumerate(importances):
print(f"特征{i}: {imp:.4f}")
5.2 分类与回归模型
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.metrics import accuracy_score, mean_squared_error, r2_score
# 分类示例:预测用户是否购买
# 准备数据
X_class = np.random.rand(200, 5)
y_class = (X_class[:, 0] + X_class[:, 1] > 0.5).astype(int) # 简单规则生成标签
# 训练分类模型
X_train, X_test, y_train, y_test = train_test_split(X_class, y_class, test_size=0.2, random_state=42)
lr = LogisticRegression()
lr.fit(X_train, y_train)
y_pred = lr.predict(X_test)
print("分类模型性能:")
print(f"准确率: {accuracy_score(y_test, y_pred):.4f}")
# 回归示例:预测销售额
X_reg = np.random.rand(200, 3)
y_reg = 1000 + 50*X_reg[:, 0] + 30*X_reg[:, 1] + 10*X_reg[:, 2] + np.random.normal(0, 50, 200)
X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(X_reg, y_reg, test_size=0.2, random_state=42)
lin_reg = LinearRegression()
lin_reg.fit(X_train_reg, y_train_reg)
y_pred_reg = lin_reg.predict(X_test_reg)
print("\n回归模型性能:")
print(f"均方误差: {mean_squared_error(y_test_reg, y_pred_reg):.2f}")
print(f"R²分数: {r2_score(y_test_reg, y_pred_reg):.4f}")
5.3 模型评估与调优
from sklearn.model_selection import GridSearchCV, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix
# 1. 交叉验证
cv_scores = cross_val_score(lr, X_class, y_class, cv=5)
print("交叉验证得分:", cv_scores)
print("平均得分:", cv_scores.mean())
# 2. 网格搜索调优
param_grid = {
'C': [0.1, 1, 10],
'penalty': ['l1', 'l2'],
'solver': ['liblinear']
}
grid_search = GridSearchCV(LogisticRegression(), param_grid, cv=5)
grid_search.fit(X_train, y_train)
print("\n最佳参数:", grid_search.best_params_)
print("最佳得分:", grid_search.best_score_)
# 3. 详细评估报告
y_pred_best = grid_search.predict(X_test)
print("\n分类报告:")
print(classification_report(y_test, y_pred_best))
# 混淆矩阵可视化
cm = confusion_matrix(y_test, y_pred_best)
plt.figure(figsize=(6, 4))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('混淆矩阵')
plt.xlabel('预测值')
plt.ylabel('真实值')
plt.show()
第六部分:实战案例:电商销售分析
6.1 案例背景与数据准备
# 模拟电商销售数据
np.random.seed(42)
n = 10000 # 10000条销售记录
# 生成数据
data = {
'订单ID': range(10001, 10001 + n),
'用户ID': np.random.randint(1, 1000, n),
'产品ID': np.random.randint(1, 50, n),
'购买日期': pd.date_range('2023-01-01', periods=n, freq='H')[:n],
'数量': np.random.randint(1, 10, n),
'单价': np.random.uniform(10, 1000, n),
'地区': np.random.choice(['北京', '上海', '广州', '深圳', '成都'], n),
'支付方式': np.random.choice(['支付宝', '微信', '信用卡', '现金'], n),
'是否促销': np.random.choice([0, 1], n, p=[0.7, 0.3])
}
df_sales = pd.DataFrame(data)
df_sales['总价'] = df_sales['数量'] * df_sales['单价']
# 添加用户信息
users = pd.DataFrame({
'用户ID': range(1, 1001),
'年龄': np.random.randint(18, 60, 1000),
'性别': np.random.choice(['男', '女'], 1000),
'注册日期': pd.date_range('2022-01-01', periods=1000, freq='D'),
'会员等级': np.random.choice(['普通', '黄金', '白金'], 1000, p=[0.6, 0.3, 0.1])
})
# 合并数据
df = pd.merge(df_sales, users, on='用户ID', how='left')
print("数据概览:")
print(df.head())
print(f"\n数据形状: {df.shape}")
6.2 销售趋势分析
# 1. 按时间分析销售趋势
df['月份'] = df['购买日期'].dt.month
df['季度'] = df['购买日期'].dt.quarter
df['星期'] = df['购买日期'].dt.dayofweek
# 月度销售趋势
monthly_sales = df.groupby('月份')['总价'].sum()
plt.figure(figsize=(10, 6))
monthly_sales.plot(kind='bar', color='skyblue')
plt.title('月度销售趋势')
plt.xlabel('月份')
plt.ylabel('总销售额')
plt.xticks(rotation=0)
plt.show()
# 2. 按地区分析
region_sales = df.groupby('地区')['总价'].sum().sort_values(ascending=False)
plt.figure(figsize=(8, 5))
region_sales.plot(kind='pie', autopct='%1.1f%%')
plt.title('各地区销售额占比')
plt.ylabel('')
plt.show()
# 3. 按产品类别分析(假设产品ID对应类别)
# 这里我们简单映射
product_category = {i: f'类别{np.random.randint(1, 6)}' for i in range(1, 51)}
df['产品类别'] = df['产品ID'].map(product_category)
category_sales = df.groupby('产品类别')['总价'].sum().sort_values(ascending=False)
plt.figure(figsize=(10, 6))
category_sales.plot(kind='bar', color='lightgreen')
plt.title('各产品类别销售额')
plt.xlabel('产品类别')
plt.ylabel('总销售额')
plt.xticks(rotation=45)
plt.show()
6.3 用户行为分析
# 1. 用户购买频率分析
user_purchase_count = df.groupby('用户ID').size()
plt.figure(figsize=(10, 6))
plt.hist(user_purchase_count, bins=20, color='orange', edgecolor='black')
plt.title('用户购买频率分布')
plt.xlabel('购买次数')
plt.ylabel('用户数量')
plt.show()
# 2. RFM分析(最近购买时间、购买频率、购买金额)
# 计算每个用户的RFM指标
current_date = df['购买日期'].max()
rfm = df.groupby('用户ID').agg({
'购买日期': lambda x: (current_date - x.max()).days, # Recency
'订单ID': 'count', # Frequency
'总价': 'sum' # Monetary
}).rename(columns={'购买日期': 'Recency', '订单ID': 'Frequency', '总价': 'Monetary'})
# RFM评分(1-5分)
rfm['R_Score'] = pd.qcut(rfm['Recency'], 5, labels=[5, 4, 3, 2, 1]) # 最近购买得分高
rfm['F_Score'] = pd.qcut(rfm['Frequency'].rank(method='first'), 5, labels=[1, 2, 3, 4, 5])
rfm['M_Score'] = pd.qcut(rfm['Monetary'], 5, labels=[1, 2, 3, 4, 5])
# 合并RFM分数
rfm['RFM_Score'] = rfm['R_Score'].astype(str) + rfm['F_Score'].astype(str) + rfm['M_Score'].astype(str)
# 客户分群
def segment_rfm(row):
r, f, m = int(row['R_Score']), int(row['F_Score']), int(row['M_Score'])
if r >= 4 and f >= 4 and m >= 4:
return '重要价值客户'
elif r >= 4 and f >= 3:
return '重要保持客户'
elif m >= 4:
return '重要发展客户'
elif r <= 2 and f <= 2 and m <= 2:
return '流失客户'
else:
return '一般客户'
rfm['Segment'] = rfm.apply(segment_rfm, axis=1)
print("RFM分析结果:")
print(rfm['Segment'].value_counts())
# 可视化客户分群
plt.figure(figsize=(10, 6))
rfm['Segment'].value_counts().plot(kind='bar', color='purple')
plt.title('客户分群分布')
plt.xlabel('客户类型')
plt.ylabel('用户数量')
plt.xticks(rotation=45)
plt.show()
6.4 预测模型:用户购买预测
# 1. 特征工程
# 创建用户特征
user_features = df.groupby('用户ID').agg({
'总价': ['sum', 'mean', 'std'],
'数量': 'sum',
'购买日期': ['min', 'max', 'nunique'],
'是否促销': 'mean',
'地区': lambda x: x.mode()[0] if len(x.mode()) > 0 else '其他'
}).reset_index()
# 展平列名
user_features.columns = ['用户ID', '总消费', '平均消费', '消费标准差', '总数量',
'首次购买', '最近购买', '购买天数', '促销比例', '主要地区']
# 计算购买间隔
user_features['购买间隔'] = (user_features['最近购买'] - user_features['首次购买']).dt.days
# 添加用户属性
user_features = pd.merge(user_features, users, on='用户ID', how='left')
# 创建目标变量:未来30天是否购买
current_date = df['购买日期'].max()
future_date = current_date + pd.Timedelta(days=30)
future_purchases = df[df['购买日期'] > current_date]['用户ID'].unique()
user_features['未来购买'] = user_features['用户ID'].isin(future_purchases).astype(int)
# 2. 准备训练数据
X = user_features.drop(['用户ID', '首次购买', '最近购买', '未来购买'], axis=1)
y = user_features['未来购买']
# 处理分类变量
X = pd.get_dummies(X, columns=['主要地区', '性别', '会员等级'], drop_first=True)
# 处理缺失值
X = X.fillna(X.mean())
# 3. 训练模型
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, roc_auc_score
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)
# 4. 模型评估
y_pred = rf_model.predict(X_test)
y_pred_proba = rf_model.predict_proba(X_test)[:, 1]
print("模型性能:")
print(classification_report(y_test, y_pred))
print(f"ROC AUC分数: {roc_auc_score(y_test, y_pred_proba):.4f}")
# 5. 特征重要性可视化
feature_importance = pd.DataFrame({
'特征': X.columns,
'重要性': rf_model.feature_importances_
}).sort_values('重要性', ascending=False)
plt.figure(figsize=(10, 6))
sns.barplot(data=feature_importance.head(10), x='重要性', y='特征', palette='viridis')
plt.title('特征重要性TOP10')
plt.xlabel('重要性')
plt.show()
第七部分:性能优化与大数据处理
7.1 数据处理性能优化
# 1. 向量化操作 vs 循环
import time
# 创建大数据集
large_data = pd.DataFrame({
'A': np.random.rand(1000000),
'B': np.random.rand(1000000)
})
# 方法1:使用循环(慢)
start = time.time()
result_loop = []
for i in range(len(large_data)):
result_loop.append(large_data['A'].iloc[i] * large_data['B'].iloc[i])
time_loop = time.time() - start
# 方法2:使用向量化操作(快)
start = time.time()
result_vectorized = large_data['A'] * large_data['B']
time_vectorized = time.time() - start
print(f"循环耗时: {time_loop:.4f}秒")
print(f"向量化耗时: {time_vectorized:.4f}秒")
print(f"加速比: {time_loop/time_vectorized:.2f}倍")
# 2. 使用apply函数优化
def complex_operation(x):
# 模拟复杂计算
return np.log(x['A'] + x['B']) if x['A'] > 0.5 else x['A'] * x['B']
# 方法1:apply(较慢)
start = time.time()
result_apply = large_data.apply(complex_operation, axis=1)
time_apply = time.time() - start
# 方法2:使用numpy向量化(快)
def vectorized_operation(row):
# 这里使用numpy的向量化操作
return np.where(row['A'] > 0.5, np.log(row['A'] + row['B']), row['A'] * row['B'])
# 使用numpy的apply_along_axis(更快)
start = time.time()
result_numpy = np.apply_along_axis(vectorized_operation, 1, large_data.values)
time_numpy = time.time() - start
print(f"\napply耗时: {time_apply:.4f}秒")
print(f"numpy向量化耗时: {time_numpy:.4f}秒")
print(f"加速比: {time_apply/time_numpy:.2f}倍")
7.2 大数据处理技巧
# 1. 使用chunksize处理大文件
def process_large_file(file_path, chunk_size=10000):
"""分块读取大文件并处理"""
chunks = []
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# 处理每个块
processed_chunk = chunk.dropna().reset_index(drop=True)
chunks.append(processed_chunk)
# 合并所有块
result = pd.concat(chunks, ignore_index=True)
return result
# 2. 使用Dask处理大数据(需要安装dask)
try:
import dask.dataframe as dd
# 创建Dask DataFrame
dask_df = dd.from_pandas(df, npartitions=4)
# 执行延迟计算
result = dask_df.groupby('地区')['总价'].sum().compute()
print("Dask处理结果:")
print(result)
except ImportError:
print("需要安装dask: pip install dask[complete]")
# 3. 使用内存优化技巧
def optimize_memory(df):
"""优化DataFrame内存使用"""
# 转换数据类型以减少内存
for col in df.columns:
col_type = df[col].dtype
if col_type != object:
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
else:
if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
df[col] = df[col].astype(np.float16)
elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
return df
# 优化内存
df_optimized = optimize_memory(df.copy())
print(f"\n原始内存使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"优化后内存使用: {df_optimized.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
第八部分:自动化分析与报告生成
8.1 自动化数据处理流程
import os
import json
from datetime import datetime
class AutomatedAnalysis:
"""自动化数据分析类"""
def __init__(self, data_path, config_path=None):
self.data_path = data_path
self.config = self.load_config(config_path) if config_path else {}
self.df = None
self.results = {}
def load_config(self, config_path):
"""加载配置文件"""
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
def load_data(self):
"""加载数据"""
if self.data_path.endswith('.csv'):
self.df = pd.read_csv(self.data_path)
elif self.data_path.endswith('.xlsx'):
self.df = pd.read_excel(self.data_path)
else:
raise ValueError("不支持的文件格式")
print(f"数据加载完成,共{len(self.df)}行")
return self.df
def clean_data(self):
"""数据清洗"""
# 处理缺失值
for col in self.df.columns:
if self.df[col].dtype == 'object':
self.df[col] = self.df[col].fillna('未知')
else:
self.df[col] = self.df[col].fillna(self.df[col].median())
# 处理重复值
self.df.drop_duplicates(inplace=True)
print(f"数据清洗完成,剩余{len(self.df)}行")
return self.df
def analyze(self):
"""执行分析"""
# 基础统计
self.results['basic_stats'] = self.df.describe().to_dict()
# 分组分析(如果有分组配置)
if 'group_by' in self.config:
group_col = self.config['group_by']
if group_col in self.df.columns:
self.results['group_analysis'] = self.df.groupby(group_col).agg({
col: 'mean' for col in self.df.select_dtypes(include=[np.number]).columns
}).to_dict()
# 相关性分析
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 1:
self.results['correlation'] = self.df[numeric_cols].corr().to_dict()
return self.results
def generate_report(self, output_path):
"""生成HTML报告"""
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>数据分析报告</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; }}
h1 {{ color: #2c3e50; }}
h2 {{ color: #34495e; }}
table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
.section {{ margin-bottom: 30px; }}
</style>
</head>
<body>
<h1>自动化数据分析报告</h1>
<p>生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<div class="section">
<h2>数据概览</h2>
<p>数据行数: {len(self.df)}</p>
<p>数据列数: {len(self.df.columns)}</p>
</div>
<div class="section">
<h2>基本统计</h2>
{self._dict_to_html_table(self.results.get('basic_stats', {}))}
</div>
<div class="section">
<h2>分组分析</h2>
{self._dict_to_html_table(self.results.get('group_analysis', {}))}
</div>
<div class="section">
<h2>相关性分析</h2>
{self._dict_to_html_table(self.results.get('correlation', {}))}
</div>
</body>
</html>
"""
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html_content)
print(f"报告已生成: {output_path}")
def _dict_to_html_table(self, data_dict):
"""将字典转换为HTML表格"""
if not data_dict:
return "<p>无数据</p>"
# 获取所有键
all_keys = set()
for key, value in data_dict.items():
if isinstance(value, dict):
all_keys.update(value.keys())
else:
all_keys.add(key)
# 构建表格
html = "<table><tr><th>指标</th>"
for key in all_keys:
html += f"<th>{key}</th>"
html += "</tr>"
for row_key, row_value in data_dict.items():
html += f"<tr><td>{row_key}</td>"
for col_key in all_keys:
if isinstance(row_value, dict):
val = row_value.get(col_key, '')
else:
val = row_value if col_key == row_key else ''
html += f"<td>{val:.4f if isinstance(val, (int, float)) else val}</td>"
html += "</tr>"
html += "</table>"
return html
# 使用示例
# 创建配置文件
config = {
"group_by": "地区",
"analysis_type": "descriptive"
}
# 保存配置文件
with open('analysis_config.json', 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
# 创建分析实例
analyzer = AutomatedAnalysis('sales_data.csv', 'analysis_config.json')
# analyzer.load_data() # 假设有数据文件
# analyzer.clean_data()
# analyzer.analyze()
# analyzer.generate_report('analysis_report.html')
8.2 使用Jupyter Notebook自动化报告
# 在Jupyter Notebook中生成自动化报告的示例代码
import nbformat
from nbformat.v4 import new_notebook, new_code_cell, new_markdown_cell
def create_analysis_notebook(data_path, output_path):
"""创建自动化分析的Jupyter Notebook"""
# 创建新Notebook
nb = new_notebook()
# 添加标题和说明
nb.cells.append(new_markdown_cell(f"# 自动化数据分析报告\n\n数据文件: {data_path}\n\n生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"))
# 添加导入代码
import_code = """
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
"""
nb.cells.append(new_code_cell(import_code))
# 添加数据加载代码
load_code = f"""
# 加载数据
df = pd.read_csv('{data_path}')
print(f"数据加载完成,共{{len(df)}}行,{{len(df.columns)}}列")
print("\\n数据概览:")
print(df.head())
"""
nb.cells.append(new_code_cell(load_code))
# 添加数据清洗代码
clean_code = """
# 数据清洗
# 处理缺失值
for col in df.columns:
if df[col].dtype == 'object':
df[col] = df[col].fillna('未知')
else:
df[col] = df[col].fillna(df[col].median())
# 处理重复值
df.drop_duplicates(inplace=True)
print(f"数据清洗完成,剩余{{len(df)}}行")
"""
nb.cells.append(new_code_cell(clean_code))
# 添加分析代码
analysis_code = """
# 基础统计分析
print("基础统计:")
print(df.describe())
# 数值型列的分布
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
fig, axes = plt.subplots(1, len(numeric_cols), figsize=(15, 4))
for i, col in enumerate(numeric_cols):
df[col].hist(ax=axes[i], bins=20, color='skyblue', edgecolor='black')
axes[i].set_title(f'{col}分布')
plt.tight_layout()
plt.show()
# 分类变量分析
object_cols = df.select_dtypes(include=['object']).columns
if len(object_cols) > 0:
for col in object_cols:
if df[col].nunique() <= 10: # 只分析类别较少的变量
plt.figure(figsize=(8, 5))
df[col].value_counts().plot(kind='bar', color='lightgreen')
plt.title(f'{col}分布')
plt.xticks(rotation=45)
plt.show()
"""
nb.cells.append(new_code_cell(analysis_code))
# 保存Notebook
with open(output_path, 'w', encoding='utf-8') as f:
nbformat.write(nb, f)
print(f"Jupyter Notebook已生成: {output_path}")
# 使用示例
# create_analysis_notebook('sales_data.csv', 'auto_analysis.ipynb')
第九部分:实战项目:用户流失预测
9.1 项目背景与数据准备
# 模拟用户行为数据
np.random.seed(42)
n_users = 5000
# 用户基本信息
user_data = pd.DataFrame({
'用户ID': range(1, n_users + 1),
'注册日期': pd.date_range('2022-01-01', periods=n_users, freq='D'),
'年龄': np.random.randint(18, 60, n_users),
'性别': np.random.choice(['男', '女'], n_users),
'地区': np.random.choice(['北京', '上海', '广州', '深圳', '成都'], n_users),
'会员等级': np.random.choice(['普通', '黄金', '白金'], n_users, p=[0.6, 0.3, 0.1])
})
# 用户行为数据
behavior_data = []
for user_id in range(1, n_users + 1):
# 每个用户的行为次数
n_actions = np.random.poisson(10)
for _ in range(n_actions):
action_date = pd.Timestamp('2023-01-01') + pd.Timedelta(days=np.random.randint(0, 365))
action_type = np.random.choice(['登录', '浏览', '搜索', '购买', '评价'],
p=[0.3, 0.3, 0.2, 0.15, 0.05])
action_value = np.random.uniform(10, 1000) if action_type == '购买' else 0
behavior_data.append({
'用户ID': user_id,
'行为日期': action_date,
'行为类型': action_type,
'行为价值': action_value
})
behavior_df = pd.DataFrame(behavior_data)
# 合并数据
df = pd.merge(user_data, behavior_df, on='用户ID', how='left')
# 创建目标变量:是否流失(30天内无行为)
current_date = df['行为日期'].max()
df['最后行为日期'] = df.groupby('用户ID')['行为日期'].transform('max')
df['流失'] = ((current_date - df['最后行为日期']).dt.days > 30).astype(int)
print("数据概览:")
print(df.head())
print(f"\n数据形状: {df.shape}")
print(f"流失用户比例: {df['流失'].mean():.2%}")
9.2 特征工程
# 1. 用户行为特征
user_features = df.groupby('用户ID').agg({
'行为日期': ['min', 'max', 'nunique'],
'行为类型': lambda x: x.mode()[0] if len(x.mode()) > 0 else '其他',
'行为价值': ['sum', 'mean', 'std'],
'流失': 'first'
}).reset_index()
# 展平列名
user_features.columns = ['用户ID', '首次行为', '最后行为', '活跃天数',
'主要行为', '总价值', '平均价值', '价值标准差', '是否流失']
# 计算用户生命周期
user_features['生命周期'] = (user_features['最后行为'] - user_features['首次行为']).dt.days
# 2. 时间窗口特征
# 最近30天行为
recent_30 = df[df['行为日期'] > current_date - pd.Timedelta(days=30)]
recent_features = recent_30.groupby('用户ID').agg({
'行为日期': 'nunique',
'行为类型': 'count',
'行为价值': 'sum'
}).reset_index()
recent_features.columns = ['用户ID', '最近30天活跃天数', '最近30天行为次数', '最近30天总价值']
# 合并特征
user_features = pd.merge(user_features, recent_features, on='用户ID', how='left')
user_features.fillna(0, inplace=True)
# 3. 添加用户属性
user_features = pd.merge(user_features, user_data[['用户ID', '年龄', '性别', '地区', '会员等级']],
on='用户ID', how='left')
# 4. 特征编码
# 分类变量编码
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
user_features['性别编码'] = le.fit_transform(user_features['性别'])
user_features['地区编码'] = le.fit_transform(user_features['地区'])
user_features['会员等级编码'] = le.fit_transform(user_features['会员等级'])
user_features['主要行为编码'] = le.fit_transform(user_features['主要行为'])
print("特征工程完成,特征数量:", len(user_features.columns))
print("\n特征列表:")
print(user_features.columns.tolist())
9.3 模型训练与评估
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix
from sklearn.model_selection import train_test_split, cross_val_score
import xgboost as xgb
# 准备特征和标签
X = user_features.drop(['用户ID', '是否流失', '首次行为', '最后行为',
'性别', '地区', '会员等级', '主要行为'], axis=1)
y = user_features['是否流失']
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
print(f"训练集大小: {X_train.shape}")
print(f"测试集大小: {X_test.shape}")
print(f"训练集流失比例: {y_train.mean():.2%}")
print(f"测试集流失比例: {y_test.mean():.2%}")
# 1. 随机森林
rf_model = RandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced')
rf_model.fit(X_train, y_train)
# 2. 梯度提升
gb_model = GradientBoostingClassifier(n_estimators=100, random_state=42)
gb_model.fit(X_train, y_train)
# 3. XGBoost
xgb_model = xgb.XGBClassifier(n_estimators=100, random_state=42, eval_metric='logloss')
xgb_model.fit(X_train, y_train)
# 模型评估
models = {'随机森林': rf_model, '梯度提升': gb_model, 'XGBoost': xgb_model}
results = {}
for name, model in models.items():
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]
results[name] = {
'准确率': accuracy_score(y_test, y_pred),
'ROC AUC': roc_auc_score(y_test, y_pred_proba),
'分类报告': classification_report(y_test, y_pred, output_dict=True)
}
print(f"\n{name}模型性能:")
print(f"准确率: {results[name]['准确率']:.4f}")
print(f"ROC AUC: {results[name]['ROC AUC']:.4f}")
print("分类报告:")
print(classification_report(y_test, y_pred))
# 模型比较可视化
model_names = list(results.keys())
accuracies = [results[name]['准确率'] for name in model_names]
aucs = [results[name]['ROC AUC'] for name in model_names]
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
axes[0].bar(model_names, accuracies, color=['skyblue', 'lightgreen', 'orange'])
axes[0].set_title('模型准确率比较')
axes[0].set_ylabel('准确率')
axes[0].set_ylim(0, 1)
axes[1].bar(model_names, aucs, color=['skyblue', 'lightgreen', 'orange'])
axes[1].set_title('模型ROC AUC比较')
axes[1].set_ylabel('ROC AUC')
axes[1].set_ylim(0, 1)
plt.tight_layout()
plt.show()
9.4 模型解释与业务洞察
# 1. 特征重要性分析
feature_importance = pd.DataFrame({
'特征': X.columns,
'重要性': xgb_model.feature_importances_
}).sort_values('重要性', ascending=False)
plt.figure(figsize=(10, 6))
sns.barplot(data=feature_importance.head(15), x='重要性', y='特征', palette='viridis')
plt.title('XGBoost特征重要性TOP15')
plt.xlabel('重要性')
plt.show()
# 2. SHAP值分析(需要安装shap)
try:
import shap
# 创建SHAP解释器
explainer = shap.TreeExplainer(xgb_model)
shap_values = explainer.shap_values(X_test)
# 可视化
plt.figure(figsize=(10, 6))
shap.summary_plot(shap_values, X_test, plot_type="bar", show=False)
plt.title('SHAP特征重要性')
plt.tight_layout()
plt.show()
# 详细SHAP图
plt.figure(figsize=(10, 6))
shap.summary_plot(shap_values, X_test, show=False)
plt.title('SHAP摘要图')
plt.tight_layout()
plt.show()
except ImportError:
print("需要安装shap: pip install shap")
# 3. 业务洞察
print("\n=== 业务洞察 ===")
print("1. 最重要的流失预测特征:")
top_features = feature_importance.head(5)
for _, row in top_features.iterrows():
print(f" - {row['特征']}: 重要性 {row['重要性']:.4f}")
print("\n2. 关键发现:")
print(" - 最近30天活跃天数是预测流失的最重要指标")
print(" - 用户生命周期和总价值也是重要特征")
print(" - 会员等级对流失预测有一定影响")
print("\n3. 建议措施:")
print(" - 针对最近30天不活跃的用户进行重点挽留")
print(" - 提高高价值用户的活跃度")
print(" - 优化会员权益,提高会员留存率")
第十部分:总结与进阶学习路径
10.1 课程回顾
通过本课程,我们系统地学习了:
- 基础技能:数据处理、清洗、探索性分析
- 进阶技能:特征工程、统计分析、机器学习建模
- 实战能力:电商销售分析、用户流失预测等真实业务场景
- 优化技巧:性能优化、大数据处理、自动化分析
10.2 进阶学习建议
- 深度学习:学习TensorFlow/PyTorch,用于复杂模式识别
- 大数据技术:学习Spark、Hadoop处理超大规模数据
- 实时分析:学习流数据处理(Kafka、Flink)
- 部署能力:学习Flask/Django部署数据分析应用
- 领域知识:结合具体行业(金融、医疗、电商)深化业务理解
10.3 持续学习资源
- 书籍:《Python数据分析实战》、《利用Python进行数据分析》
- 在线课程:Coursera、Udacity的数据科学专项课程
- 社区:Kaggle、GitHub、Stack Overflow
- 实践项目:参与Kaggle竞赛、开源项目贡献
10.4 最终建议
数据分析不仅是技术,更是业务理解的桥梁。建议:
- 多实践:用真实数据解决实际问题
- 多交流:参与社区讨论,学习他人经验
- 多思考:不仅关注技术实现,更要理解业务逻辑
- 多总结:定期回顾学习成果,形成自己的知识体系
通过本课程的学习,你已经具备了从入门到精通的Python数据分析能力。记住,数据分析的终极目标是解决业务难题,创造价值。祝你在数据分析的道路上不断进步,成为真正的数据专家!
