引言:为什么需要Python数据分析进阶技能?
在当今数据驱动的职场环境中,掌握Python数据分析已经从一项加分项变成了必备技能。然而,许多初学者在掌握了基础的Pandas和Matplotlib后,往往陷入“工具人”的困境——只会机械地执行代码,却无法从数据中提取真正的商业洞察。本课程将带你突破这一瓶颈,从入门走向精通。
学习目标
- 掌握高级数据处理技巧:处理大规模数据、复杂数据清洗
- 提升数据可视化能力:创建专业级图表,讲述数据故事
- 培养数据思维:从数据中发现业务机会和风险
- 解决真实职场问题:销售分析、用户行为分析、运营效率优化
第一章:Pandas高级数据处理技巧
1.1 高效处理大规模数据集
当数据集超过内存限制时,传统的Pandas操作会变得缓慢甚至崩溃。以下是几种解决方案:
方法一:分块读取(Chunking)
import pandas as pd
# 处理10GB的销售数据文件
chunk_size = 100000 # 每次读取10万行
results = []
for chunk in pd.read_csv('large_sales_data.csv', chunksize=chunk_size):
# 对每个数据块进行处理
chunk['profit_margin'] = (chunk['revenue'] - chunk['cost']) / chunk['revenue']
monthly_stats = chunk.groupby('month').agg({
'revenue': 'sum',
'profit_margin': 'mean'
})
results.append(monthly_stats)
# 合并各块结果:收入按月份累加,利润率按月份取均值
final_result = pd.concat(results).groupby(level=0).agg({'revenue': 'sum', 'profit_margin': 'mean'})
print(f"处理完成!平均利润率: {final_result['profit_margin'].mean():.2%}")
方法二:使用Dask进行并行计算
import dask.dataframe as dd
# Dask会自动并行处理大数据
df = dd.read_csv('large_sales_data.csv')
# 延迟计算,只有在调用compute()时才执行
result = df[df['region'] == '华东'].groupby('product')['revenue'].sum().compute()
print(result)
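Dask 的延迟计算也意味着可以把多个结果放进同一次 compute 里求值,让底层的文件读取只扫描一遍。下面是一个最小示意,沿用上面的 df:
import dask
# 两个聚合一起求值,共享底层的读取与解析
by_product = df.groupby('product')['revenue'].sum()
by_region = df.groupby('region')['revenue'].sum()
product_result, region_result = dask.compute(by_product, by_region)
print(product_result.head(), region_result.head())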
1.2 复杂数据清洗实战
数据分析工作中,相当大一部分时间(常见的说法是60%~80%)都花在处理脏数据上,以下是真实场景案例:
场景:客户信息清洗
import pandas as pd
import numpy as np
import re
# 模拟脏数据
data = {
'customer_id': [1, 2, 3, 4, 5],
'name': ['张三', '李四', '王五', '赵六', '钱七'],
'phone': ['138-1234-5678', '13912345678', '137-1234-567', '13612345678', '135-1234-5678'],
'email': ['zhangsan@email.com', 'lisi@', 'wangwu@email.com', 'zhaoliu@company', 'qianqi@email.com'],
'signup_date': ['2023-01-15', '2023/02/20', '2023-03-10', '2023-04-05', '2023-05-25'],
'company': ['ABC公司', 'DEF公司', 'GHI公司', 'ABC公司', 'JKL公司']
}
df = pd.DataFrame(data)
# 1. 清洗电话号码
def clean_phone(phone):
if pd.isna(phone):
return None
# 移除所有非数字字符
digits = re.sub(r'\D', '', str(phone))
# 验证是否为11位手机号
if len(digits) == 11 and digits.startswith('1'):
return digits
return None
df['phone_clean'] = df['phone'].apply(clean_phone)
# 2. 验证邮箱格式
def validate_email(email):
if pd.isna(email):
return False
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, str(email)))
df['email_valid'] = df['email'].apply(validate_email)
# 3. 统一日期格式
df['signup_date'] = pd.to_datetime(df['signup_date'], errors='coerce')
# 4. 标准化公司名称
df['company_clean'] = df['company'].str.upper().str.replace(r'\s+', '', regex=True)
print("清洗后的数据:")
print(df[['customer_id', 'phone_clean', 'email_valid', 'signup_date', 'company_clean']])
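清洗完成后,通常会根据这些校验列把记录分成“可直接使用”和“需人工复核”两类,便于回溯数据源。以下是沿用上面 df 的示意:
# 汇总所有校验未通过的记录,单独导出复核
problem_mask = df['phone_clean'].isna() | (~df['email_valid']) | df['signup_date'].isna()
clean_df = df[~problem_mask]
review_df = df[problem_mask]
print(f"可直接使用: {len(clean_df)} 条;需人工复核: {len(review_df)} 条")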
1.3 高性能合并操作
职场中经常需要合并多个数据源,以下是优化技巧:
# 创建示例数据
df1 = pd.DataFrame({
'id': range(1, 100001),
'value1': np.random.randint(1, 100, 100000)
})
df2 = pd.DataFrame({
'id': range(50000, 150001),
'value2': np.random.randint(1, 100, 100001)
})
# 基准方法:直接merge(速度取决于数据规模与键的分布)
# result = pd.merge(df1, df2, on='id', how='inner')
# 优化方法1:使用索引合并
df1_indexed = df1.set_index('id')
df2_indexed = df2.set_index('id')
result_fast = df1_indexed.join(df2_indexed, how='inner').reset_index()
# 优化方法2:先用numpy的intersect1d缩小参与合并的数据量(键重叠较少时往往更快)
common_ids = np.intersect1d(df1['id'].values, df2['id'].values)
df1_filtered = df1[df1['id'].isin(common_ids)]
df2_filtered = df2[df2['id'].isin(common_ids)]
result_fastest = pd.merge(df1_filtered, df2_filtered, on='id', how='inner')
print(f"原始数据量: {len(df1)} x {len(df2)}")
print(f"合并后数据量: {len(result_fastest)}")
第二章:高级数据可视化与数据故事讲述
2.1 创建专业级图表
案例:销售仪表板
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
# 设置中文字体(解决中文显示问题)
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# 创建销售数据
np.random.seed(42)
dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
sales_data = pd.DataFrame({
'date': dates,
'revenue': np.random.normal(50000, 15000, len(dates)).cumsum() / 100 + np.random.normal(0, 5000, len(dates)),
'region': np.random.choice(['华东', '华南', '华北', '西南'], len(dates)),
'product': np.random.choice(['产品A', '产品B', '产品C'], len(dates))
})
# 创建复合图表
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('2023年度销售分析仪表板', fontsize=20, fontweight='bold')
# 1. 时间序列趋势
axes[0, 0].plot(sales_data['date'], sales_data['revenue'], color='#2E86AB', linewidth=1.5, alpha=0.8)
axes[0, 0].set_title('每日收入趋势', fontweight='bold')
axes[0, 0].set_ylabel('收入 (元)')
axes[0, 0].grid(True, alpha=0.3)
# 2. 区域分布
region_sum = sales_data.groupby('region')['revenue'].sum()
colors = ['#A23B72', '#F18F01', '#C73E1D', '#3B1F2B']
axes[0, 1].pie(region_sum.values, labels=region_sum.index, autopct='%1.1f%%', colors=colors)
axes[0, 1].set_title('区域收入占比', fontweight='bold')
# 3. 产品对比
product_monthly = sales_data.groupby([sales_data['date'].dt.month, 'product'])['revenue'].sum().unstack()
product_monthly.plot(kind='bar', ax=axes[1, 0], color=['#06A77D', '#FF9F1C', '#2EC4B6'])
axes[1, 0].set_title('月度产品对比', fontweight='bold')
axes[1, 0].set_ylabel('收入 (元)')
axes[1, 0].legend(title='产品')
# 4. 箱线图(分布分析)
sns.boxplot(data=sales_data, x='region', y='revenue', ax=axes[1, 1], palette='Set2')
axes[1, 1].set_title('区域收入分布', fontweight='bold')
axes[1, 1].set_ylabel('收入 (元)')
plt.tight_layout()
plt.show()
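如果仪表板需要放进周报或PPT,可以在 plt.show() 之前把整张图导出为高分辨率图片(文件名为示例):
fig.savefig('sales_dashboard_2023.png', dpi=200, bbox_inches='tight')  # 导出高清图片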
2.2 使用Plotly创建交互式仪表板
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
# 创建交互式销售地图
def create_interactive_dashboard():
# 模拟各城市销售数据
cities = ['上海', '北京', '广州', '深圳', '杭州', '成都', '武汉', '南京']
lat = [31.2304, 39.9042, 23.1291, 22.5431, 30.2741, 30.5728, 30.5428, 32.0603]
lon = [121.4737, 116.4074, 113.2644, 114.0579, 120.1551, 104.0668, 114.3055, 118.7969]
sales = [450000, 380000, 320000, 290000, 250000, 180000, 160000, 140000]
fig = make_subplots(
rows=2, cols=2,
specs=[[{"type": "scattergeo"}, {"type": "bar"}],
[{"type": "scatter"}, {"type": "domain"}]],
subplot_titles=('销售地理分布', 'Top5城市', '趋势分析', '市场份额')
)
# 地图
fig.add_trace(go.Scattergeo(
lon=lon, lat=lat, text=cities,
mode='markers+text', marker=dict(size=[s / 20000 for s in sales], color=sales, colorscale='Viridis'),  # 将销售额缩放为合理的点大小
name='销售额', textposition='top center'
), row=1, col=1)
# 柱状图
top5 = sorted(zip(cities, sales), key=lambda x: x[1], reverse=True)[:5]
fig.add_trace(go.Bar(
x=[x[0] for x in top5], y=[x[1] for x in top5],
marker_color='#2E86AB', name='Top5城市'
), row=1, col=2)
# 趋势线
dates = pd.date_range('2023-01-01', periods=12, freq='M')
trend = np.cumsum(np.random.normal(0, 20000, 12)) + 100000
fig.add_trace(go.Scatter(
x=dates, y=trend, mode='lines+markers',
line=dict(color='#F18F01', width=3), name='月度趋势'
), row=2, col=1)
# 饼图
fig.add_trace(go.Pie(
labels=cities[:4], values=sales[:4],
hole=0.4, name='区域份额'
), row=2, col=2)
fig.update_layout(
height=800, showlegend=False,
title_text="2023年度销售分析仪表板", title_x=0.5
)
fig.show()
create_interactive_dashboard()
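Plotly 图表还可以导出成独立的 HTML 文件,分享给没有 Python 环境的同事;只需在 create_interactive_dashboard 里、fig.show() 之前加一行即可(文件名为示例):
fig.write_html('sales_dashboard.html')  # 生成可用浏览器直接打开的交互页面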
第三章:数据思维与业务洞察
3.1 构建分析框架
RFM客户价值分析模型
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# 模拟电商订单数据
np.random.seed(42)
customer_ids = np.arange(1, 1001)
orders = []
for cid in customer_ids:
    # 每个客户1-10笔订单
    num_orders = np.random.randint(1, 11)
    for i in range(num_orders):
        orders.append({
            'customer_id': cid,
            'order_id': f'{cid}-{i}',
            # 订单日期与金额为随机模拟
            'order_date': datetime(2023, 1, 1) + timedelta(days=int(np.random.randint(0, 365))),
            'amount': round(float(np.random.lognormal(4, 0.6)), 2)
        })
orders_df = pd.DataFrame(orders)

def rfm_analysis(df):
    # 1. 计算RFM指标
    current_date = datetime(2023, 12, 31)
    rfm = df.groupby('customer_id').agg({
'order_date': lambda x: (current_date - x.max()).days, # Recency
'order_id': 'count', # Frequency
'amount': 'sum' # Monetary
}).rename(columns={
'order_date': 'recency',
'order_id': 'frequency',
'amount': 'monetary'
})
# 2. 评分(1-5分)
rfm['R_score'] = pd.qcut(rfm['recency'], 5, labels=[5,4,3,2,1]) # 越小越好
rfm['F_score'] = pd.qcut(rfm['frequency'].rank(method='first'), 5, labels=[1,2,3,4,5])
rfm['M_score'] = pd.qcut(rfm['monetary'], 5, labels=[1,2,3,4,5])
# 3. 客户分层
rfm['RFM_score'] = rfm['R_score'].astype(str) + rfm['F_score'].astype(str) + rfm['M_score'].astype(str)
def segment_customer(row):
score = int(row['RFM_score'])
if score >= 555:
return 'VIP客户'
elif score >= 444:
return '高价值客户'
elif score >= 333:
return '潜力客户'
elif score >= 222:
return '一般客户'
else:
return '流失风险客户'
rfm['segment'] = rfm.apply(segment_customer, axis=1)
print("客户分层结果:")
print(rfm['segment'].value_counts())
    # 4. 各分层画像(均值汇总)
segment_summary = rfm.groupby('segment').agg({
'recency': 'mean',
'frequency': 'mean',
'monetary': 'mean'
}).round(2)
print("\n各分层平均值:")
print(segment_summary)
return rfm
# 执行分析
rfm_result = rfm_analysis(orders_df)
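如果想更直观地展示分层结果,可以在拿到 rfm_result 后画一张简单的条形图(示意,中文显示沿用第二章的 matplotlib 字体设置):
import matplotlib.pyplot as plt

segment_counts = rfm_result['segment'].value_counts()
ax = segment_counts.plot(kind='bar', color='#2E86AB', figsize=(8, 5))
ax.set_title('RFM客户分层人数分布')
ax.set_ylabel('客户数')
plt.tight_layout()
plt.show()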
3.2 漏斗分析:用户转化路径分析
def funnel_analysis():
# 模拟用户行为数据
stages = ['访问', '注册', '浏览商品', '加入购物车', '下单', '支付成功']
users = np.arange(1, 10001)
# 模拟每个用户在各阶段的转化情况
data = []
for user in users:
        # 转化率逐级递减;一旦在某个阶段流失,就不再进入后续阶段
        prob = 0.95
        for stage in stages:
            if np.random.random() >= prob:
                break  # 在该阶段流失
            data.append({
                'user_id': user,
                'stage': stage,
                'timestamp': datetime.now() - timedelta(days=np.random.randint(0, 30))
            })
            prob *= 0.85  # 每个阶段转化率下降
df = pd.DataFrame(data)
# 计算漏斗转化率
funnel_counts = df.groupby('stage').size().reindex(stages)
funnel_conversion = funnel_counts / funnel_counts.shift(1) * 100
print("漏斗分析结果:")
for i, stage in enumerate(stages):
count = funnel_counts[stage]
if i == 0:
print(f"{stage}: {count} (100%)")
else:
conversion = funnel_conversion[stage]
print(f"{stage}: {count} ({conversion:.1f}%)")
# 可视化
fig, ax = plt.subplots(figsize=(12, 8))
# 漏斗图
y_pos = np.arange(len(stages))
widths = funnel_counts.values / funnel_counts.values[0] * 100
bars = ax.barh(y_pos, widths, color='#2E86AB', alpha=0.8)
# 添加数值标签
for i, (stage, count) in enumerate(zip(stages, funnel_counts)):
ax.text(widths[i] + 1, i, f'{count} ({widths[i]:.1f}%)',
va='center', fontweight='bold')
ax.set_yticks(y_pos)
ax.set_yticklabels(stages)
ax.set_xlabel('转化率 (%)')
ax.set_title('用户转化漏斗分析', fontsize=16, fontweight='bold')
ax.grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.show()
funnel_analysis()
第四章:性能优化与最佳实践
4.1 内存优化技巧
import pandas as pd
import numpy as np
def optimize_memory(df):
"""优化DataFrame内存使用"""
    start_mem = df.memory_usage(deep=True).sum() / 1024**2
for col in df.columns:
col_type = df[col].dtype
if col_type != object:
c_min = df[col].min()
c_max = df[col].max()
if str(col_type)[:3] == 'int':
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df[col] = df[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df[col] = df[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df[col] = df[col].astype(np.int32)
elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
df[col] = df[col].astype(np.int64)
else:
if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df[col] = df[col].astype(np.float32)
else:
df[col] = df[col].astype(np.float64)
else:
# 对象类型转换为category如果唯一值较少
num_unique_values = len(df[col].unique())
num_total_values = len(df[col])
if num_unique_values / num_total_values < 0.5:
df[col] = df[col].astype('category')
    end_mem = df.memory_usage(deep=True).sum() / 1024**2
print(f"内存优化: {start_mem:.2f} MB -> {end_mem:.2f} MB ({100*(start_mem-end_mem)/start_mem:.1f}% reduction)")
return df
# 示例
df = pd.DataFrame({
'id': range(1000000),
'category': np.random.choice(['A', 'B', 'C', 'D'], 1000000),
'value': np.random.random(1000000)
})
df_optimized = optimize_memory(df)
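如果数据来自CSV等外部文件,也可以在读取阶段就指定紧凑类型,避免先以默认类型载入再转换(文件名与列名为示例假设):
# 读取时直接指定category与float32,内存峰值更低
df_from_csv = pd.read_csv('data.csv', dtype={'category': 'category', 'value': 'float32'})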
4.2 代码性能分析
import time
import cProfile
import pstats
def performance_analysis():
"""性能分析示例"""
# 创建测试数据
df = pd.DataFrame({
'A': np.random.random(100000),
'B': np.random.random(100000),
'C': np.random.choice(['X', 'Y', 'Z'], 100000)
})
# 方法1:使用apply(较慢)
def method1():
return df.apply(lambda row: row['A'] * 2 if row['B'] > 0.5 else row['A'], axis=1)
# 方法2:使用numpy向量化(快)
def method2():
return np.where(df['B'] > 0.5, df['A'] * 2, df['A'])
# 方法3:使用numba加速
from numba import jit
@jit(nopython=True)
def calculate_fast(a, b):
result = np.empty_like(a)
for i in range(len(a)):
if b[i] > 0.5:
result[i] = a[i] * 2
else:
                result[i] = a[i]
return result
def method3():
return calculate_fast(df['A'].values, df['B'].values)
# 性能测试
methods = [('Apply', method1), ('Numpy', method2), ('Numba', method3)]
for name, func in methods:
start = time.time()
result = func()
end = time.time()
print(f"{name}: {end-start:.4f}秒")
    # 详细性能分析:method1是局部函数,cProfile.run的字符串形式无法找到它,这里改用Profile对象
    print("\n详细性能分析:")
    profiler = cProfile.Profile()
    profiler.enable()
    method1()
    profiler.disable()
    stats = pstats.Stats(profiler)
    stats.sort_stats('cumulative').print_stats(10)
performance_analysis()
第五章:职场实战项目
5.1 销售预测与库存优化
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
def sales_forecast():
"""销售预测与库存优化"""
# 模拟历史销售数据
np.random.seed(42)
dates = pd.date_range('2020-01-01', '2023-12-31', freq='D')
# 创建特征
df = pd.DataFrame({'date': dates})
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['dayofweek'] = df['date'].dt.dayofweek
df['is_weekend'] = df['dayofweek'].isin([5, 6]).astype(int)
# 模拟销售量(有趋势、季节性和随机性)
base = 1000
trend = np.linspace(0, 500, len(df))
seasonal = 200 * np.sin(2 * np.pi * df['month'] / 12)
random = np.random.normal(0, 50, len(df))
df['sales'] = base + trend + seasonal + random
# 添加节假日效应
holidays = [pd.Timestamp('2020-01-25'), pd.Timestamp('2021-02-12'),
pd.Timestamp('2022-02-01'), pd.Timestamp('2023-01-22')]
for holiday in holidays:
mask = (df['date'] >= holiday) & (df['date'] <= holiday + pd.Timedelta(days=3))
df.loc[mask, 'sales'] *= 1.5
# 特征工程
features = ['year', 'month', 'day', 'dayofweek', 'is_weekend']
X = df[features]
y = df['sales']
# 划分训练测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练模型
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# 预测
y_pred = model.predict(X_test)
# 评估
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print(f"预测准确度评估:")
print(f"平均绝对误差: {mae:.2f}")
print(f"均方根误差: {rmse:.2f}")
# 库存优化建议
future_dates = pd.date_range('2024-01-01', '2024-01-31', freq='D')
future_df = pd.DataFrame({'date': future_dates})
future_df['year'] = future_df['date'].dt.year
future_df['month'] = future_df['date'].dt.month
future_df['day'] = future_df['date'].dt.day
future_df['dayofweek'] = future_df['date'].dt.dayofweek
future_df['is_weekend'] = future_df['dayofweek'].isin([5, 6]).astype(int)
future_sales = model.predict(future_df[features])
# 计算安全库存(考虑预测误差)
    safety_stock = rmse * 1.65  # 用预测误差近似需求波动,1.65约对应95%的服务水平
reorder_point = future_sales.mean() + safety_stock
print(f"\n库存优化建议:")
print(f"平均日销量预测: {future_sales.mean():.2f}")
print(f"安全库存水平: {safety_stock:.2f}")
print(f"补货点: {reorder_point:.2f}")
# 可视化
plt.figure(figsize=(14, 6))
plt.plot(df['date'].tail(100), df['sales'].tail(100), label='历史数据', color='gray', alpha=0.7)
plt.plot(future_dates, future_sales, label='预测值', color='#2E86AB', linewidth=2)
plt.axhline(y=reorder_point, color='#F18F01', linestyle='--', label='补货点')
    plt.fill_between(future_dates, future_sales - safety_stock, future_sales + safety_stock,
                     alpha=0.2, color='#2E86AB', label='±1.65×RMSE 误差带')
plt.title('销售预测与库存优化', fontsize=16, fontweight='bold')
plt.xlabel('日期')
plt.ylabel('销售量')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
sales_forecast()
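需要注意,上面为了演示直接用了随机划分训练/测试集;对时间序列预测,更稳妥的做法是按时间切分,用“过去”训练、“未来”验证,避免信息泄漏。下面是一个示意,假设在 sales_forecast 内部替换随机划分,沿用函数里的 df 与 features:
# 按时间切分:2023年之前训练,2023年验证
cutoff = pd.Timestamp('2023-01-01')
train, test = df[df['date'] < cutoff], df[df['date'] >= cutoff]
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(train[features], train['sales'])
print(f"时间切分验证 MAE: {mean_absolute_error(test['sales'], model.predict(test[features])):.2f}")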
5.2 用户流失预警系统
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
def churn_prediction():
"""用户流失预测"""
# 模拟用户行为数据
np.random.seed(42)
n_users = 5000
user_data = pd.DataFrame({
'user_id': range(1, n_users + 1),
'tenure': np.random.randint(1, 36, n_users), # 在网时长(月)
'monthly_usage': np.random.normal(500, 200, n_users), # 月度使用量
'support_calls': np.random.poisson(2, n_users), # 客服电话次数
'payment_delay': np.random.choice([0, 1], n_users, p=[0.8, 0.2]), # 延迟支付
'age': np.random.randint(18, 70, n_users)
})
    # 模拟流失标签(基于规则):
    # 在网时长<6个月、使用量低、客服电话多、延迟支付,命中的风险因素越多,流失概率越高
    # (若四个条件取严格的"与",正样本会少到模型几乎学不到东西)
    risk_score = (
        (user_data['tenure'] < 6).astype(int) +
        (user_data['monthly_usage'] < 300).astype(int) +
        (user_data['support_calls'] >= 3).astype(int) +
        (user_data['payment_delay'] == 1).astype(int)
    )
    user_data['churn'] = (np.random.random(n_users) < 0.05 + 0.18 * risk_score).astype(int)
print("流失用户分布:")
print(user_data['churn'].value_counts(normalize=True))
# 特征和标签
features = ['tenure', 'monthly_usage', 'support_calls', 'payment_delay', 'age']
X = user_data[features]
y = user_data['churn']
# 划分数据集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
# 训练模型
model = LogisticRegression(random_state=42, max_iter=1000)
model.fit(X_train, y_train)
# 预测
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]
# 评估
print("\n模型评估报告:")
print(classification_report(y_test, y_pred))
# 混淆矩阵
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=['未流失', '流失'], yticklabels=['未流失', '流失'])
plt.title('混淆矩阵', fontsize=14, fontweight='bold')
plt.ylabel('真实值')
plt.xlabel('预测值')
plt.show()
# 特征重要性
feature_importance = pd.DataFrame({
'feature': features,
'coefficient': model.coef_[0]
}).sort_values('coefficient', key=abs, ascending=False)
print("\n特征重要性(影响流失的因素):")
print(feature_importance)
# 预测新用户
new_users = pd.DataFrame({
'tenure': [2, 12, 5],
'monthly_usage': [150, 600, 200],
'support_calls': [5, 1, 4],
'payment_delay': [1, 0, 1],
'age': [25, 45, 30]
})
new_pred = model.predict_proba(new_users)[:, 1]
print("\n新用户流失概率预测:")
for i, prob in enumerate(new_pred):
risk = "高风险" if prob > 0.7 else "中风险" if prob > 0.3 else "低风险"
print(f"用户{i+1}: {prob:.1%} ({risk})")
churn_prediction()
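另外,逻辑回归的系数只有在特征量纲一致时才适合直接当作“重要性”比较;常见做法是先标准化再看系数。下面是一个示意,假设放在 churn_prediction 内部、沿用其中的 X_train、y_train 与 features:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# 标准化后再训练,系数才能在特征之间横向比较
scaled_model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000, random_state=42))
scaled_model.fit(X_train, y_train)
print(dict(zip(features, scaled_model.named_steps['logisticregression'].coef_[0].round(3))))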
第六章:数据工程与自动化
6.1 自动化数据管道
import schedule
import time
import logging
import os
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('data_pipeline.log'),
logging.StreamHandler()
]
)
class DataPipeline:
"""自动化数据管道"""
def __init__(self, source_path, dest_path):
self.source_path = source_path
self.dest_path = dest_path
self.logger = logging.getLogger(__name__)
def extract(self):
"""数据抽取"""
self.logger.info(f"开始抽取数据: {self.source_path}")
try:
# 模拟从API或数据库抽取
df = pd.DataFrame({
'timestamp': [datetime.now()],
'data': [np.random.random(10).tolist()]
})
self.logger.info(f"成功抽取 {len(df)} 条记录")
return df
except Exception as e:
self.logger.error(f"抽取失败: {e}")
return None
def transform(self, df):
"""数据转换"""
if df is None:
return None
self.logger.info("开始数据转换")
try:
# 数据清洗和转换
df['processed_at'] = datetime.now()
df['data_mean'] = df['data'].apply(lambda x: np.mean(x))
df['data_std'] = df['data'].apply(lambda x: np.std(x))
self.logger.info("数据转换完成")
return df
except Exception as e:
self.logger.error(f"转换失败: {e}")
return None
def load(self, df):
"""数据加载"""
if df is None:
return False
self.logger.info(f"开始加载数据到: {self.dest_path}")
try:
# 追加模式保存
            df.to_csv(self.dest_path, mode='a', header=not os.path.exists(self.dest_path), index=False)
self.logger.info("数据加载成功")
return True
except Exception as e:
self.logger.error(f"加载失败: {e}")
return False
def run(self):
"""执行完整管道"""
self.logger.info("=" * 50)
self.logger.info("管道开始执行")
# ETL流程
data = self.extract()
transformed_data = self.transform(data)
success = self.load(transformed_data)
if success:
self.logger.info("管道执行成功")
else:
self.logger.error("管道执行失败")
return success
# 创建管道实例
pipeline = DataPipeline('source.csv', 'processed_data.csv')
# 定时任务(每天凌晨2点执行)
schedule.every().day.at("02:00").do(pipeline.run)
# 测试运行
if __name__ == "__main__":
# 立即运行一次测试
pipeline.run()
# 后台运行定时任务
print("定时任务已启动,按 Ctrl+C 退出")
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次
6.2 数据质量监控
import pandas as pd
import numpy as np
from datetime import datetime
class DataQualityMonitor:
"""数据质量监控"""
def __init__(self, df):
self.df = df
self.report = {}
def check_completeness(self):
"""完整性检查"""
self.report['completeness'] = {
'total_rows': len(self.df),
'missing_values': self.df.isnull().sum().to_dict(),
'missing_rate': (self.df.isnull().sum() / len(self.df)).to_dict(),
'is_complete': self.df.isnull().sum().sum() == 0
}
def check_accuracy(self):
"""准确性检查"""
# 检查数值范围
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
accuracy_report = {}
for col in numeric_cols:
if col in ['age']:
# 年龄应该在合理范围内
invalid_count = ((self.df[col] < 0) | (self.df[col] > 120)).sum()
accuracy_report[col] = {
'invalid_count': invalid_count,
'invalid_rate': invalid_count / len(self.df),
'is_valid': invalid_count == 0
}
self.report['accuracy'] = accuracy_report
def check_consistency(self):
"""一致性检查"""
# 检查重复记录
duplicates = self.df.duplicated().sum()
self.report['consistency'] = {
'duplicates': duplicates,
'duplicate_rate': duplicates / len(self.df),
'is_consistent': duplicates == 0
}
def check_timeliness(self):
"""时效性检查"""
if 'timestamp' in self.df.columns:
max_time = self.df['timestamp'].max()
min_time = self.df['timestamp'].min()
self.report['timeliness'] = {
'data_age_hours': (datetime.now() - max_time).total_seconds() / 3600,
'time_range_hours': (max_time - min_time).total_seconds() / 3600,
'is_fresh': (datetime.now() - max_time).total_seconds() < 86400 # 24小时内
}
def generate_report(self):
"""生成完整报告"""
self.check_completeness()
self.check_accuracy()
self.check_consistency()
self.check_timeliness()
# 综合评分
total_checks = 0
passed_checks = 0
for category, details in self.report.items():
if isinstance(details, dict):
for key, value in details.items():
if isinstance(value, dict) and 'is_valid' in value:
total_checks += 1
if value['is_valid']:
passed_checks += 1
elif isinstance(value, bool) and key.startswith('is_'):
total_checks += 1
if value:
passed_checks += 1
quality_score = (passed_checks / total_checks * 100) if total_checks > 0 else 0
return {
'quality_score': quality_score,
'report': self.report,
'passed_checks': passed_checks,
'total_checks': total_checks
}
# 使用示例
sample_data = pd.DataFrame({
'user_id': range(1, 1001),
'age': np.random.randint(18, 70, 1000),
'score': np.random.normal(75, 15, 1000),
'timestamp': pd.date_range('2023-01-01', periods=1000, freq='H')
})
# 故意添加一些质量问题
sample_data.loc[0:5, 'age'] = -1 # 无效年龄
sample_data.loc[10:15, 'score'] = np.nan # 缺失值
sample_data = pd.concat([sample_data, sample_data.iloc[:5]]) # 重复记录
# 监控
monitor = DataQualityMonitor(sample_data)
quality_report = monitor.generate_report()
print("数据质量报告:")
print(f"综合评分: {quality_report['quality_score']:.1f}%")
print(f"通过检查: {quality_report['passed_checks']}/{quality_report['total_checks']}")
print("\n详细报告:")
import json
print(json.dumps(quality_report['report'], indent=2, default=str))
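在实际的数据管道里,这个质量评分通常会作为“闸门”:低于阈值就让管道报错停下,而不是把有问题的数据继续送往下游(阈值80分为示例假设,可按业务要求调整):
QUALITY_THRESHOLD = 80  # 示例阈值
if quality_report['quality_score'] < QUALITY_THRESHOLD:
    raise ValueError(f"数据质量评分 {quality_report['quality_score']:.1f} 低于阈值 {QUALITY_THRESHOLD},中止本次处理")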
第七章:综合实战案例
7.1 完整的电商数据分析项目
# 由于代码较长,这里展示关键部分
def ecommerce_analysis_project():
"""电商数据分析完整项目"""
# 1. 数据加载与探索
print("=== 阶段1: 数据加载与探索 ===")
# 模拟电商数据
np.random.seed(42)
# 用户数据
users = pd.DataFrame({
'user_id': range(1, 1001),
'signup_date': pd.date_range('2022-01-01', periods=1000, freq='D'),
'age': np.random.randint(18, 65, 1000),
'gender': np.random.choice(['M', 'F'], 1000, p=[0.55, 0.45]),
'city': np.random.choice(['北京', '上海', '广州', '深圳', '杭州'], 1000)
})
# 订单数据
orders = []
for user_id in range(1, 1001):
num_orders = np.random.randint(0, 15)
for _ in range(num_orders):
orders.append({
'order_id': len(orders) + 1,
'user_id': user_id,
'order_date': users.loc[users['user_id'] == user_id, 'signup_date'].iloc[0] +
pd.Timedelta(days=np.random.randint(0, 365)),
'amount': np.random.lognormal(3, 0.5),
'category': np.random.choice(['电子', '服装', '家居', '美妆'], p=[0.3, 0.25, 0.25, 0.2])
})
orders_df = pd.DataFrame(orders)
# 2. 数据清洗
print("\n=== 阶段2: 数据清洗 ===")
# 处理异常值
orders_df = orders_df[orders_df['amount'] > 0]
orders_df['amount'] = np.clip(orders_df['amount'], 10, 5000) # 限制在合理范围
# 3. 分析计算
print("\n=== 阶段3: 核心指标计算 ===")
# 用户生命周期价值
user_ltv = orders_df.groupby('user_id').agg({
'amount': ['sum', 'mean', 'count'],
'order_date': ['min', 'max']
}).round(2)
user_ltv.columns = ['总消费', '平均订单额', '订单数', '首次购买', '最后购买']
user_ltv['生命周期(天)'] = (user_ltv['最后购买'] - user_ltv['首次购买']).dt.days
# 4. 用户分层
print("\n=== 阶段4: 用户分层 ===")
# 使用RFM模型
current_date = orders_df['order_date'].max()
rfm = orders_df.groupby('user_id').agg({
'order_date': lambda x: (current_date - x.max()).days,
'order_id': 'count',
'amount': 'sum'
})
rfm.columns = ['R', 'F', 'M']
# 评分
rfm['R_score'] = pd.qcut(rfm['R'], 5, labels=[5,4,3,2,1])
rfm['F_score'] = pd.qcut(rfm['F'].rank(method='first'), 5, labels=[1,2,3,4,5])
rfm['M_score'] = pd.qcut(rfm['M'], 5, labels=[1,2,3,4,5])
# 分层
def segment(row):
score = int(row['R_score']) + int(row['F_score']) + int(row['M_score'])
if score >= 12: return 'VIP'
elif score >= 9: return '高价值'
elif score >= 6: return '潜力'
else: return '一般'
rfm['segment'] = rfm.apply(segment, axis=1)
# 5. 业务洞察
print("\n=== 阶段5: 业务洞察 ===")
# 各城市销售对比
city_sales = orders_df.merge(users[['user_id', 'city']], on='user_id').groupby('city')['amount'].sum()
print("各城市销售额:")
print(city_sales)
# 品类分析
category_growth = orders_df.groupby([orders_df['order_date'].dt.to_period('M'), 'category'])['amount'].sum().unstack()
print("\n月度品类趋势:")
print(category_growth.tail())
# 6. 可视化
print("\n=== 阶段6: 可视化 ===")
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
# RFM分布
rfm_score = rfm['R_score'].astype(str) + rfm['F_score'].astype(str) + rfm['M_score'].astype(str)
rfm_score.value_counts().head(10).plot(kind='bar', ax=axes[0,0], color='#2E86AB')
axes[0,0].set_title('Top10 RFM组合分布')
# 用户分层
rfm['segment'].value_counts().plot(kind='pie', ax=axes[0,1], autopct='%1.1f%%')
axes[0,1].set_title('用户分层占比')
# 城市销售
city_sales.plot(kind='bar', ax=axes[1,0], color='#F18F01')
axes[1,0].set_title('各城市销售额')
# 品类趋势
category_growth.plot(kind='line', ax=axes[1,1])
axes[1,1].set_title('品类月度趋势')
axes[1,1].legend(title='品类')
plt.tight_layout()
plt.show()
# 7. 输出报告
print("\n=== 阶段7: 分析报告 ===")
total_revenue = orders_df['amount'].sum()
total_users = orders_df['user_id'].nunique()
avg_order_value = orders_df['amount'].mean()
print(f"""
电商数据分析总结报告
====================
核心指标:
- 总销售额: ¥{total_revenue:,.2f}
- 活跃用户数: {total_users:,}
- 平均订单价值: ¥{avg_order_value:.2f}
关键发现:
1. VIP用户占比: {len(rfm[rfm['segment']=='VIP'])/len(rfm)*100:.1f}%
2. 最高销售额城市: {city_sales.idxmax()}
3. 最受欢迎品类: {orders_df['category'].mode().iloc[0]}
行动建议:
- 针对VIP用户设计专属权益
- 加强在{city_sales.idxmax()}市场的推广
- 优化{orders_df['category'].mode().iloc[0]}品类的供应链
""")
# 执行项目
ecommerce_analysis_project()
第八章:持续学习与职业发展
8.1 建立个人数据分析作品集
def create_portfolio_project():
"""创建数据分析作品集项目模板"""
project_template = {
"项目名称": "用户行为分析与精准营销",
"项目描述": "通过分析用户行为数据,构建用户画像,实现精准营销",
"技术栈": ["Python", "Pandas", "Scikit-learn", "Plotly"],
"数据源": "模拟电商用户行为数据",
"关键步骤": [
"数据清洗与预处理",
"用户行为模式分析",
"用户分群(K-Means聚类)",
"购买预测模型",
"营销策略建议"
],
"预期成果": [
"用户分群报告",
"购买预测模型(准确率>85%)",
"营销策略建议书",
"交互式可视化仪表板"
],
"代码仓库": "https://github.com/yourusername/project-name",
"展示方式": "Jupyter Notebook + Streamlit应用"
}
print("数据分析作品集项目模板:")
print(json.dumps(project_template, indent=2, ensure_ascii=False))
return project_template
# 生成学习路径
def learning_roadmap():
"""数据分析学习路径"""
roadmap = {
"阶段1: 基础夯实 (1-2个月)": [
"Python基础语法",
"Pandas数据处理",
"Matplotlib/Seaborn可视化",
"SQL基础查询"
],
"阶段2: 进阶提升 (2-3个月)": [
"高级Pandas技巧",
"数据清洗最佳实践",
"统计分析基础",
"数据可视化进阶"
],
"阶段3: 机器学习入门 (3-4个月)": [
"Scikit-learn基础",
"回归与分类模型",
"模型评估与调优",
"特征工程"
],
"阶段4: 业务应用 (持续)": [
"行业业务理解",
"分析框架构建",
"数据故事讲述",
"商业洞察提炼"
],
"阶段5: 工程能力 (持续)": [
"数据管道构建",
"自动化脚本",
"性能优化",
"代码规范"
]
}
print("\n数据分析学习路线图:")
for phase, skills in roadmap.items():
print(f"\n{phase}:")
for skill in skills:
print(f" - {skill}")
return roadmap
# 执行
portfolio = create_portfolio_project()
roadmap = learning_roadmap()
总结
通过本课程的系统学习,你将掌握从基础数据处理到高级分析建模的完整技能链。关键要点:
- 技术层面:精通Pandas高级操作、高性能计算、复杂可视化
- 业务层面:建立数据思维,掌握RFM、漏斗分析等经典模型
- 工程层面:构建自动化数据管道,实现数据质量监控
- 职业层面:打造个人作品集,形成持续学习能力
记住,数据分析的核心价值不在于工具的使用,而在于从数据中发现业务洞察并推动决策的能力。持续实践,不断挑战复杂项目,你将成为职场中不可或缺的数据专家。
下一步行动建议:
- 选择1-2个本课程中的案例进行实战演练
- 将学到的技巧应用到当前工作中
- 在GitHub上建立个人数据分析项目仓库
- 参与Kaggle竞赛或开源数据分析项目
- 定期复盘和总结自己的分析方法论
祝你在数据分析的道路上越走越远,成为真正的数据驱动决策专家!
