引言:为什么需要Python数据分析进阶技能?
在当今数据驱动的商业环境中,Python已经成为数据分析领域的标准工具。然而,许多初学者在掌握了基础的Pandas和Matplotlib后,往往在面对真实工作场景时感到力不从心。本课程将带你从入门到精通,掌握高级技巧,解决实际工作难题,从而显著提升职场竞争力。
学习目标
- 掌握Python数据分析的核心高级技巧
- 学会处理大规模数据集和复杂数据结构
- 提升数据清洗、转换和可视化的效率
- 学会使用高级库解决实际业务问题
- 构建可复用的数据分析工作流
第一部分:Pandas高级操作与性能优化
1.1 高效处理大规模数据集
向量化操作 vs 循环
在处理大规模数据时,向量化操作是提升性能的关键。向量化操作利用了底层的C语言实现,比Python循环快得多。
错误示例(使用循环):
import pandas as pd
import numpy as np
# 创建一个包含100万行的DataFrame
df = pd.DataFrame({'A': np.random.randint(0, 100, 1000000),
'B': np.random.randint(0, 100, 1000000)})
# 使用循环计算两列之和(慢)
def sum_with_loop(df):
result = []
for i in range(len(df)):
result.append(df['A'].iloc[i] + df['B'].iloc[i])
return result
# 这将非常慢,可能需要几秒钟甚至更长时间
# sum_with_loop(df)
正确示例(向量化操作):
# 使用向量化操作计算两列之和(快)
df['sum'] = df['A'] + df['B']
# 或者使用NumPy的向量化函数
df['sum'] = np.add(df['A'], df['B'])
使用query()方法进行高效过滤
query()方法可以使用字符串表达式进行过滤,比传统的布尔索引更简洁且性能更好。
# 传统方法
filtered_df = df[(df['A'] > 50) & (df['B'] < 80)]
# 使用query方法
filtered_df = df.query('A > 50 and B < 100')
使用eval()进行高效计算
eval()方法可以使用字符串表达式进行计算,避免创建中间DataFrame。
# 传统方法(创建中间DataFrame)
df['C'] = df['A'] + df['B']
df['D'] = df['A'] - df['B']
df['E'] = df['C'] * df['D']
# 使用eval方法(避免中间DataFrame)
df.eval('''
C = A + B
D = A - B
E = C * D
''', inplace=True)
1.2 内存优化技巧
选择合适的数据类型
Pandas默认使用64位数据类型,但很多时候我们可以使用更小的类型来节省内存。
# 查看当前内存使用情况
print(df.info())
# 优化数据类型
df['A'] = df['A'].astype('int32') # 从int64转换为int32
df['B'] = df['B'].astype('int32')
# 对于分类数据,使用category类型
df['category'] = pd.Categorical(['A', 'B', 'C'] * 333333)
# 对于浮点数,如果精度要求不高,可以使用float32
df['float_col'] = df['A'].astype('float32')
使用select_dtypes()批量转换
# 批量转换整数列
int_cols = df.select_dtypes(include=['int64']).columns
df[int_cols] = df[int_cols].astype('int32')
# 批量转换浮点列
float_cols = df.select_dtypes(include=['float64']).columns
df[float_cols] = df[float_cols].astype('float32')
1.3 高级数据合并与连接
多条件合并
# 创建示例数据
left = pd.DataFrame({'key': ['A', 'B', 'C', 'D'],
'value': [1, 2, 3, 4],
'category': ['X', 'Y', 'X', 'Y']})
right = pd.DataFrame({'key': ['A', 'B', 'C', 'D'],
'value': [5, 6, 7, 8],
'category': ['X', 'Y', 'X', 'Z']})
# 多条件合并
merged = pd.merge(left, right, on=['key', 'category'], how='inner')
print(merged)
使用join()进行索引合并
# 设置索引
left.set_index('key', inplace=True)
right.set_index('key', inplace=True)
# 使用join合并
joined = left.join(right, lsuffix='_left', rsuffix='_right')
print(joined)
使用concat()进行纵向合并
# 创建多个DataFrame
df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df2 = pd.DataFrame({'A': [5, 6], 'B': [7, 8]})
df3 = pd.DataFrame({'A': [9, 10], 'B': [11, 12]})
# 纵向合并
combined = pd.concat([df1, df2, df3], ignore_index=True)
print(combined)
第二部分:高级数据清洗与转换
2.1 处理复杂缺失值
多策略填充
# 创建包含复杂缺失值的DataFrame
df = pd.DataFrame({
'A': [1, 2, np.nan, 4, 5],
'B': [np.nan, 2, 3, np.nan, 5],
'C': [1, 2, 3, 4, 5],
'D': ['cat', 'dog', np.nan, 'bird', 'fish']
})
# 1. 使用不同策略填充不同列
df['A'].fillna(df['A'].mean(), inplace=True) # 数值列用均值填充
df['B'].fillna(method='ffill', inplace=True) # 用前一个值填充
df['D'].fillna('unknown', inplace=True) # 分类列用特定值填充
# 2. 使用插值方法
df['A'] = df['A'].interpolate(method='linear') # 线性插值
# 3. 使用KNN填充(需要sklearn)
from sklearn.impute import KNNImputer
imputer = KNNImputer(n_neighbors=2)
df[['A', 'B', 'C']] = imputer.fit_transform(df[['A', 'B', 'C']])
2.2 高级数据转换
使用transform()进行分组转换
# 创建示例数据
df = pd.DataFrame({
'group': ['A', 'A', 'B', 'B', 'C', 'C'],
'value': [10, 20, 30, 40, 50, 60]
})
# 计算每组的均值并减去均值(中心化)
df['value_centered'] = df.groupby('group')['value'].transform(lambda x: x - x.mean())
# 计算每组的百分比
df['value_pct'] = df.groupby('group')['value'].transform(lambda x: x / x.sum() * 100)
print(df)
使用apply()进行复杂转换
# 复杂条件转换
def complex_transform(row):
if row['value'] > 30:
return 'High'
elif row['value'] > 15:
'Medium'
else:
return 'Low'
df['category'] = df.apply(complex_transform, axis=1)
2.3 时间序列处理
时间特征提取
# 创建时间序列数据
dates = pd.date_range('2023-01-01', periods=10, freq='D')
df = pd.DataFrame({
'date': dates,
'value': np.random.randint(10, 100, 10)
})
# 提取时间特征
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['weekday'] = df['date'].dt.weekday # 0=周一, 6=周日
df['is_weekend'] = df['weekday'].isin([5, 6])
df['quarter'] = df['date'].dt.quarter
df['day_of_year'] = df['date'].dt.dayofyear
# 计算时间差
df['days_since_start'] = (df['date'] - df['date'].min()).dt.days
print(df)
时间重采样
# 创建分钟级数据
minute_df = pd.DataFrame({
'datetime': pd.date_range('2023-01-01', periods=100, freq='T'),
'value': np.random.randint(10, 100, 100)
})
minute_df.set_index('datetime', inplace=True)
# 重采样为小时数据
hourly = minute_df.resample('H').mean()
# 重采样为天数据
daily = minute_df.resample('D').sum()
# 重采样为周数据,使用自定义的周起始
weekly = minute_df.resample('W-MON').agg({'value': ['mean', 'sum', 'std']})
第三部分:高级数据可视化
3.1 Matplotlib高级技巧
子图网格与复杂布局
import matplotlib.pyplot as plt
import seaborn as sns
# 创建复杂布局
fig = plt.figure(figsize=(12, 8))
# 创建子图网格
gs = fig.add_gridspec(3, 3)
# 添加不同大小的子图
ax1 = fig.add_subplot(gs[0, :]) # 第一行,占满
ax2 = fig.add_subplot(gs[1, 0]) # 第二行第一列
ax3 = fig.add_subplot(gs[1, 1:]) # 第二行,后两列
ax4 = fig.add_subplot(gs[2, :2]) # 第三行,前两列
ax5 = fig.add_subplot(gs[2, 2]) # 第三行第三列
# 绘制数据
data = np.random.randn(100, 4)
ax1.plot(data[:, 0])
ax2.scatter(data[:, 1], data[:, 2])
ax3.hist(data[:, 3], bins=20)
ax4.boxplot(data)
ax5.violinplot(data)
plt.tight_layout()
plt.show()
自定义颜色映射
from matplotlib.colors import LinearSegmentedColormap
# 创建自定义颜色映射
colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7']
custom_cmap = LinearSegmentedColormap.from_list('custom', colors)
# 使用自定义颜色映射
plt.imshow(np.random.rand(10, 10), cmap=custom_cmap)
plt.colorbar()
plt.show()
3.2 Seaborn高级可视化
复杂关系图
# 创建示例数据
tips = sns.load_dataset('tips')
tips['tip_pct'] = tips['tip'] / tips['total_bill'] * 100
# 1. PairGrid用于自定义关系图
g = sns.PairGrid(tips, vars=['total_bill', 'tip', 'tip_pct'], hue='time')
g.map_upper(sns.scatterplot)
g.map_lower(sns.kdeplot, fill=True)
g.map_diag(sns.histplot, kde=True)
g.add_legend()
# 2. 复杂热力图
corr = tips[['total_bill', 'tip', 'size']].corr()
mask = np.triu(np.ones_like(corr, dtype=bool))
sns.heatmap(corr, mask=mask, annot=True, cmap='coolwarm', center=0)
plt.show()
分类数据可视化
# 创建分类数据
data = pd.DataFrame({
'category': ['A', 'B', 'C'] * 100,
'subcat': ['X', 'Y'] * 150,
'value': np.random.randn(300)
})
# 使用catplot进行复杂分类可视化
sns.catplot(data=data, x='category', y='value', hue='subcat',
kind='box', height=5, aspect=2)
plt.show()
3.3 交互式可视化
Plotly基础
import plotly.express as px
import plotly.graph_objects as go
# 创建交互式散点图
fig = px.scatter(
tips,
x='total_bill',
y='tip',
color='time',
size='size',
hover_data=['day', 'sex'],
title='小费与账单金额关系',
template='plotly_white'
)
fig.show()
# 创建交互式折线图
fig = go.Figure()
fig.add_trace(go.Scatter(
x=df['date'],
y=df['value'],
mode='lines+markers',
name='数值趋势',
hovertemplate='%{x}<br>数值: %{y}<extra></extra>'
))
fig.update_layout(
title='时间序列趋势',
xaxis_title='日期',
yaxis_title='数值',
hovermode='x unified'
)
fig.show()
高级Plotly图表
# 创建复杂的组合图表
fig = go.Figure()
# 添加柱状图
fig.add_trace(go.Bar(
x=df['date'],
y=df['value'],
name='每日数值',
marker_color='rgba(55, 83, 109, 0.7)'
))
# 添加折线图(移动平均)
df['MA7'] = df['value'].rolling(window=7).mean()
fig.add_trace(go.Scatter(
x=df['date'],
y=df['MA7'],
mode='lines',
name='7日移动平均',
line=dict(color='red', width=3)
))
# 添加范围滑块和选择器
fig.update_layout(
xaxis=dict(
rangeselector=dict(
buttons=list([
dict(count=7, label="1w", step="day", stepmode="backward"),
dict(count=1, label="1m", step="month", stepmode="backward"),
dict(step="all")
])
),
rangeslider=dict(visible=True),
type="date"
)
)
fig.show()
第四部分:高级数据处理技巧
4.1 处理大数据集
分块处理(Chunking)
# 处理大CSV文件(无法一次性读入内存)
chunk_size = 100000
results = []
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
# 对每个块进行处理
processed = chunk.groupby('category').agg({
'value': ['mean', 'sum', 'count']
})
results.append(processed)
# 合并结果
final_result = pd.concat(results).groupby(level=0).sum()
使用Dask进行并行处理
import dask.dataframe as dd
# 创建Dask DataFrame(类似于Pandas,但可以并行处理)
ddf = dd.read_csv('large_file.csv')
# 执行计算(惰性求值)
result = ddf.groupby('category').value.mean().compute() # compute()触发实际计算
# 处理大于内存的数据集
ddf = dd.read_csv('large_file.csv', blocksize=25e6) # 25MB一块
# 复杂操作
result = (ddf[ddf['value'] > 100]
.groupby('category')
.agg({'value': ['mean', 'sum', 'count']})
.compute())
4.2 高级字符串处理
正则表达式应用
# 创建包含文本数据的DataFrame
text_df = pd.DataFrame({
'email': ['user1@example.com', 'user2@test.org', 'user3@company.co.uk'],
'phone': ['(123) 456-7890', '123.456.7890', '123-456-7890'],
'address': ['123 Main St, City, ST 12345', '456 Oak Ave, Town, ST 67890']
})
# 提取域名
text_df['domain'] = text_df['email'].str.extract(r'@(.+)')
# 标准化电话号码
text_df['phone_clean'] = text_df['phone'].str.replace(r'[^\d]', '', regex=True)
# 提取邮政编码
text_df['zipcode'] = text_df['address'].str.extract(r'(\d{5})')
# 复杂模式匹配
text_df['is_valid_email'] = text_df['email'].str.match(r'^[\w\.-]+@[\w\.-]+\.\w+$')
print(text_df)
文本向量化
from sklearn.feature_extraction.text import TfidfVectorizer
# 创建文本数据
documents = [
"Python is great for data analysis",
"Pandas is a powerful library",
"Data analysis requires Python skills",
"Machine learning with Python"
]
# 创建TF-IDF向量化器
vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
X = vectorizer.fit_transform(documents)
# 转换为DataFrame查看
tfidf_df = pd.DataFrame(X.toarray(), columns=vectorizer.get_feature_names_out())
print(tfidf_df)
4.3 高级分组与聚合
多级索引操作
# 创建多级索引数据
arrays = [['A', 'A', 'B', 'B', 'C', 'C'],
['one', 'two', 'one', 'two', 'one', 'two']]
index = pd.MultiIndex.from_arrays(arrays, names=['first', 'second'])
df = pd.DataFrame({'value': [10, 20, 30, 40, 50, 60]}, index=index)
# 多级索引查询
print(df.loc[('A', 'one')]) # 查询特定元素
print(df.loc['A']) # 查询第一级
print(df.xs('two', level='second')) # 跨选择
# 多级索引聚合
print(df.groupby(level=0).sum()) # 按第一级分组
print(df.groupby(level=1).mean()) # 按第二级分组
自定义聚合函数
# 创建示例数据
df = pd.DataFrame({
'group': ['A', 'A', 'B', 'B', 'C', 'C'],
'value': [10, 20, 30, 40, 50, 60],
'weight': [1, 2, 1, 2, 1, 2]
})
# 定义自定义聚合函数
def weighted_mean(x, w):
return np.average(x, weights=w)
# 使用apply进行复杂分组计算
result = df.groupby('group').apply(
lambda x: pd.Series({
'weighted_mean': weighted_mean(x['value'], x['weight']),
'std': x['value'].std(),
'count': x['value'].count()
})
)
print(result)
第五部分:实际工作场景解决方案
5.1 销售数据分析案例
场景描述
假设你是一家零售公司的数据分析师,需要分析销售数据,找出最佳销售员、最畅销产品,并预测下季度销售趋势。
# 创建模拟销售数据
np.random.seed(42)
sales_data = pd.DataFrame({
'date': pd.date_range('2023-01-01', periods=365, freq='D'),
'salesperson': np.random.choice(['Alice', 'Bob', 'Charlie', 'David'], 365),
'product': np.random.choice(['Laptop', 'Phone', 'Tablet', 'Monitor'], 365),
'quantity': np.random.randint(1, 10, 365),
'unit_price': np.random.randint(500, 2000, 365)
})
# 计算总销售额
sales_data['total_sales'] = sales_data['quantity'] * sales_data['unit_price']
# 1. 最佳销售员分析
top_salesperson = sales_data.groupby('salesperson')['total_sales'].sum().sort_values(ascending=False)
print("最佳销售员:")
print(top_salesperson)
# 2. 最畅销产品
top_product = sales_data.groupby('product')['quantity'].sum().sort_values(ascending=False)
print("\n最畅销产品:")
print(top_product)
# 3. 月度销售趋势
sales_data['month'] = sales_data['date'].dt.month
monthly_sales = sales_data.groupby('month')['total_sales'].sum()
print("\n月度销售趋势:")
print(monthly_sales)
# 4. 销售员-产品矩阵
pivot_table = pd.pivot_table(sales_data,
values='total_sales',
index='salesperson',
columns='product',
aggfunc='sum',
fill_value=0)
print("\n销售员-产品矩阵:")
print(pivot_table)
# 5. 预测下季度销售(简单移动平均)
sales_data['MA7'] = sales_data['total_sales'].rolling(window=7).mean()
sales_data['MA30'] = sales_data['total_sales'].rolling(window=30).mean()
last_30_days_avg = sales_data['MA30'].iloc[-1]
predicted_next_month = last_30_days_avg * 30 # 简单预测
print(f"\n下月预测销售额:{predicted_next_month:.2f}")
5.2 用户行为分析案例
场景描述
分析用户在网站上的行为数据,找出高价值用户,分析用户流失原因。
# 创建模拟用户行为数据
user_data = pd.DataFrame({
'user_id': np.random.randint(1000, 2000, 1000),
'session_date': pd.date_range('2023-01-01', periods=1000, freq='H'),
'page_views': np.random.randint(1, 20, 1000),
'time_on_site': np.random.randint(10, 600, 1000),
'purchases': np.random.randint(0, 5, 1000),
'purchase_amount': np.random.randint(0, 500, 1000)
})
# 1. RFM分析(最近购买时间、购买频率、购买金额)
# 计算每个用户的RFM指标
user_rfm = user_data.groupby('user_id').agg({
'session_date': lambda x: (pd.Timestamp.now() - x.max()).days, # Recency
'user_id': 'count', # Frequency
'purchase_amount': 'sum' # Monetary
}).rename(columns={
'session_date': 'recency',
'user_id': 'frequency',
'purchase_amount': 'monetary'
})
# RFM评分(1-5分)
user_rfm['R_score'] = pd.qcut(user_rfm['recency'], 5, labels=[5,4,3,2,1]) # 越小越好
user_rfm['F_score'] = pd.qcut(user_rfm['frequency'].rank(method='first'), 5, labels=[1,2,3,4,5])
user_rfm['M_score'] = pd.qcut(user_rfm['monetary'], 5, labels=[1,2,3,4,5])
# 计算RFM总分
user_rfm['RFM_score'] = user_rfm['R_score'].astype(str) + user_rfm['F_score'].astype(str) + user_rfm['M_score'].astype(str)
# 2. 用户分群
def segment_user(row):
score = int(row['RFM_score'])
if score >= 555:
return 'VIP'
elif score >= 444:
return 'High Value'
elif score >= 333:
return 'Medium Value'
else:
return 'Low Value'
user_rfm['segment'] = user_rfm.apply(segment_user, axis=1)
print("用户分群结果:")
print(user_rfm['segment'].value_counts())
# 3. 分析用户流失(长时间未访问)
current_date = pd.Timestamp.now()
user_data['days_since_last_visit'] = (current_date - user_data['session_date']).dt.days
churned_users = user_data[user_data['days_since_last_visit'] > 30]['user_id'].unique()
print(f"\n流失用户数量:{len(churned_users)}")
# 4. 高价值用户特征分析
vip_users = user_rfm[user_rfm['segment'] == 'VIP'].index
vip_behavior = user_data[user_data['user_id'].isin(vip_users)].agg({
'page_views': 'mean',
'time_on_site': 'mean',
'purchases': 'mean'
})
print("\nVIP用户平均行为:")
print(vip_behavior)
5.3 A/B测试分析案例
场景描述
分析网站改版A/B测试结果,判断新版本是否优于旧版本。
# 创建模拟A/B测试数据
np.random.seed(42)
ab_test_data = pd.DataFrame({
'user_id': range(10000),
'group': np.random.choice(['control', 'treatment'], 10000, p=[0.5, 0.5]),
'converted': 0
})
# 模拟转化率:对照组2%,实验组2.5%
control_mask = ab_test_data['group'] == 'control'
treatment_mask = ab_test_data['group'] == 'treatment'
ab_test_data.loc[control_mask, 'converted'] = np.random.binomial(1, 0.02, control_mask.sum())
ab_test_data.loc[treatment_mask, 'converted'] = np.random.binomial(1, 0.025, treatment_mask.sum())
# 1. 基础统计
summary = ab_test_data.groupby('group')['converted'].agg(['count', 'sum', 'mean'])
print("A/B测试结果摘要:")
print(summary)
# 2. 统计显著性检验(卡方检验)
from scipy.stats import chi2_contingency
# 创建列联表
contingency_table = pd.crosstab(ab_test_data['group'], ab_test_data['converted'])
chi2, p_value, dof, expected = chi2_contingency(contingency_table)
print(f"\n卡方检验结果:")
print(f"Chi2: {chi2:.4f}")
print(f"P-value: {p_value:.4f}")
print(f"显著性水平: 0.05")
print(f"结果: {'显著' if p_value < 0.05 else '不显著'}")
# 3. 计算提升度和置信区间
control_rate = summary.loc['control', 'mean']
treatment_rate = summary.loc['treatment', 'mean']
uplift = (treatment_rate - control_rate) / control_rate * 100
# 计算置信区间(使用正态近似)
from scipy.stats import norm
import math
def confidence_interval(successes, total, confidence=0.95):
p = successes / total
z = norm.ppf(1 - (1 - confidence) / 2)
se = math.sqrt(p * (1 - p) / total)
return (p - z * se, p + z * se)
control_ci = confidence_interval(
summary.loc['control', 'sum'],
summary.loc['control', 'count']
)
treatment_ci = confidence_interval(
summary.loc['treatment', 'sum'],
summary.loc['treatment', 'count']
)
print(f"\n转化率提升: {uplift:.2f}%")
print(f"对照组95%置信区间: [{control_ci[0]:.4f}, {control_ci[1]:.4f}]")
print(f"实验组95%置信区间: [{treatment_ci[0]:.4f}, {treatment_ci[1]:.4f}]")
# 4. 功效分析(需要statsmodels)
try:
from statsmodels.stats.power import zt_ind_solve_power
from statsmodels.stats.proportion import proportion_effectsize
effect_size = proportion_effectsize(0.025, 0.02)
power = zt_ind_solve_power(
effect_size=effect_size,
nobs1=5000,
alpha=0.05,
ratio=1.0
)
print(f"\n统计功效: {power:.4f}")
except ImportError:
print("\n需要安装statsmodels进行功效分析")
第六部分:性能优化与最佳实践
6.1 代码性能分析
使用cProfile分析性能瓶颈
import cProfile
import pstats
def slow_function():
# 模拟慢函数
df = pd.DataFrame({'A': np.random.randint(0, 100, 100000)})
df['B'] = df['A'].apply(lambda x: x**2 + np.sin(x))
return df
# 分析性能
profiler = cProfile.Profile()
profiler.enable()
slow_function()
profiler.disable()
# 输出结果
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative').print_stats(10)
使用%timeit魔法命令
# 在Jupyter中使用
# %timeit df['A'] + df['B']
# %timeit df.apply(lambda row: row['A'] + row['B'], axis=1)
6.2 内存优化最佳实践
使用astype()优化内存
def optimize_memory(df):
"""优化DataFrame内存使用"""
start_mem = df.memory_usage().sum() / 1024**2
for col in df.columns:
col_type = df[col].dtype
if col_type != object:
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
else:
if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
end_mem = df.memory_usage().sum() / 1024**2
print(f"内存使用从 {start_mem:.2f} MB 降低到 {end_mem:.2f} MB")
return df
# 使用示例
df = pd.DataFrame({
'A': np.random.randint(0, 100, 100000),
'B': np.random.randn(100000)
})
df = optimize_memory(df)
6.3 代码组织与可复用性
创建可复用的分析函数
def sales_analysis_pipeline(data_path, output_path=None):
"""
销售数据分析管道
参数:
data_path: 数据文件路径
output_path: 输出路径(可选)
返回:
分析结果DataFrame
"""
# 1. 数据加载
df = pd.read_csv(data_path)
# 2. 数据清洗
df = df.dropna(subset=['sales', 'date'])
df['date'] = pd.to_datetime(df['date'])
# 3. 特征工程
df['month'] = df['date'].dt.month
df['quarter'] = df['date'].dt.quarter
df['sales'] = df['sales'].astype('float32')
# 4. 分析计算
monthly_sales = df.groupby('month')['sales'].sum()
top_products = df.groupby('product')['sales'].sum().nlargest(5)
# 5. 结果输出
result = {
'monthly_sales': monthly_sales,
'top_products': top_products,
'total_sales': df['sales'].sum()
}
if output_path:
monthly_sales.to_csv(f"{output_path}/monthly_sales.csv")
top_products.to_csv(f"{output_path}/top_products.csv")
return result
# 使用示例
# result = sales_analysis_pipeline('sales_data.csv', 'output/')
第七部分:进阶库与工具
7.1 使用Polars进行高性能分析
Polars基础
import polars as pl
# 创建Polars DataFrame(比Pandas更快)
df = pl.DataFrame({
"a": [1, 2, 3, 4, 5],
"b": ["A", "B", "A", "B", "A"],
"c": [3.1, 2.4, 5.6, 2.7, 8.8]
})
# 链式操作(惰性求值)
result = (df.filter(pl.col("a") > 2)
.groupby("b")
.agg(pl.col("c").mean())
.collect()) # collect()触发计算
print(result)
# 与Pandas互操作
pandas_df = df.to_pandas()
polars_from_pandas = pl.from_pandas(pandas_df)
Polars性能优势
# 大数据集性能对比
import time
# Pandas方式
start = time.time()
pandas_df = pd.DataFrame({
'A': np.random.randint(0, 100, 1000000),
'B': np.random.randint(0, 100, 1000000)
})
pandas_result = pandas_df.groupby('A').agg({'B': ['mean', 'sum']})
pandas_time = time.time() - start
# Polars方式
start = time.time()
polars_df = pl.DataFrame({
'A': np.random.randint(0, 100, 1000000),
'B': np.random.randint(0, 100, 1000000)
})
polars_result = (polars_df.groupby('A')
.agg(pl.col("B").mean().alias("B_mean"),
pl.col("B").sum().alias("B_sum"))
.collect())
polars_time = time.time() - start
print(f"Pandas时间: {pandas_time:.2f}秒")
print(f"Polars时间: {polars_time:.2f}秒")
print(f"Polars比Pandas快 {pandas_time/polars_time:.1f}倍")
7.2 使用PySpark处理超大规模数据
PySpark基础
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count
# 创建Spark会话
spark = SparkSession.builder \
.appName("DataAnalysis") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# 创建DataFrame
data = [(1, "Alice", 1000),
(2, "Bob", 1500),
(3, "Charlie", 2000)]
df = spark.createDataFrame(data, ["id", "name", "salary"])
# 执行聚合操作
result = df.groupBy() \
.agg(
sum("salary").alias("total_salary"),
avg("salary").alias("avg_salary"),
count("id").alias("employee_count")
)
result.show()
# 读取大文件
large_df = spark.read.csv("large_file.csv", header=True, inferSchema=True)
# 执行分布式计算
summary = large_df.groupBy("category").agg(
sum("value").alias("total"),
avg("value").alias("average")
)
summary.show()
# 停止Spark会话
spark.stop()
7.3 使用SQL进行数据分析
在Python中使用SQL
import sqlite3
import pandas as pd
# 创建内存数据库
conn = sqlite3.connect(':memory:')
# 创建表
conn.execute('''
CREATE TABLE sales (
id INTEGER PRIMARY KEY,
date TEXT,
product TEXT,
salesperson TEXT,
amount REAL
)
''')
# 插入数据
sales_data = pd.DataFrame({
'date': pd.date_range('2023-01-01', periods=100, freq='D'),
'product': np.random.choice(['Laptop', 'Phone', 'Tablet'], 100),
'salesperson': np.random.choice(['Alice', 'Bob', 'Charlie'], 100),
'amount': np.random.randint(500, 2000, 100)
})
sales_data.to_sql('sales', conn, if_exists='append', index=False)
# 使用SQL查询
query = '''
SELECT
salesperson,
product,
SUM(amount) as total_sales,
COUNT(*) as transaction_count
FROM sales
WHERE date >= '2023-02-01'
GROUP BY salesperson, product
ORDER BY total_sales DESC
'''
result = pd.read_sql(query, conn)
print(result)
# 复杂SQL分析
complex_query = '''
WITH monthly_sales AS (
SELECT
strftime('%Y-%m', date) as month,
salesperson,
SUM(amount) as monthly_total
FROM sales
GROUP BY month, salesperson
),
ranked_sales AS (
SELECT
month,
salesperson,
monthly_total,
RANK() OVER (PARTITION BY month ORDER BY monthly_total DESC) as rank
FROM monthly_sales
)
SELECT * FROM ranked_sales WHERE rank <= 3
'''
top_performers = pd.read_sql(complex_query, conn)
print(top_performers)
conn.close()
第八部分:机器学习基础集成
8.1 特征工程自动化
使用Featuretools
import featuretools as ft
# 创建实体集
es = ft.EntitySet(id="sales_data")
# 添加数据
data = pd.DataFrame({
'customer_id': [1, 1, 2, 2, 3, 3],
'transaction_id': [1, 2, 3, 4, 5, 6],
'amount': [100, 200, 150, 250, 300, 400],
'date': pd.date_range('2023-01-01', periods=6, freq='D')
})
es = es.add_dataframe(
dataframe_name="transactions",
dataframe=data,
index="transaction_id",
time_index="date"
)
es = es.add_dataframe(
dataframe_name="customers",
dataframe=pd.DataFrame({'customer_id': [1, 2, 3]}),
index="customer_id"
)
# 自动特征生成
feature_matrix, feature_defs = ft.dfs(
entityset=es,
target_dataframe_name="customers",
max_depth=2
)
print("生成的特征:")
print(feature_matrix.head())
8.2 预测模型集成
简单预测示例
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
# 准备数据
df = pd.DataFrame({
'feature1': np.random.randn(1000),
'feature2': np.random.randn(1000),
'feature3': np.random.randn(1000),
'target': np.random.randn(1000) + 0.5 * np.random.randn(1000)
})
X = df[['feature1', 'feature2', 'feature3']]
y = df['target']
# 划分训练测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练模型
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# 预测
y_pred = model.predict(X_test)
# 评估
mse = mean_squared_error(y_test, y_pred)
print(f"均方误差: {mse:.4f}")
# 特征重要性
feature_importance = pd.DataFrame({
'feature': X.columns,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
print("\n特征重要性:")
print(feature_importance)
第九部分:项目实战与职场应用
9.1 构建自动化分析报告
自动化报告生成
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
def generate_monthly_report(data_path, output_dir):
"""
自动生成月度分析报告
"""
# 1. 数据加载与清洗
df = pd.read_csv(data_path)
df['date'] = pd.to_datetime(df['date'])
df = df[df['date'] >= df['date'].max() - pd.DateOffset(months=1)]
# 2. 计算关键指标
metrics = {
'总销售额': df['sales'].sum(),
'平均订单价值': df['sales'].mean(),
'订单数量': len(df),
'同比增长': ((df['sales'].sum() - df['sales'].iloc[:-30].sum()) / df['sales'].iloc[:-30].sum() * 100)
}
# 3. 生成图表
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 销售趋势
daily_sales = df.groupby('date')['sales'].sum()
axes[0, 0].plot(daily_sales.index, daily_sales.values)
axes[0, 0].set_title('每日销售趋势')
axes[0, 0].tick_params(axis='x', rotation=45)
# 产品分布
product_sales = df.groupby('product')['sales'].sum()
axes[0, 1].pie(product_sales.values, labels=product_sales.index, autopct='%1.1f%%')
axes[0, 1].set_title('产品销售占比')
# 销售员表现
salesperson_sales = df.groupby('salesperson')['sales'].sum().sort_values(ascending=False)
axes[1, 0].bar(salesperson_sales.index, salesperson_sales.values)
axes[1, 0].set_title('销售员业绩')
axes[1, 0].tick_params(axis='x', rotation=45)
# 订单分布
axes[1, 1].hist(df['sales'], bins=20, alpha=0.7)
axes[1, 1].set_title('订单金额分布')
plt.tight_layout()
# 4. 保存报告
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_path = f"{output_dir}/monthly_report_{timestamp}.png"
plt.savefig(report_path, dpi=300, bbox_inches='tight')
plt.close()
# 5. 生成文本报告
report_text = f"""
月度销售分析报告
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
关键指标:
- 总销售额: {metrics['总销售额']:,.2f}
- 平均订单价值: {metrics['平均订单价值']:,.2f}
- 订单数量: {metrics['订单数量']}
- 同比增长: {metrics['同比增长']:.2f}%
可视化图表已保存至: {report_path}
"""
with open(f"{output_dir}/monthly_report_{timestamp}.txt", 'w') as f:
f.write(report_text)
return metrics, report_path
# 使用示例
# metrics, chart_path = generate_monthly_report('sales_data.csv', './reports')
9.2 数据质量监控
数据质量检查框架
def data_quality_check(df, rules):
"""
数据质量检查框架
参数:
df: DataFrame
rules: 质量检查规则字典
"""
quality_report = {}
# 1. 完整性检查
quality_report['completeness'] = {
col: 1 - df[col].isnull().sum() / len(df)
for col in rules.get('required_columns', [])
}
# 2. 唯一性检查
for col in rules.get('unique_columns', []):
quality_report[f'unique_{col}'] = {
'unique_ratio': df[col].nunique() / len(df),
'duplicates': df[col].duplicated().sum()
}
# 3. 有效性检查
for col, condition in rules.get('value_constraints', {}).items():
valid = df[col].apply(condition)
quality_report[f'valid_{col}'] = {
'valid_ratio': valid.mean(),
'invalid_count': (~valid).sum()
}
# 4. 范围检查
for col, (min_val, max_val) in rules.get('range_constraints', {}).items():
in_range = (df[col] >= min_val) & (df[col] <= max_val)
quality_report[f'range_{col}'] = {
'in_range_ratio': in_range.mean(),
'out_of_range': (~in_range).sum()
}
return quality_report
# 使用示例
rules = {
'required_columns': ['sales', 'date', 'product'],
'unique_columns': ['transaction_id'],
'value_constraints': {
'sales': lambda x: x > 0,
'product': lambda x: x.isin(['Laptop', 'Phone', 'Tablet'])
},
'range_constraints': {
'sales': (0, 10000)
}
}
# 质量检查
# quality = data_quality_check(your_df, rules)
# print(quality)
9.3 职场竞争力提升建议
1. 建立个人作品集
- 将完成的项目整理成Jupyter Notebook
- 使用GitHub托管代码
- 撰写项目文档说明业务价值
2. 持续学习
- 关注Pandas、NumPy、Scikit-learn的更新
- 参加Kaggle竞赛
- 阅读数据分析相关博客和论文
3. 业务理解能力
- 学习所在行业的业务知识
- 理解数据背后的业务逻辑
- 能够将数据洞察转化为业务建议
4. 沟通能力
- 学会用数据讲故事
- 制作清晰的可视化图表
- 向非技术人员解释技术概念
结语
通过本课程的学习,你已经掌握了Python数据分析的高级技巧,能够解决实际工作中的复杂问题。记住,数据分析不仅仅是写代码,更重要的是理解业务、发现问题并提供解决方案。
关键要点回顾
- 性能优化:向量化操作、内存优化、分块处理
- 数据清洗:高级缺失值处理、复杂转换、时间序列处理
- 可视化:交互式图表、复杂布局、自定义样式
- 实际应用:销售分析、用户行为分析、A/B测试
- 工具进阶:Polars、PySpark、SQL集成
- 机器学习:特征工程、预测模型
下一步行动
- 在实际工作中应用这些技巧
- 持续挑战更复杂的数据问题
- 建立个人数据分析项目集
- 参与开源项目或Kaggle竞赛
- 考虑获得相关认证(如Pandas认证)
掌握这些高级技能将显著提升你的职场竞争力,让你在数据驱动的决策中发挥关键作用。祝你在数据分析的道路上取得成功!
