Introduction: Why Do You Need Advanced Python Data Analysis Skills?

In today's data-driven workplace, Python data analysis has shifted from a nice-to-have into a must-have skill. Yet many beginners, once they have learned basic Pandas and Matplotlib, get stuck as mere "tool operators": they can execute code mechanically but cannot extract real business insight from data. This course helps you break through that bottleneck and move from beginner to proficient.

Learning Objectives

  • Master advanced data processing: large datasets, complex data cleaning
  • Level up data visualization: build professional charts that tell a data story
  • Develop data thinking: spot business opportunities and risks in the data
  • Solve real workplace problems: sales analysis, user behavior analysis, operational efficiency

Chapter 1: Advanced Pandas Data Processing

1.1 Processing Large Datasets Efficiently

When a dataset exceeds available memory, ordinary Pandas operations slow to a crawl or crash outright. Here are several ways to cope:

Method 1: Chunked Reading

import pandas as pd

# Process a ~10 GB sales file in manageable pieces
chunk_size = 100000  # read 100,000 rows at a time
results = []

for chunk in pd.read_csv('large_sales_data.csv', chunksize=chunk_size):
    # Process each chunk independently
    chunk['profit_margin'] = (chunk['revenue'] - chunk['cost']) / chunk['revenue']
    monthly_stats = chunk.groupby('month').agg({
        'revenue': 'sum',
        'profit_margin': 'mean'
    })
    results.append(monthly_stats)

# Combine the per-chunk results: revenue totals are summed;
# averaging the per-chunk margins is an approximation (see below)
final_result = pd.concat(results).groupby(level=0).agg({
    'revenue': 'sum',
    'profit_margin': 'mean'
})
print(f"Done! Average profit margin: {final_result['profit_margin'].mean():.2%}")

Method 2: Parallel Computing with Dask

import dask.dataframe as dd

# Dask parallelizes the work across partitions automatically
df = dd.read_csv('large_sales_data.csv')

# Lazy evaluation: nothing runs until compute() is called
result = df[df['region'] == '华东'].groupby('product')['revenue'].sum().compute()
print(result)
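
You can also control how Dask splits the file and watch a long computation progress. blocksize and the diagnostics ProgressBar are standard Dask features; the file name is carried over from above:

import dask.dataframe as dd
from dask.diagnostics import ProgressBar

# Smaller partitions mean more parallelism but also more scheduling overhead
df = dd.read_csv('large_sales_data.csv', blocksize='128MB')
print(f"Partitions: {df.npartitions}")

# Show a progress bar while the lazy task graph executes
with ProgressBar():
    totals = df.groupby('region')['revenue'].sum().compute()
print(totals)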

1.2 Complex Data Cleaning in Practice

Cleaning dirty data takes up a large share of day-to-day analysis work (the figure often quoted is 80% of an analyst's time). Here is a realistic scenario:

Scenario: Cleaning Customer Records

import pandas as pd
import numpy as np
import re

# Simulated dirty data
data = {
    'customer_id': [1, 2, 3, 4, 5],
    'name': ['张三', '李四', '王五', '赵六', '钱七'],
    'phone': ['138-1234-5678', '13912345678', '137-1234-567', '13612345678', '135-1234-5678'],
    'email': ['zhangsan@email.com', 'lisi@', 'wangwu@email.com', 'zhaoliu@company', 'qianqi@email.com'],
    'signup_date': ['2023-01-15', '2023/02/20', '2023-03-10', '2023-04-05', '2023-05-25'],
    'company': ['ABC公司', 'DEF公司', 'GHI公司', 'ABC公司', 'JKL公司']
}

df = pd.DataFrame(data)

# 1. Clean phone numbers
def clean_phone(phone):
    if pd.isna(phone):
        return None
    # Strip every non-digit character
    digits = re.sub(r'\D', '', str(phone))
    # Accept only 11-digit mobile numbers
    if len(digits) == 11 and digits.startswith('1'):
        return digits
    return None

df['phone_clean'] = df['phone'].apply(clean_phone)

# 2. Validate email format
def validate_email(email):
    if pd.isna(email):
        return False
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, str(email)))

df['email_valid'] = df['email'].apply(validate_email)

# 3. Normalize dates: unify separators first so the mixed formats
#    parse consistently; anything unparseable becomes NaT
df['signup_date'] = pd.to_datetime(
    df['signup_date'].str.replace('/', '-', regex=False), errors='coerce'
)

# 4. Standardize company names
df['company_clean'] = df['company'].str.upper().str.replace(r'\s+', '', regex=True)

print("Cleaned data:")
print(df[['customer_id', 'phone_clean', 'email_valid', 'signup_date', 'company_clean']])

1.3 High-Performance Merge Operations

Merging multiple data sources is routine on the job. Here are some optimization techniques:

import pandas as pd
import numpy as np

# Build sample data
df1 = pd.DataFrame({
    'id': range(1, 100001),
    'value1': np.random.randint(1, 100, 100000)
})

df2 = pd.DataFrame({
    'id': range(50000, 150001),
    'value2': np.random.randint(1, 100, 100001)
})

# Baseline: a plain merge
# result = pd.merge(df1, df2, on='id', how='inner')

# Optimization 1: join on a shared index
df1_indexed = df1.set_index('id')
df2_indexed = df2.set_index('id')
result_fast = df1_indexed.join(df2_indexed, how='inner').reset_index()

# Optimization 2: pre-filter with numpy's intersect1d, then merge
# (often the fastest of the three, but measure on your own data)
common_ids = np.intersect1d(df1['id'].values, df2['id'].values)
df1_filtered = df1[df1['id'].isin(common_ids)]
df2_filtered = df2[df2['id'].isin(common_ids)]
result_fastest = pd.merge(df1_filtered, df2_filtered, on='id', how='inner')

print(f"Input sizes: {len(df1)} x {len(df2)}")
print(f"Merged rows: {len(result_fastest)}")

Chapter 2: Advanced Data Visualization and Data Storytelling

2.1 Creating Professional-Grade Charts

Case Study: A Sales Dashboard

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# Configure fonts that can render the Chinese labels in the data
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# Simulate a year of sales data
np.random.seed(42)
dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
sales_data = pd.DataFrame({
    'date': dates,
    'revenue': np.random.normal(50000, 15000, len(dates)).cumsum() / 100 + np.random.normal(0, 5000, len(dates)),
    'region': np.random.choice(['华东', '华南', '华北', '西南'], len(dates)),
    'product': np.random.choice(['产品A', '产品B', '产品C'], len(dates))
})

# Build a 2x2 dashboard
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('2023 Sales Analysis Dashboard', fontsize=20, fontweight='bold')

# 1. Time-series trend
axes[0, 0].plot(sales_data['date'], sales_data['revenue'], color='#2E86AB', linewidth=1.5, alpha=0.8)
axes[0, 0].set_title('Daily Revenue Trend', fontweight='bold')
axes[0, 0].set_ylabel('Revenue (CNY)')
axes[0, 0].grid(True, alpha=0.3)

# 2. Regional share
region_sum = sales_data.groupby('region')['revenue'].sum()
colors = ['#A23B72', '#F18F01', '#C73E1D', '#3B1F2B']
axes[0, 1].pie(region_sum.values, labels=region_sum.index, autopct='%1.1f%%', colors=colors)
axes[0, 1].set_title('Revenue Share by Region', fontweight='bold')

# 3. Product comparison by month
product_monthly = sales_data.groupby([sales_data['date'].dt.month, 'product'])['revenue'].sum().unstack()
product_monthly.plot(kind='bar', ax=axes[1, 0], color=['#06A77D', '#FF9F1C', '#2EC4B6'])
axes[1, 0].set_title('Monthly Product Comparison', fontweight='bold')
axes[1, 0].set_ylabel('Revenue (CNY)')
axes[1, 0].legend(title='Product')

# 4. Box plot (distribution by region)
sns.boxplot(data=sales_data, x='region', y='revenue', ax=axes[1, 1], palette='Set2')
axes[1, 1].set_title('Revenue Distribution by Region', fontweight='bold')
axes[1, 1].set_ylabel('Revenue (CNY)')

plt.tight_layout()
plt.show()

2.2 Interactive Dashboards with Plotly

import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Build an interactive sales dashboard
def create_interactive_dashboard():
    # Simulated per-city sales
    cities = ['上海', '北京', '广州', '深圳', '杭州', '成都', '武汉', '南京']
    lat = [31.2304, 39.9042, 23.1291, 22.5431, 30.2741, 30.5728, 30.5428, 32.0603]
    lon = [121.4737, 116.4074, 113.2644, 114.0579, 120.1551, 104.0668, 114.3055, 118.7969]
    sales = [450000, 380000, 320000, 290000, 250000, 180000, 160000, 140000]
    
    fig = make_subplots(
        rows=2, cols=2,
        specs=[[{"type": "scattergeo"}, {"type": "bar"}],
               [{"type": "scatter"}, {"type": "domain"}]],
        subplot_titles=('Geographic Distribution', 'Top 5 Cities', 'Trend', 'Market Share')
    )
    
    # Map: scale marker sizes down to pixels (raw sales values would
    # produce markers hundreds of thousands of pixels wide)
    fig.add_trace(go.Scattergeo(
        lon=lon, lat=lat, text=cities,
        mode='markers+text',
        marker=dict(size=[s / 10000 for s in sales], color=sales, colorscale='Viridis'),
        name='Sales', textposition='top center'
    ), row=1, col=1)
    
    # Bar chart of the five largest cities
    top5 = sorted(zip(cities, sales), key=lambda x: x[1], reverse=True)[:5]
    fig.add_trace(go.Bar(
        x=[x[0] for x in top5], y=[x[1] for x in top5],
        marker_color='#2E86AB', name='Top 5 cities'
    ), row=1, col=2)
    
    # Monthly trend line
    dates = pd.date_range('2023-01-01', periods=12, freq='M')
    trend = np.cumsum(np.random.normal(0, 20000, 12)) + 100000
    fig.add_trace(go.Scatter(
        x=dates, y=trend, mode='lines+markers',
        line=dict(color='#F18F01', width=3), name='Monthly trend'
    ), row=2, col=1)
    
    # Donut chart of the four largest markets
    fig.add_trace(go.Pie(
        labels=cities[:4], values=sales[:4],
        hole=0.4, name='Regional share'
    ), row=2, col=2)
    
    fig.update_layout(
        height=800, showlegend=False,
        title_text="2023 Sales Analysis Dashboard", title_x=0.5
    )
    
    fig.show()
    return fig

dashboard_fig = create_interactive_dashboard()
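
An interactive dashboard is most valuable when colleagues can open it without running Python. Plotly figures export to a standalone HTML file via write_html; the file name here is illustrative:

# A self-contained HTML file anyone can open in a browser;
# include_plotlyjs='cdn' keeps the file small by loading plotly.js from a CDN
dashboard_fig.write_html('sales_dashboard.html', include_plotlyjs='cdn')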

Chapter 3: Data Thinking and Business Insight

3.1 Building an Analysis Framework

The RFM Customer Value Model

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Simulate e-commerce order data
np.random.seed(42)
customer_ids = np.arange(1, 1001)
orders = []

for cid in customer_ids:
    # 1-10 orders per customer
    num_orders = np.random.randint(1, 11)
    for _ in range(num_orders):
        orders.append({
            'customer_id': cid,
            'order_id': len(orders) + 1,
            'order_date': datetime(2023, 1, 1) + timedelta(days=int(np.random.randint(0, 365))),
            'amount': float(np.random.lognormal(4, 0.8))
        })

orders_df = pd.DataFrame(orders)


def rfm_analysis(df):
    # 1. Compute the RFM metrics
    current_date = datetime(2023, 12, 31)
    rfm = df.groupby('customer_id').agg({
        'order_date': lambda x: (current_date - x.max()).days,  # Recency
        'order_id': 'count',  # Frequency
        'amount': 'sum'  # Monetary
    }).rename(columns={
        'order_date': 'recency',
        'order_id': 'frequency',
        'amount': 'monetary'
    })
    
    # 2. Score each metric from 1 to 5
    rfm['R_score'] = pd.qcut(rfm['recency'], 5, labels=[5, 4, 3, 2, 1])  # lower recency is better
    rfm['F_score'] = pd.qcut(rfm['frequency'].rank(method='first'), 5, labels=[1, 2, 3, 4, 5])
    rfm['M_score'] = pd.qcut(rfm['monetary'], 5, labels=[1, 2, 3, 4, 5])
    
    # 3. Segment customers. Compare the three scores as a sum: treating the
    #    concatenated "RFM" string as one integer would misrank, e.g. 541 vs 455.
    rfm['RFM_score'] = rfm['R_score'].astype(str) + rfm['F_score'].astype(str) + rfm['M_score'].astype(str)
    
    def segment_customer(row):
        score = int(row['R_score']) + int(row['F_score']) + int(row['M_score'])
        if score >= 15:
            return 'VIP'
        elif score >= 12:
            return 'High value'
        elif score >= 9:
            return 'Potential'
        elif score >= 6:
            return 'Average'
        else:
            return 'Churn risk'
    
    rfm['segment'] = rfm.apply(segment_customer, axis=1)
    
    print("Customer segments:")
    print(rfm['segment'].value_counts())
    
    # 4. Per-segment averages
    segment_summary = rfm.groupby('segment').agg({
        'recency': 'mean',
        'frequency': 'mean',
        'monetary': 'mean'
    }).round(2)
    
    print("\nSegment averages:")
    print(segment_summary)
    
    return rfm

# Run the analysis
rfm_result = rfm_analysis(orders_df)

3.2 Funnel Analysis: User Conversion Paths

def funnel_analysis():
    # Simulated user-behavior events
    stages = ['访问', '注册', '浏览商品', '加入购物车', '下单', '支付成功']
    users = np.arange(1, 10001)
    
    # Simulate whether each user reaches each stage
    data = []
    for user in users:
        prob = 0.95
        for stage in stages:
            if np.random.random() < prob:
                data.append({
                    'user_id': user,
                    'stage': stage,
                    'timestamp': datetime.now() - timedelta(days=np.random.randint(0, 30))
                })
                prob *= 0.85  # conversion decays at each subsequent stage
            else:
                break  # once a user drops out, they skip every later stage
    
    df = pd.DataFrame(data)
    
    # Stage counts and step-to-step conversion rates
    funnel_counts = df.groupby('stage').size().reindex(stages)
    funnel_conversion = funnel_counts / funnel_counts.shift(1) * 100
    
    print("Funnel results:")
    for i, stage in enumerate(stages):
        count = funnel_counts[stage]
        if i == 0:
            print(f"{stage}: {count} (100%)")
        else:
            conversion = funnel_conversion[stage]
            print(f"{stage}: {count} ({conversion:.1f}%)")
    
    # Visualization
    fig, ax = plt.subplots(figsize=(12, 8))
    
    # Horizontal funnel bars, widths normalized to the first stage
    y_pos = np.arange(len(stages))
    widths = funnel_counts.values / funnel_counts.values[0] * 100
    
    ax.barh(y_pos, widths, color='#2E86AB', alpha=0.8)
    
    # Value labels
    for i, (stage, count) in enumerate(zip(stages, funnel_counts)):
        ax.text(widths[i] + 1, i, f'{count} ({widths[i]:.1f}%)', 
                va='center', fontweight='bold')
    
    ax.set_yticks(y_pos)
    ax.set_yticklabels(stages)
    ax.set_xlabel('Conversion rate (%)')
    ax.set_title('User Conversion Funnel', fontsize=16, fontweight='bold')
    ax.grid(True, alpha=0.3, axis='x')
    
    plt.tight_layout()
    plt.show()

funnel_analysis()

Chapter 4: Performance Optimization and Best Practices

4.1 Memory Optimization Techniques

import pandas as pd
import numpy as np

def optimize_memory(df):
    """Shrink a DataFrame's memory footprint by downcasting dtypes."""
    start_mem = df.memory_usage(deep=True).sum() / 1024**2
    
    for col in df.columns:
        col_type = df[col].dtype
        
        if pd.api.types.is_integer_dtype(col_type):
            c_min = df[col].min()
            c_max = df[col].max()
            if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                df[col] = df[col].astype(np.int8)
            elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                df[col] = df[col].astype(np.int16)
            elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                df[col] = df[col].astype(np.int32)
            # otherwise leave the column as int64
        elif pd.api.types.is_float_dtype(col_type):
            c_min = df[col].min()
            c_max = df[col].max()
            if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                df[col] = df[col].astype(np.float32)
        elif col_type == object:
            # Convert object columns to category when unique values are few
            num_unique_values = df[col].nunique()
            num_total_values = len(df[col])
            if num_unique_values / num_total_values < 0.5:
                df[col] = df[col].astype('category')
    
    end_mem = df.memory_usage(deep=True).sum() / 1024**2
    print(f"Memory: {start_mem:.2f} MB -> {end_mem:.2f} MB "
          f"({100 * (start_mem - end_mem) / start_mem:.1f}% reduction)")
    return df

# Example
df = pd.DataFrame({
    'id': range(1000000),
    'category': np.random.choice(['A', 'B', 'C', 'D'], 1000000),
    'value': np.random.random(1000000)
})

df_optimized = optimize_memory(df)

4.2 Profiling Code Performance

import time
import cProfile
import pstats

def performance_analysis():
    """Compare three implementations of the same row-wise computation."""
    
    # Test data
    df = pd.DataFrame({
        'A': np.random.random(100000),
        'B': np.random.random(100000),
        'C': np.random.choice(['X', 'Y', 'Z'], 100000)
    })
    
    # Method 1: apply over rows (slow)
    def method1():
        return df.apply(lambda row: row['A'] * 2 if row['B'] > 0.5 else row['A'], axis=1)
    
    # Method 2: numpy vectorization (fast)
    def method2():
        return np.where(df['B'] > 0.5, df['A'] * 2, df['A'])
    
    # Method 3: numba JIT compilation
    from numba import jit
    
    @jit(nopython=True)
    def calculate_fast(a, b):
        result = np.empty_like(a)
        for i in range(len(a)):
            if b[i] > 0.5:
                result[i] = a[i] * 2
            else:
                result[i] = a[i]
        return result
    
    def method3():
        return calculate_fast(df['A'].values, df['B'].values)
    
    method3()  # warm-up call so JIT compilation is not counted in the timing
    
    # Timing comparison
    methods = [('Apply', method1), ('Numpy', method2), ('Numba', method3)]
    
    for name, func in methods:
        start = time.time()
        result = func()
        end = time.time()
        print(f"{name}: {end - start:.4f}s")
    
    # Detailed profile of the slow version; cProfile.run() with a statement
    # string cannot see the local method1, so use a Profile object instead
    print("\nDetailed profile:")
    profiler = cProfile.Profile()
    profiler.enable()
    method1()
    profiler.disable()
    stats = pstats.Stats(profiler)
    stats.sort_stats('cumulative').print_stats(10)

performance_analysis()

Chapter 5: Real-World Workplace Projects

5.1 Sales Forecasting and Inventory Optimization

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt

def sales_forecast():
    """Sales forecasting and inventory optimization"""
    
    # Simulate historical sales
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2023-12-31', freq='D')
    
    # Calendar features
    df = pd.DataFrame({'date': dates})
    df['year'] = df['date'].dt.year
    df['month'] = df['date'].dt.month
    df['day'] = df['date'].dt.day
    df['dayofweek'] = df['date'].dt.dayofweek
    df['is_weekend'] = df['dayofweek'].isin([5, 6]).astype(int)
    
    # Simulated sales = base + trend + seasonality + noise
    base = 1000
    trend = np.linspace(0, 500, len(df))
    seasonal = 200 * np.sin(2 * np.pi * df['month'] / 12)
    noise = np.random.normal(0, 50, len(df))
    
    df['sales'] = base + trend + seasonal + noise
    
    # Holiday uplift (Chinese New Year dates)
    holidays = [pd.Timestamp('2020-01-25'), pd.Timestamp('2021-02-12'), 
                pd.Timestamp('2022-02-01'), pd.Timestamp('2023-01-22')]
    for holiday in holidays:
        mask = (df['date'] >= holiday) & (df['date'] <= holiday + pd.Timedelta(days=3))
        df.loc[mask, 'sales'] *= 1.5
    
    # Feature engineering
    features = ['year', 'month', 'day', 'dayofweek', 'is_weekend']
    X = df[features]
    y = df['sales']
    
    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Fit the model
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    
    # Predict
    y_pred = model.predict(X_test)
    
    # Evaluate
    mae = mean_absolute_error(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    
    print("Forecast accuracy:")
    print(f"Mean absolute error: {mae:.2f}")
    print(f"Root mean squared error: {rmse:.2f}")
    
    # Inventory recommendations for the next month
    future_dates = pd.date_range('2024-01-01', '2024-01-31', freq='D')
    future_df = pd.DataFrame({'date': future_dates})
    future_df['year'] = future_df['date'].dt.year
    future_df['month'] = future_df['date'].dt.month
    future_df['day'] = future_df['date'].dt.day
    future_df['dayofweek'] = future_df['date'].dt.dayofweek
    future_df['is_weekend'] = future_df['dayofweek'].isin([5, 6]).astype(int)
    
    future_sales = model.predict(future_df[features])
    
    # Safety stock from forecast error (z = 1.65 for a one-sided 95% service level)
    safety_stock = rmse * 1.65
    reorder_point = future_sales.mean() + safety_stock
    
    print("\nInventory recommendations:")
    print(f"Forecast average daily sales: {future_sales.mean():.2f}")
    print(f"Safety stock: {safety_stock:.2f}")
    print(f"Reorder point: {reorder_point:.2f}")
    
    # Visualization
    plt.figure(figsize=(14, 6))
    plt.plot(df['date'].tail(100), df['sales'].tail(100), label='History', color='gray', alpha=0.7)
    plt.plot(future_dates, future_sales, label='Forecast', color='#2E86AB', linewidth=2)
    plt.axhline(y=reorder_point, color='#F18F01', linestyle='--', label='Reorder point')
    plt.fill_between(future_dates, future_sales - safety_stock, future_sales + safety_stock, 
                     alpha=0.2, color='#2E86AB', label='95% band')
    plt.title('Sales Forecast and Inventory Optimization', fontsize=16, fontweight='bold')
    plt.xlabel('Date')
    plt.ylabel('Units sold')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

sales_forecast()
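
One caveat about the evaluation above: a random train/test split lets the model train on days that come after its test days, which flatters time-series metrics. scikit-learn's TimeSeriesSplit keeps the ordering honest. A sketch, assuming the X and y built inside sales_forecast():

from sklearn.model_selection import TimeSeriesSplit
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

# Each fold trains strictly on the past and tests on the future
tscv = TimeSeriesSplit(n_splits=5)
for fold, (train_idx, test_idx) in enumerate(tscv.split(X), start=1):
    m = RandomForestRegressor(n_estimators=100, random_state=42)
    m.fit(X.iloc[train_idx], y.iloc[train_idx])
    pred = m.predict(X.iloc[test_idx])
    print(f"Fold {fold} MAE: {mean_absolute_error(y.iloc[test_idx], pred):.2f}")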

5.2 A User Churn Early-Warning System

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

def churn_prediction():
    """User churn prediction"""
    
    # Simulated user-behavior data
    np.random.seed(42)
    n_users = 5000
    
    user_data = pd.DataFrame({
        'user_id': range(1, n_users + 1),
        'tenure': np.random.randint(1, 36, n_users),  # months since signup
        'monthly_usage': np.random.normal(500, 200, n_users),  # monthly usage volume
        'support_calls': np.random.poisson(2, n_users),  # number of support calls
        'payment_delay': np.random.choice([0, 1], n_users, p=[0.8, 0.2]),  # late payment flag
        'age': np.random.randint(18, 70, n_users)
    })
    
    # Simulated churn label: short tenure, low usage, frequent support calls,
    # and late payments are all risk factors; three or more of them means
    # churn. (Requiring all four at once leaves almost no positive examples.)
    risk_factors = (
        (user_data['tenure'] < 6).astype(int)
        + (user_data['monthly_usage'] < 300).astype(int)
        + (user_data['support_calls'] >= 3).astype(int)
        + user_data['payment_delay']
    )
    user_data['churn'] = (risk_factors >= 3).astype(int)
    
    print("Churn distribution:")
    print(user_data['churn'].value_counts(normalize=True))
    
    # Features and label
    features = ['tenure', 'monthly_usage', 'support_calls', 'payment_delay', 'age']
    X = user_data[features]
    y = user_data['churn']
    
    # Split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    
    # Fit
    model = LogisticRegression(random_state=42, max_iter=1000)
    model.fit(X_train, y_train)
    
    # Predict
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]
    
    # Evaluate
    print("\nClassification report:")
    print(classification_report(y_test, y_pred))
    
    # Confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=['Retained', 'Churned'], yticklabels=['Retained', 'Churned'])
    plt.title('Confusion Matrix', fontsize=14, fontweight='bold')
    plt.ylabel('Actual')
    plt.xlabel('Predicted')
    plt.show()
    
    # Coefficients as a rough guide to feature influence
    # (features are unscaled, so magnitudes are not strictly comparable)
    feature_importance = pd.DataFrame({
        'feature': features,
        'coefficient': model.coef_[0]
    }).sort_values('coefficient', key=abs, ascending=False)
    
    print("\nFeature influence on churn:")
    print(feature_importance)
    
    # Score some new users
    new_users = pd.DataFrame({
        'tenure': [2, 12, 5],
        'monthly_usage': [150, 600, 200],
        'support_calls': [5, 1, 4],
        'payment_delay': [1, 0, 1],
        'age': [25, 45, 30]
    })
    
    new_pred = model.predict_proba(new_users)[:, 1]
    print("\nChurn probability for new users:")
    for i, prob in enumerate(new_pred):
        risk = "high" if prob > 0.7 else "medium" if prob > 0.3 else "low"
        print(f"User {i + 1}: {prob:.1%} ({risk} risk)")
    
    return model

churn_model = churn_prediction()
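
For an early-warning system the model has to outlive the training session. A common pattern is to persist the fitted estimator with joblib and reload it in the daily scoring job. A sketch, assuming churn_prediction() returns the model as above (the file name is illustrative):

import joblib

# Save once after training...
joblib.dump(churn_model, 'churn_model.joblib')

# ...then reload in the scoring job, no retraining needed
loaded_model = joblib.load('churn_model.joblib')
# daily_scores = loaded_model.predict_proba(latest_user_features)[:, 1]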

Chapter 6: Data Engineering and Automation

6.1 An Automated Data Pipeline

import os
import time
import logging
import schedule
import pandas as pd
import numpy as np
from datetime import datetime

# Log to both a file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('data_pipeline.log'),
        logging.StreamHandler()
    ]
)

class DataPipeline:
    """A minimal automated ETL pipeline"""
    
    def __init__(self, source_path, dest_path):
        self.source_path = source_path
        self.dest_path = dest_path
        self.logger = logging.getLogger(__name__)
    
    def extract(self):
        """Extract"""
        self.logger.info(f"Extracting data from: {self.source_path}")
        try:
            # Stand-in for an API or database pull
            df = pd.DataFrame({
                'timestamp': [datetime.now()],
                'data': [np.random.random(10).tolist()]
            })
            self.logger.info(f"Extracted {len(df)} record(s)")
            return df
        except Exception as e:
            self.logger.error(f"Extraction failed: {e}")
            return None
    
    def transform(self, df):
        """Transform"""
        if df is None:
            return None
        
        self.logger.info("Transforming data")
        try:
            # Cleaning and derived columns
            df['processed_at'] = datetime.now()
            df['data_mean'] = df['data'].apply(lambda x: np.mean(x))
            df['data_std'] = df['data'].apply(lambda x: np.std(x))
            self.logger.info("Transformation complete")
            return df
        except Exception as e:
            self.logger.error(f"Transformation failed: {e}")
            return None
    
    def load(self, df):
        """Load"""
        if df is None:
            return False
        
        self.logger.info(f"Loading data into: {self.dest_path}")
        try:
            # Append; write the header only when the file is first created
            df.to_csv(self.dest_path, mode='a',
                      header=not os.path.exists(self.dest_path), index=False)
            self.logger.info("Load succeeded")
            return True
        except Exception as e:
            self.logger.error(f"Load failed: {e}")
            return False
    
    def run(self):
        """Run the full pipeline"""
        self.logger.info("=" * 50)
        self.logger.info("Pipeline starting")
        
        # ETL steps
        data = self.extract()
        transformed_data = self.transform(data)
        success = self.load(transformed_data)
        
        if success:
            self.logger.info("Pipeline succeeded")
        else:
            self.logger.error("Pipeline failed")
        
        return success

# Create the pipeline
pipeline = DataPipeline('source.csv', 'processed_data.csv')

# Schedule a daily run at 02:00
schedule.every().day.at("02:00").do(pipeline.run)

if __name__ == "__main__":
    # Run once immediately as a smoke test
    pipeline.run()
    
    # Then keep polling the schedule in the foreground
    print("Scheduler started; press Ctrl+C to exit")
    while True:
        schedule.run_pending()
        time.sleep(60)  # check once a minute

6.2 Data Quality Monitoring

import json
import pandas as pd
import numpy as np
from datetime import datetime

class DataQualityMonitor:
    """Data quality monitoring"""
    
    def __init__(self, df):
        self.df = df
        self.report = {}
    
    def check_completeness(self):
        """Completeness check"""
        self.report['completeness'] = {
            'total_rows': len(self.df),
            'missing_values': self.df.isnull().sum().to_dict(),
            'missing_rate': (self.df.isnull().sum() / len(self.df)).to_dict(),
            # cast numpy bool_ to plain bool so the scoring logic below sees it
            'is_complete': bool(self.df.isnull().sum().sum() == 0)
        }
    
    def check_accuracy(self):
        """Accuracy check"""
        # Validate numeric ranges
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        accuracy_report = {}
        
        for col in numeric_cols:
            if col in ['age']:
                # Ages must fall within a plausible range
                invalid_count = int(((self.df[col] < 0) | (self.df[col] > 120)).sum())
                accuracy_report[col] = {
                    'invalid_count': invalid_count,
                    'invalid_rate': invalid_count / len(self.df),
                    'is_valid': invalid_count == 0
                }
        
        self.report['accuracy'] = accuracy_report
    
    def check_consistency(self):
        """Consistency check"""
        # Look for duplicate records
        duplicates = int(self.df.duplicated().sum())
        self.report['consistency'] = {
            'duplicates': duplicates,
            'duplicate_rate': duplicates / len(self.df),
            'is_consistent': duplicates == 0
        }
    
    def check_timeliness(self):
        """Timeliness check"""
        if 'timestamp' in self.df.columns:
            max_time = self.df['timestamp'].max()
            min_time = self.df['timestamp'].min()
            self.report['timeliness'] = {
                'data_age_hours': (datetime.now() - max_time).total_seconds() / 3600,
                'time_range_hours': (max_time - min_time).total_seconds() / 3600,
                'is_fresh': bool((datetime.now() - max_time).total_seconds() < 86400)  # within 24h
            }
    
    def generate_report(self):
        """Run all checks and compute an overall score"""
        self.check_completeness()
        self.check_accuracy()
        self.check_consistency()
        self.check_timeliness()
        
        # Overall score
        total_checks = 0
        passed_checks = 0
        
        for category, details in self.report.items():
            if isinstance(details, dict):
                for key, value in details.items():
                    if isinstance(value, dict) and 'is_valid' in value:
                        total_checks += 1
                        if value['is_valid']:
                            passed_checks += 1
                    elif isinstance(value, bool) and key.startswith('is_'):
                        total_checks += 1
                        if value:
                            passed_checks += 1
        
        quality_score = (passed_checks / total_checks * 100) if total_checks > 0 else 0
        
        return {
            'quality_score': quality_score,
            'report': self.report,
            'passed_checks': passed_checks,
            'total_checks': total_checks
        }

# Example
sample_data = pd.DataFrame({
    'user_id': range(1, 1001),
    'age': np.random.randint(18, 70, 1000),
    'score': np.random.normal(75, 15, 1000),
    'timestamp': pd.date_range('2023-01-01', periods=1000, freq='H')
})

# Inject some quality problems on purpose
sample_data.loc[0:5, 'age'] = -1  # invalid ages
sample_data.loc[10:15, 'score'] = np.nan  # missing values
sample_data = pd.concat([sample_data, sample_data.iloc[:5]])  # duplicate rows

# Monitor
monitor = DataQualityMonitor(sample_data)
quality_report = monitor.generate_report()

print("Data quality report:")
print(f"Overall score: {quality_report['quality_score']:.1f}%")
print(f"Checks passed: {quality_report['passed_checks']}/{quality_report['total_checks']}")
print("\nDetails:")
print(json.dumps(quality_report['report'], indent=2, default=str))
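
Inside a pipeline, the report is most useful as a gate: refuse to publish data whose score falls below an agreed threshold. A short sketch; the 90-point threshold is an arbitrary example:

QUALITY_THRESHOLD = 90  # agreed minimum score, in percent (illustrative)

if quality_report['quality_score'] < QUALITY_THRESHOLD:
    # Fail loudly so the scheduler marks this run as failed
    raise ValueError(
        f"Quality score {quality_report['quality_score']:.1f}% is below "
        f"the {QUALITY_THRESHOLD}% threshold; aborting load."
    )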

Chapter 7: A Comprehensive Case Study

7.1 An End-to-End E-commerce Analysis Project

# The full project is long; only the key parts are shown here
def ecommerce_analysis_project():
    """End-to-end e-commerce analysis project"""
    
    # 1. Load and explore
    print("=== Stage 1: Loading & Exploration ===")
    # Simulated e-commerce data
    np.random.seed(42)
    
    # Users
    users = pd.DataFrame({
        'user_id': range(1, 1001),
        'signup_date': pd.date_range('2022-01-01', periods=1000, freq='D'),
        'age': np.random.randint(18, 65, 1000),
        'gender': np.random.choice(['M', 'F'], 1000, p=[0.55, 0.45]),
        'city': np.random.choice(['北京', '上海', '广州', '深圳', '杭州'], 1000)
    })
    
    # Orders
    orders = []
    for user_id in range(1, 1001):
        num_orders = np.random.randint(0, 15)
        for _ in range(num_orders):
            orders.append({
                'order_id': len(orders) + 1,
                'user_id': user_id,
                'order_date': users.loc[users['user_id'] == user_id, 'signup_date'].iloc[0] + 
                             pd.Timedelta(days=np.random.randint(0, 365)),
                'amount': np.random.lognormal(3, 0.5),
                'category': np.random.choice(['电子', '服装', '家居', '美妆'], p=[0.3, 0.25, 0.25, 0.2])
            })
    
    orders_df = pd.DataFrame(orders)
    
    # 2. Clean
    print("\n=== Stage 2: Cleaning ===")
    # Drop impossible values and cap outliers
    orders_df = orders_df[orders_df['amount'] > 0]
    orders_df['amount'] = np.clip(orders_df['amount'], 10, 5000)
    
    # 3. Core metrics
    print("\n=== Stage 3: Core Metrics ===")
    
    # Customer lifetime value
    user_ltv = orders_df.groupby('user_id').agg({
        'amount': ['sum', 'mean', 'count'],
        'order_date': ['min', 'max']
    }).round(2)
    
    user_ltv.columns = ['total_spend', 'avg_order_value', 'order_count', 'first_purchase', 'last_purchase']
    user_ltv['lifetime_days'] = (user_ltv['last_purchase'] - user_ltv['first_purchase']).dt.days
    
    # 4. Segmentation
    print("\n=== Stage 4: Customer Segmentation ===")
    # RFM model
    current_date = orders_df['order_date'].max()
    rfm = orders_df.groupby('user_id').agg({
        'order_date': lambda x: (current_date - x.max()).days,
        'order_id': 'count',
        'amount': 'sum'
    })
    rfm.columns = ['R', 'F', 'M']
    
    # Scores
    rfm['R_score'] = pd.qcut(rfm['R'], 5, labels=[5,4,3,2,1])
    rfm['F_score'] = pd.qcut(rfm['F'].rank(method='first'), 5, labels=[1,2,3,4,5])
    rfm['M_score'] = pd.qcut(rfm['M'], 5, labels=[1,2,3,4,5])
    
    # Segments
    def segment(row):
        score = int(row['R_score']) + int(row['F_score']) + int(row['M_score'])
        if score >= 12: return 'VIP'
        elif score >= 9: return 'High value'
        elif score >= 6: return 'Potential'
        else: return 'Average'
    
    rfm['segment'] = rfm.apply(segment, axis=1)
    
    # 5. Business insight
    print("\n=== Stage 5: Business Insight ===")
    
    # Sales by city
    city_sales = orders_df.merge(users[['user_id', 'city']], on='user_id').groupby('city')['amount'].sum()
    print("Sales by city:")
    print(city_sales)
    
    # Category trends
    category_growth = orders_df.groupby([orders_df['order_date'].dt.to_period('M'), 'category'])['amount'].sum().unstack()
    print("\nMonthly category trend:")
    print(category_growth.tail())
    
    # 6. Visualization
    print("\n=== Stage 6: Visualization ===")
    
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    
    # RFM combinations
    rfm_score = rfm['R_score'].astype(str) + rfm['F_score'].astype(str) + rfm['M_score'].astype(str)
    rfm_score.value_counts().head(10).plot(kind='bar', ax=axes[0,0], color='#2E86AB')
    axes[0,0].set_title('Top 10 RFM Combinations')
    
    # Segments
    rfm['segment'].value_counts().plot(kind='pie', ax=axes[0,1], autopct='%1.1f%%')
    axes[0,1].set_title('Segment Share')
    
    # Sales by city
    city_sales.plot(kind='bar', ax=axes[1,0], color='#F18F01')
    axes[1,0].set_title('Sales by City')
    
    # Category trend
    category_growth.plot(kind='line', ax=axes[1,1])
    axes[1,1].set_title('Monthly Category Trend')
    axes[1,1].legend(title='Category')
    
    plt.tight_layout()
    plt.show()
    
    # 7. Report
    print("\n=== Stage 7: Report ===")
    total_revenue = orders_df['amount'].sum()
    total_users = orders_df['user_id'].nunique()
    avg_order_value = orders_df['amount'].mean()
    
    print(f"""
    E-commerce Analysis Summary
    ====================
    Core metrics:
    - Total revenue: ¥{total_revenue:,.2f}
    - Active users: {total_users:,}
    - Average order value: ¥{avg_order_value:.2f}
    
    Key findings:
    1. VIP share: {len(rfm[rfm['segment']=='VIP'])/len(rfm)*100:.1f}%
    2. Top city by revenue: {city_sales.idxmax()}
    3. Most popular category: {orders_df['category'].mode().iloc[0]}
    
    Recommended actions:
    - Design exclusive benefits for VIP customers
    - Increase marketing investment in {city_sales.idxmax()}
    - Strengthen the supply chain for the {orders_df['category'].mode().iloc[0]} category
    """)

# Run the project
ecommerce_analysis_project()

Chapter 8: Continuous Learning and Career Development

8.1 Building a Personal Data Analysis Portfolio

import json

def create_portfolio_project():
    """A portfolio project template for data analysis"""
    
    project_template = {
        "project_name": "User Behavior Analysis and Targeted Marketing",
        "description": "Analyze user behavior data, build user profiles, and enable targeted marketing",
        "tech_stack": ["Python", "Pandas", "Scikit-learn", "Plotly"],
        "data_source": "Simulated e-commerce user behavior data",
        "key_steps": [
            "Data cleaning and preprocessing",
            "User behavior pattern analysis",
            "User clustering (K-Means)",
            "Purchase prediction model",
            "Marketing strategy recommendations"
        ],
        "deliverables": [
            "User segmentation report",
            "Purchase prediction model (accuracy > 85%)",
            "Marketing strategy brief",
            "Interactive visualization dashboard"
        ],
        "repository": "https://github.com/yourusername/project-name",
        "presentation": "Jupyter Notebook + Streamlit app"
    }
    
    print("Portfolio project template:")
    print(json.dumps(project_template, indent=2, ensure_ascii=False))
    
    return project_template

# Generate a learning path
def learning_roadmap():
    """A data analysis learning path"""
    
    roadmap = {
        "Phase 1: Foundations (1-2 months)": [
            "Core Python syntax",
            "Pandas data processing",
            "Matplotlib/Seaborn visualization",
            "Basic SQL queries"
        ],
        "Phase 2: Intermediate (2-3 months)": [
            "Advanced Pandas techniques",
            "Data cleaning best practices",
            "Statistical analysis fundamentals",
            "Advanced data visualization"
        ],
        "Phase 3: Machine Learning Basics (3-4 months)": [
            "Scikit-learn fundamentals",
            "Regression and classification models",
            "Model evaluation and tuning",
            "Feature engineering"
        ],
        "Phase 4: Business Application (ongoing)": [
            "Domain and business understanding",
            "Building analysis frameworks",
            "Data storytelling",
            "Distilling business insight"
        ],
        "Phase 5: Engineering Skills (ongoing)": [
            "Building data pipelines",
            "Automation scripting",
            "Performance optimization",
            "Coding standards"
        ]
    }
    
    print("\nData analysis learning roadmap:")
    for phase, skills in roadmap.items():
        print(f"\n{phase}:")
        for skill in skills:
            print(f"  - {skill}")
    
    return roadmap

# Run
portfolio = create_portfolio_project()
roadmap = learning_roadmap()

Summary

Working through this course systematically builds a complete skill chain, from basic data processing to advanced analysis and modeling. Key takeaways:

  1. Technical: advanced Pandas operations, high-performance computing, complex visualization
  2. Business: data thinking and classic frameworks such as RFM and funnel analysis
  3. Engineering: automated data pipelines and data quality monitoring
  4. Career: a personal portfolio and the habits of continuous learning

Remember: the core value of data analysis lies not in using tools but in finding business insight in data and driving decisions with it. Keep practicing and keep taking on harder projects, and you will become an indispensable data expert in the workplace.


Suggested Next Steps

  1. Pick one or two case studies from this course and work through them end to end
  2. Apply the techniques to your current job
  3. Set up a personal data analysis project repository on GitHub
  4. Join Kaggle competitions or open-source analysis projects
  5. Regularly review and refine your own analysis methodology

May you go far on the data analysis road and become a truly data-driven decision expert!