引言:为什么需要Python数据分析进阶?

在当今数据驱动的时代,Python已经成为数据分析领域的首选语言。从初学者到专业数据分析师,掌握Python数据分析技能不仅能提升个人竞争力,还能为企业创造巨大价值。然而,许多学习者在掌握了基础语法和简单数据处理后,往往陷入"入门容易精通难"的困境。

本指南旨在帮助已经具备Python基础的学习者突破瓶颈,系统掌握进阶数据分析技能。我们将从数据处理的深度优化开始,逐步深入到统计分析、机器学习、大数据处理等高级主题,并通过完整的实战项目来巩固所学知识。

第一部分:数据处理的进阶技巧

1.1 高效数据清洗与预处理

数据清洗是数据分析中最耗时但至关重要的环节。进阶的数据清洗不仅要处理明显的错误,还要识别和处理隐藏的数据质量问题。
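
除了缺失值和异常值,还有一些"隐藏"的质量问题容易被忽视:重复记录、同一类别的不同写法、超出业务合理范围的取值、无法解析的日期等。下面是一个简单的排查示意(示例数据与阈值均为假设,仅作演示):

import pandas as pd

# 构造一个包含若干隐藏质量问题的小数据集(纯属示意)
df_check = pd.DataFrame({
    'user_id': [1, 2, 2, 3, 4],
    'city': ['北京', '北京 ', 'beijing', '上海', '上海'],
    'age': [25, 32, 32, 180, 28],
    'signup': ['2023-01-05', '2023-02-30', '2023-02-10', '2023-03-01', '2023-03-15']
})

# 1. 主键重复
print("重复的user_id:\n", df_check[df_check.duplicated('user_id', keep=False)])

# 2. 类别写法不一致(多余空格、大小写、中英文混用)
print("city原始取值:", df_check['city'].unique())
df_check['city'] = df_check['city'].str.strip().str.lower()

# 3. 超出业务合理范围的取值(这里假设年龄应在0~120之间)
print("年龄越界的行:\n", df_check[~df_check['age'].between(0, 120)])

# 4. 无法解析的日期('2023-02-30'会被置为NaT)
df_check['signup'] = pd.to_datetime(df_check['signup'], errors='coerce')
print("无法解析的日期数量:", df_check['signup'].isna().sum())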

1.1.1 缺失值处理的高级策略

import pandas as pd
import numpy as np
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

# 创建包含复杂缺失模式的数据集
np.random.seed(42)
data = {
    'age': [25, 30, np.nan, 35, 40, np.nan, 45, 50],
    'income': [50000, 60000, 55000, np.nan, 70000, 65000, np.nan, 80000],
    'education': ['Bachelors', 'Masters', 'PhD', 'Bachelors', np.nan, 'Masters', 'PhD', 'Bachelors'],
    'region': ['North', 'South', 'North', 'South', 'North', 'South', 'North', 'South']
}
df = pd.DataFrame(data)

# 策略1:基于业务逻辑的填充
# 年龄缺失时,用同地区平均年龄填充
df['age'] = df.groupby('region')['age'].transform(
    lambda x: x.fillna(x.mean())
)

# 策略2:使用机器学习模型预测缺失值(同时传入多个数值特征,让模型利用特征间的关系)
imputer = IterativeImputer(random_state=42)
df[['age', 'income']] = imputer.fit_transform(df[['age', 'income']])

# 策略3:分类变量的智能填充
df['education'] = df.groupby('region')['education'].transform(
    lambda x: x.fillna(x.mode()[0] if not x.mode().empty else 'Unknown')
)

print("处理后的数据:")
print(df)

详细说明:

  • IterativeImputer 利用其他特征来预测缺失值,通常比简单均值填充更准确(见下方的简单对比示意)
  • 分组填充(groupby)能保持数据的局部特征
  • 分类变量填充时使用众数(mode)更合理
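
下面用一个小例子直观对比两种填充方式的误差(数据为人为构造,假设 age 与 income 高度相关,仅作示意):

import numpy as np
import pandas as pd
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import SimpleImputer, IterativeImputer

# 构造 age 与 income 强相关的示例数据,并随机挖掉约20%的 income
rng = np.random.default_rng(0)
age = rng.uniform(22, 60, 500)
income = 1000 * age + rng.normal(0, 2000, 500)
df_demo = pd.DataFrame({'age': age, 'income': income})
mask = rng.random(500) < 0.2
true_income = df_demo.loc[mask, 'income'].to_numpy()
df_demo.loc[mask, 'income'] = np.nan

# 均值填充:完全忽略 age 与 income 的关系
mean_filled = SimpleImputer(strategy='mean').fit_transform(df_demo)[:, 1]
# 迭代填充:用 age 回归预测缺失的 income
iter_filled = IterativeImputer(random_state=0).fit_transform(df_demo)[:, 1]

print(f"均值填充 MAE: {np.abs(mean_filled[mask] - true_income).mean():.0f}")
print(f"迭代填充 MAE: {np.abs(iter_filled[mask] - true_income).mean():.0f}")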

1.1.2 异常值检测与处理

import pandas as pd
import numpy as np
from scipy import stats

# 创建包含异常值的数据
np.random.seed(42)
normal_data = np.random.normal(100, 15, 100)
outliers = np.array([5, 8, 200, 250, 195])
data = np.concatenate([normal_data, outliers])

# 方法1:Z-score方法
z_scores = np.abs(stats.zscore(data))
threshold = 2.5
outliers_z = data[z_scores > threshold]
print(f"Z-score检测到的异常值: {outliers_z}")

# 方法2:IQR方法(四分位距)
Q1 = np.percentile(data, 25)
Q3 = np.percentile(data, 75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers_iqr = data[(data < lower_bound) | (data > upper_bound)]
print(f"IQR检测到的异常值: {outliers_iqr}")

# 方法3:孤立森林(也适用于高维数据,这里仅在一维数据上演示)
from sklearn.ensemble import IsolationForest
iso_forest = IsolationForest(contamination=0.1, random_state=42)
outliers_if = iso_forest.fit_predict(data.reshape(-1, 1))
print(f"孤立森林检测到的异常值索引: {np.where(outliers_if == -1)[0]}")

1.2 高性能数据操作

1.2.1 向量化操作与避免循环

import pandas as pd
import numpy as np

# 创建大型数据集
df = pd.DataFrame({
    'A': np.random.randint(1, 100, 1000000),
    'B': np.random.randint(1, 100, 1000000),
    'C': np.random.randint(1, 100, 1000000)
})

# ❌ 低效的循环方式(不要这样做)
def slow_calculation(df):
    result = []
    for i in range(len(df)):
        if df.loc[i, 'A'] > 50 and df.loc[i, 'B'] < 80:
            result.append(df.loc[i, 'A'] * df.loc[i, 'B'] + df.loc[i, 'C'])
        else:
            result.append(df.loc[i, 'C'])
    return pd.Series(result)

# ✅ 高效的向量化方式
def fast_calculation(df):
    condition = (df['A'] > 50) & (df['B'] < 80)
    result = np.where(condition, df['A'] * df['B'] + df['C'], df['C'])
    return pd.Series(result)

# 性能对比(注意:循环方式在百万行数据上可能需要数十秒甚至更久)
import time
start = time.time()
slow_result = slow_calculation(df)
time_slow = time.time() - start

start = time.time()
fast_result = fast_calculation(df)
time_fast = time.time() - start

print(f"循环方式耗时: {time_slow:.4f}秒")
print(f"向量化方式耗时: {time_fast:.4f}秒")
print(f"性能提升: {time_slow/time_fast:.2f}倍")

1.2.2 多表合并与连接优化

import pandas as pd
import numpy as np
import time

# 创建多个DataFrame
df1 = pd.DataFrame({
    'id': np.arange(1, 100001),
    'value1': np.random.random(100000)
})

df2 = pd.DataFrame({
    'id': np.arange(1, 100001),
    'value2': np.random.random(100000)
})

df3 = pd.DataFrame({
    'id': np.random.choice(np.arange(1, 100001), 80000, replace=False),
    'value3': np.random.random(80000)
})

# 方法1:多次merge(默认inner join,结果只保留三表共有的id;链式merge开销较大)
start = time.time()
result1 = df1.merge(df2, on='id').merge(df3, on='id')
time1 = time.time() - start

# 方法2:基于索引的join(默认left join,保留df1的全部id;索引可复用时通常更快)
start = time.time()
df2_indexed = df2.set_index('id')
df3_indexed = df3.set_index('id')
result2 = df1.set_index('id').join(df2_indexed).join(df3_indexed).reset_index()
time2 = time.time() - start

# 方法3:使用concat(适用于特定场景)
start = time.time()
df3_full = df3.set_index('id').reindex(df1['id']).reset_index()
result3 = pd.concat([df1.set_index('id'), df2.set_index('id'), df3_full.set_index('id')], axis=1).reset_index()
time3 = time.time() - start

print(f"多次merge耗时: {time1:.4f}秒")
print(f"join方式耗时: {time2:.4f}秒")
print(f"concat方式耗时: {time3:.4f}秒")

第二部分:统计分析与假设检验

2.1 高级统计分析方法

2.1.1 多变量相关性分析

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import pearsonr, spearmanr
from sklearn.preprocessing import StandardScaler

# 创建复杂数据集
np.random.seed(42)
n = 1000
data = {
    'temperature': np.random.normal(25, 5, n),
    'humidity': np.random.normal(60, 10, n),
    'pressure': np.random.normal(1013, 20, n),
    'sales': np.random.normal(1000, 200, n)
}
df = pd.DataFrame(data)

# 添加非线性关系
df['sales'] = df['sales'] + 0.5 * df['temperature']**2 - 0.1 * df['humidity'] * df['pressure']

# 1. Pearson相关系数(线性关系)
corr_pearson = df.corr(method='pearson')
print("Pearson相关系数矩阵:")
print(corr_pearson)

# 2. Spearman相关系数(单调关系)
corr_spearman = df.corr(method='spearman')
print("\nSpearman相关系数矩阵:")
print(corr_spearman)

# 3. 偏相关分析(控制其他变量影响)
from pingouin import partial_corr  # 第三方库,需要先安装:pip install pingouin
partial_corr_result = partial_corr(data=df, x='temperature', y='sales', covar=['humidity', 'pressure'])
print("\n偏相关分析结果(控制湿度和气压):")
print(partial_corr_result)

# 4. 可视化相关性热力图
plt.figure(figsize=(10, 8))
sns.heatmap(corr_pearson, annot=True, cmap='coolwarm', center=0, square=True)
plt.title('Pearson相关系数热力图')
plt.tight_layout()
plt.show()

2.1.2 回归分析深入

import statsmodels.api as sm
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

# 创建回归数据集
np.random.seed(42)
n = 1000
X = np.random.multivariate_normal(
    mean=[0, 0, 0],
    cov=[[1, 0.5, 0.3],
         [0.5, 1, 0.2],
         [0.3, 0.2, 1]],
    size=n
)
y = 2 + 3*X[:, 0] + 1.5*X[:, 1] - 0.8*X[:, 2] + np.random.normal(0, 1, n)

df = pd.DataFrame(X, columns=['X1', 'X2', 'X3'])
df['y'] = y

# 1. 普通最小二乘回归(OLS)
X = df[['X1', 'X2', 'X3']]
X = sm.add_constant(X)  # 添加截距项
y = df['y']

model = sm.OLS(y, X).fit()
print("OLS回归结果:")
print(model.summary())

# 2. 岭回归(处理多重共线性)
from sklearn.linear_model import Ridge
ridge = Ridge(alpha=1.0)
ridge.fit(df[['X1', 'X2', 'X3']], y)
print(f"\n岭回归系数: {ridge.coef_}")
print(f"岭回归截距: {ridge.intercept_}")

# 3. 交叉验证评估
from sklearn.model_selection import cross_val_score
scores = cross_val_score(ridge, df[['X1', 'X2', 'X3']], y, cv=5, scoring='r2')
print(f"\n5折交叉验证R²分数: {scores}")
print(f"平均R²: {scores.mean():.4f} (+/- {scores.std():.4f})")

2.2 假设检验的高级应用

2.2.1 多重假设检验校正

import numpy as np
from scipy import stats
from statsmodels.stats.multitest import multipletests

# 模拟基因表达数据(1000个基因,20个样本)
np.random.seed(42)
n_genes = 1000
n_samples = 20

# 990个基因无差异,10个基因有差异
gene_expression = np.random.normal(0, 1, (n_genes, n_samples))
gene_expression[10:20, 10:] += 2  # 10个基因在后10个样本中表达上调

# 进行t检验
p_values = []
for i in range(n_genes):
    t_stat, p_val = stats.ttest_ind(
        gene_expression[i, :10],
        gene_expression[i, 10:]
    )
    p_values.append(p_val)

p_values = np.array(p_values)

# 未校正的显著性结果
significant_raw = np.sum(p_values < 0.05)
print(f"未校正的显著性基因数量: {significant_raw}")

# Bonferroni校正
bonferroni = multipletests(p_values, alpha=0.05, method='bonferroni')
significant_bonferroni = np.sum(bonferroni[0])
print(f"Bonferroni校正后的显著性基因数量: {significant_bonferroni}")

# FDR校正(Benjamini-Hochberg)
fdr = multipletests(p_values, alpha=0.05, method='fdr_bh')
significant_fdr = np.sum(fdr[0])
print(f"FDR校正后的显著性基因数量: {significant_fdr}")

# 结果对比
print("\n多重检验校正效果对比:")
print(f"原始p值<0.05的数量: {significant_raw}")
print(f"Bonferroni校正后: {significant_bonferroni}")
print(f"FDR校正后: {significant_fdr}")

第三部分:机器学习在数据分析中的应用

3.1 特征工程与选择

3.1.1 自动特征工程

import pandas as pd
import numpy as np
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.preprocessing import PolynomialFeatures
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel

# 创建特征工程数据集
np.random.seed(42)
n_samples = 1000
X = np.random.randn(n_samples, 10)
y = (X[:, 0] + X[:, 1]**2 + X[:, 2]*X[:, 3] + np.random.randn(n_samples) > 1).astype(int)

df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(10)])

# 1. 多项式特征
poly = PolynomialFeatures(degree=2, interaction_only=False, include_bias=False)
X_poly = poly.fit_transform(df)
poly_features = poly.get_feature_names_out(df.columns)
print(f"原始特征数: {df.shape[1]}, 多项式特征数: {X_poly.shape[1]}")

# 2. 基于统计的特征选择
selector_f = SelectKBest(score_func=f_classif, k=5)
X_f = selector_f.fit_transform(df, y)
selected_features_f = df.columns[selector_f.get_support()]
print(f"F检验选择的特征: {list(selected_features_f)}")

# 3. 基于模型的特征选择
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(df, y)
selector_model = SelectFromModel(rf, prefit=True, threshold='median')
X_model = selector_model.transform(df)
selected_features_model = df.columns[selector_model.get_support()]
print(f"随机森林选择的特征: {list(selected_features_model)}")

# 4. 递归特征消除(RFE)
from sklearn.feature_selection import RFE
rfe = RFE(estimator=RandomForestClassifier(n_estimators=50, random_state=42), n_features_to_select=5)
rfe.fit(df, y)
selected_features_rfe = df.columns[rfe.support_]
print(f"RFE选择的特征: {list(selected_features_rfe)}")

3.2 模型训练与评估

3.2.1 交叉验证策略

from sklearn.model_selection import (
    StratifiedKFold, TimeSeriesSplit, 
    cross_val_score, cross_validate
)
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

# 创建分类数据集
X, y = make_classification(
    n_samples=1000, n_features=20, n_informative=15,
    n_redundant=5, n_classes=2, random_state=42
)

model = RandomForestClassifier(n_estimators=100, random_state=42)

# 1. 标准K折交叉验证
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(model, X, y, cv=cv, scoring='accuracy')
print(f"标准5折交叉验证准确率: {scores}")
print(f"平均准确率: {scores.mean():.4f} (+/- {scores.std():.4f})")

# 2. 时间序列交叉验证(适用于按时间排序的数据;这里的随机数据仅演示API用法)
tscv = TimeSeriesSplit(n_splits=5)
scores_ts = cross_val_score(model, X, y, cv=tscv, scoring='accuracy')
print(f"\n时间序列交叉验证准确率: {scores_ts}")
print(f"平均准确率: {scores_ts.mean():.4f} (+/- {scores_ts.std():.4f})")

# 3. 多指标交叉验证
scoring = ['accuracy', 'precision', 'recall', 'f1', 'roc_auc']
cv_results = cross_validate(model, X, y, cv=cv, scoring=scoring, return_train_score=True)

print("\n多指标交叉验证结果:")
for metric in scoring:
    test_score = cv_results[f'test_{metric}'].mean()
    train_score = cv_results[f'train_{metric}'].mean()
    print(f"{metric}: 测试集={test_score:.4f}, 训练集={train_score:.4f}")

第四部分:大数据处理与性能优化

4.1 处理超出内存的数据集

4.1.1 使用Dask进行并行计算

import dask.dataframe as dd
import pandas as pd
import numpy as np
import os

# 创建模拟大数据集(分块保存)
def create_large_dataset():
    """创建多个CSV文件模拟大数据"""
    if not os.path.exists('data_chunks'):
        os.makedirs('data_chunks')
    
    for i in range(10):
        chunk = pd.DataFrame({
            'id': np.arange(i*100000, (i+1)*100000),
            'value': np.random.random(100000),
            'category': np.random.choice(['A', 'B', 'C'], 100000),
            'timestamp': pd.date_range('2020-01-01', periods=100000, freq='1min')
        })
        chunk.to_csv(f'data_chunks/chunk_{i}.csv', index=False)

create_large_dataset()

# 使用Dask读取和处理大数据
ddf = dd.read_csv('data_chunks/chunk_*.csv')

# Dask延迟计算(不会立即执行)
# 1. 基本统计
mean_value = ddf['value'].mean()
std_value = ddf['value'].std()

# 2. 分组聚合
category_stats = ddf.groupby('category').agg({
    'value': ['mean', 'std', 'count']
}).compute()  # compute()触发实际计算

print("Dask计算结果:")
print(f"平均值: {mean_value.compute():.4f}")
print(f"标准差: {std_value.compute():.4f}")
print("\n分类统计:")
print(category_stats)

# 3. 复杂操作
result = (
    ddf[ddf['value'] > 0.5]
    .groupby('category')
    .agg({'value': 'sum'})
    .compute()
)
print(f"\nvalue>0.5的分类求和:\n{result}")

# 清理
import shutil
shutil.rmtree('data_chunks')

4.1.2 内存优化技巧

import pandas as pd
import numpy as np
import sys

def optimize_memory(df):
    """优化DataFrame内存使用"""
    start_mem = df.memory_usage(deep=True).sum() / 1024**2
    print(f"原始内存使用: {start_mem:.2f} MB")
    
    # 优化数值类型
    for col in df.select_dtypes(include=['int']).columns:
        col_min = df[col].min()
        col_max = df[col].max()
        
        if col_min >= 0:
            if col_max < 256:
                df[col] = df[col].astype('uint8')
            elif col_max < 65536:
                df[col] = df[col].astype('uint16')
            elif col_max < 4294967296:
                df[col] = df[col].astype('uint32')
        else:
            if col_min > np.iinfo(np.int8).min and col_max < np.iinfo(np.int8).max:
                df[col] = df[col].astype('int8')
            elif col_min > np.iinfo(np.int16).min and col_max < np.iinfo(np.int16).max:
                df[col] = df[col].astype('int16')
            elif col_min > np.iinfo(np.int32).min and col_max < np.iinfo(np.int32).max:
                df[col] = df[col].astype('int32')
    
    # 优化浮点类型
    for col in df.select_dtypes(include=['float']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    
    # 优化对象类型
    for col in df.select_dtypes(include=['object']).columns:
        num_unique = df[col].nunique()
        num_total = len(df)
        if num_unique / num_total < 0.5:
            df[col] = df[col].astype('category')
    
    end_mem = df.memory_usage(deep=True).sum() / 1024**2
    print(f"优化后内存使用: {end_mem:.2f} MB")
    print(f"内存节省: {((start_mem - end_mem) / start_mem * 100):.2f}%")
    
    return df

# 测试内存优化
df = pd.DataFrame({
    'int_col': np.random.randint(0, 100, 100000),
    'float_col': np.random.random(100000),
    'category_col': np.random.choice(['A', 'B', 'C', 'D'], 100000),
    'large_int': np.random.randint(0, 1000, 100000)
})

optimized_df = optimize_memory(df)
print("\n优化后的数据类型:")
print(optimized_df.dtypes)

第五部分:实战项目:销售数据分析系统

5.1 项目概述与数据准备

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# 设置中文字体(根据系统调整)
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# 1. 创建模拟销售数据
def create_sales_data():
    """创建包含1年销售数据的模拟数据集"""
    np.random.seed(42)
    
    # 基础数据
    n_records = 50000
    start_date = datetime(2023, 1, 1)
    
    # 日期
    dates = [start_date + timedelta(days=np.random.randint(0, 365)) for _ in range(n_records)]
    
    # 产品
    products = ['Laptop', 'Phone', 'Tablet', 'Monitor', 'Keyboard', 'Mouse', 'Headphone']
    categories = ['Electronics', 'Electronics', 'Electronics', 'Electronics', 'Accessories', 'Accessories', 'Accessories']
    
    # 地区
    regions = ['North', 'South', 'East', 'West']
    
    # 生成数据(category由product映射得到,避免产品与类别不匹配)
    product_choice = np.random.choice(products, n_records)
    category_map = dict(zip(products, categories))
    data = {
        'date': dates,
        'product': product_choice,
        'category': np.array([category_map[p] for p in product_choice]),
        'region': np.random.choice(regions, n_records),
        'quantity': np.random.randint(1, 10, n_records),
        'unit_price': np.random.uniform(50, 2000, n_records),
        'discount': np.random.choice([0, 0.05, 0.1, 0.15, 0.2], n_records, p=[0.6, 0.15, 0.1, 0.1, 0.05])
    }
    
    df = pd.DataFrame(data)
    
    # 计算销售额
    df['revenue'] = df['quantity'] * df['unit_price'] * (1 - df['discount'])
    
    # 添加季节性特征
    df['month'] = df['date'].dt.month
    df['quarter'] = df['date'].dt.quarter
    df['day_of_week'] = df['date'].dt.dayofweek
    
    # 添加促销标记
    df['is_promotion'] = df['discount'] > 0.1
    
    return df

sales_df = create_sales_data()
print("销售数据概览:")
print(sales_df.head())
print(f"\n数据形状: {sales_df.shape}")
print(f"\n数据类型:\n{sales_df.dtypes}")

5.2 数据清洗与探索性分析

# 2. 数据质量检查
def data_quality_report(df):
    """生成数据质量报告"""
    report = pd.DataFrame({
        '数据类型': df.dtypes,
        '缺失值': df.isnull().sum(),
        '唯一值': df.nunique(),
        '最小值': df.min(),
        '最大值': df.max(),
        '均值': df.mean(numeric_only=True),
        '中位数': df.median(numeric_only=True)
    })
    return report

print("数据质量报告:")
print(data_quality_report(sales_df))

# 3. 异常值检测
def detect_outliers_iqr(df, column):
    """使用IQR方法检测异常值"""
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
    return outliers, lower_bound, upper_bound

# 检查销售额异常值
outliers, lower, upper = detect_outliers_iqr(sales_df, 'revenue')
print(f"\n销售额异常值检测:")
print(f"异常值数量: {len(outliers)}")
print(f"异常值占比: {len(outliers)/len(sales_df)*100:.2f}%")
print(f"正常范围: [{lower:.2f}, {upper:.2f}]")

# 4. 探索性数据分析
def exploratory_analysis(df):
    """执行探索性数据分析"""
    print("\n=== 探索性数据分析 ===")
    
    # 销售额统计
    print("\n销售额统计:")
    print(df['revenue'].describe())
    
    # 按产品统计
    product_stats = df.groupby('product').agg({
        'revenue': ['sum', 'mean', 'count'],
        'quantity': 'sum'
    }).round(2)
    print("\n按产品统计:")
    print(product_stats)
    
    # 按地区统计
    region_stats = df.groupby('region')['revenue'].agg(['sum', 'mean', 'count']).round(2)
    print("\n按地区统计:")
    print(region_stats)
    
    # 月度趋势
    monthly_trend = df.groupby('month')['revenue'].sum()
    print("\n月度销售趋势:")
    print(monthly_trend)

exploratory_analysis(sales_df)

5.3 高级分析与可视化

# 5. 高级分析:RFM模型
def calculate_rfm(df):
    """计算RFM指标(RFM通常按客户计算;本数据没有客户ID,这里以地区为分析单元做演示)"""
    # 计算最近购买时间(Recency)
    max_date = df['date'].max()
    recency = df.groupby('region')['date'].apply(lambda x: (max_date - x.max()).days)
    
    # 计算购买频率(Frequency)
    frequency = df.groupby('region').size()
    
    # 计算货币价值(Monetary)
    monetary = df.groupby('region')['revenue'].sum()
    
    rfm = pd.DataFrame({
        'Recency': recency,
        'Frequency': frequency,
        'Monetary': monetary
    })
    
    # RFM评分(1-5分);分组很少或取值大量重复时,直接qcut会因分位点重复而报错,先用rank打散
    rfm['R_Score'] = pd.qcut(rfm['Recency'].rank(method='first'), 5, labels=[5, 4, 3, 2, 1])  # Recency越小越好
    rfm['F_Score'] = pd.qcut(rfm['Frequency'].rank(method='first'), 5, labels=[1, 2, 3, 4, 5])
    rfm['M_Score'] = pd.qcut(rfm['Monetary'].rank(method='first'), 5, labels=[1, 2, 3, 4, 5])
    
    rfm['RFM_Score'] = rfm['R_Score'].astype(str) + rfm['F_Score'].astype(str) + rfm['M_Score'].astype(str)
    
    return rfm

rfm_result = calculate_rfm(sales_df)
print("RFM分析结果:")
print(rfm_result)

# 6. 可视化分析
def create_visualizations(df):
    """创建高级可视化"""
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    
    # 1. 销售额分布
    sns.histplot(data=df, x='revenue', bins=50, kde=True, ax=axes[0,0])
    axes[0,0].set_title('销售额分布')
    axes[0,0].set_xlabel('销售额')
    
    # 2. 产品销售额对比
    product_revenue = df.groupby('product')['revenue'].sum().sort_values(ascending=False)
    sns.barplot(x=product_revenue.index, y=product_revenue.values, ax=axes[0,1])
    axes[0,1].set_title('各产品销售额')
    axes[0,1].tick_params(axis='x', rotation=45)
    
    # 3. 月度趋势
    monthly_revenue = df.groupby('month')['revenue'].sum()
    sns.lineplot(x=monthly_revenue.index, y=monthly_revenue.values, marker='o', ax=axes[1,0])
    axes[1,0].set_title('月度销售趋势')
    axes[1,0].set_xlabel('月份')
    axes[1,0].set_xticks(range(1,13))
    
    # 4. 地区-产品热力图
    pivot_data = df.pivot_table(values='revenue', index='region', columns='product', aggfunc='sum')
    sns.heatmap(pivot_data, annot=True, fmt='.0f', cmap='YlOrRd', ax=axes[1,1])
    axes[1,1].set_title('地区-产品销售额热力图')
    
    plt.tight_layout()
    plt.show()

create_visualizations(sales_df)

5.4 预测分析与机器学习

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import LabelEncoder

# 7. 特征工程
def prepare_features(df):
    """准备机器学习特征"""
    df_ml = df.copy()
    
    # 编码分类变量
    le_product = LabelEncoder()
    df_ml['product_encoded'] = le_product.fit_transform(df_ml['product'])
    
    le_region = LabelEncoder()
    df_ml['region_encoded'] = le_region.fit_transform(df_ml['region'])
    
    # 提取时间特征
    df_ml['day_of_year'] = df_ml['date'].dt.dayofyear
    df_ml['week_of_year'] = df_ml['date'].dt.isocalendar().week
    
    # 聚合特征(按产品-地区-月份)
    # 注意:这些聚合基于目标变量revenue且包含当前样本,存在信息泄漏;实际建模应只用历史数据计算
    agg_features = df_ml.groupby(['product', 'region', 'month'])['revenue'].agg(['mean', 'std', 'count']).reset_index()
    agg_features.columns = ['product', 'region', 'month', 'rev_mean', 'rev_std', 'rev_count']
    
    df_ml = df_ml.merge(agg_features, on=['product', 'region', 'month'], how='left')
    
    # 选择特征
    feature_cols = [
        'quantity', 'unit_price', 'discount', 'month', 'quarter', 'day_of_week',
        'is_promotion', 'product_encoded', 'region_encoded', 'day_of_year',
        'week_of_year', 'rev_mean', 'rev_std', 'rev_count'
    ]
    
    X = df_ml[feature_cols]
    y = df_ml['revenue']
    
    return X, y, df_ml

X, y, df_ml = prepare_features(sales_df)

# 8. 模型训练与评估
def train_and_evaluate_model(X, y):
    """训练和评估模型"""
    # 划分数据集
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    # 训练随机森林模型
    model = RandomForestRegressor(
        n_estimators=100,
        max_depth=10,
        min_samples_split=5,
        random_state=42,
        n_jobs=-1
    )
    
    model.fit(X_train, y_train)
    
    # 预测
    y_train_pred = model.predict(X_train)
    y_test_pred = model.predict(X_test)
    
    # 评估指标
    def calculate_metrics(y_true, y_pred, dataset_name):
        mae = mean_absolute_error(y_true, y_pred)
        mse = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_true, y_pred)
        
        print(f"\n{dataset_name}评估结果:")
        print(f"MAE: {mae:.2f}")
        print(f"RMSE: {rmse:.2f}")
        print(f"R²: {r2:.4f}")
        
        return mae, rmse, r2
    
    train_metrics = calculate_metrics(y_train, y_train_pred, "训练集")
    test_metrics = calculate_metrics(y_test, y_test_pred, "测试集")
    
    # 特征重要性
    feature_importance = pd.DataFrame({
        'feature': X.columns,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)
    
    print("\n特征重要性:")
    print(feature_importance.head(10))
    
    return model, feature_importance

model, feature_importance = train_and_evaluate_model(X, y)

# 9. 模型解释与业务洞察
def generate_insights(df, model, feature_importance):
    """生成业务洞察"""
    print("\n" + "="*50)
    print("业务洞察与建议")
    print("="*50)
    
    # 1. 最佳销售产品
    best_product = df.groupby('product')['revenue'].sum().idxmax()
    best_product_revenue = df.groupby('product')['revenue'].sum().max()
    print(f"\n1. 最佳销售产品: {best_product} (销售额: {best_product_revenue:,.2f})")
    
    # 2. 最佳销售地区
    best_region = df.groupby('region')['revenue'].sum().idxmax()
    best_region_revenue = df.groupby('region')['revenue'].sum().max()
    print(f"2. 最佳销售地区: {best_region} (销售额: {best_region_revenue:,.2f})")
    
    # 3. 促销效果分析
    promotion_effect = df.groupby('is_promotion')['revenue'].mean()
    print(f"3. 促销效果: 促销期间平均销售额={promotion_effect[True]:.2f}, 非促销={promotion_effect[False]:.2f}")
    
    # 4. 季节性洞察
    monthly_trend = df.groupby('month')['revenue'].sum()
    best_month = monthly_trend.idxmax()
    print(f"4. 销售旺季: {best_month}月")
    
    # 5. 模型预测的业务应用
    top_features = ', '.join(feature_importance.head(3)['feature'])
    print(f"\n5. 对销售额影响最大的特征: {top_features}")
    print("   模型的R²见上方测试集评估结果,可用于预测未来销售额、优化库存管理")

generate_insights(sales_df, model, feature_importance)

第六部分:性能优化与最佳实践

6.1 代码性能分析

import cProfile
import pstats
import time
import io

def slow_function():
    """模拟一个性能较差的函数"""
    result = []
    for i in range(100000):
        if i % 2 == 0:
            result.append(i**2)
    return result

def fast_function():
    """模拟一个性能较好的函数"""
    return [i**2 for i in range(100000) if i % 2 == 0]

# 性能分析
def profile_function(func, name):
    """分析函数性能"""
    profiler = cProfile.Profile()
    profiler.enable()
    
    start = time.time()
    result = func()
    end = time.time()
    
    profiler.disable()
    
    # 输出结果
    s = io.StringIO()
    ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
    ps.print_stats(10)
    
    print(f"\n{name}执行时间: {end-start:.4f}秒")
    print("性能分析(前10个函数):")
    print(s.getvalue())

profile_function(slow_function, "慢函数")
profile_function(fast_function, "快函数")

6.2 并行处理

from multiprocessing import Pool, cpu_count
import numpy as np
import time

def process_chunk(chunk):
    """处理数据块"""
    return np.mean(chunk) * np.std(chunk)

def parallel_processing():
    """并行处理演示"""
    # 创建大数据
    data = np.random.random((1000000, 10))
    
    # 分块
    n_cores = cpu_count()
    chunks = np.array_split(data, n_cores)
    
    # 串行处理
    start = time.time()
    serial_results = [process_chunk(chunk) for chunk in chunks]
    serial_time = time.time() - start
    
    # 并行处理
    start = time.time()
    with Pool(n_cores) as pool:
        parallel_results = pool.map(process_chunk, chunks)
    parallel_time = time.time() - start
    
    print(f"串行处理时间: {serial_time:.4f}秒")
    print(f"并行处理时间: {parallel_time:.4f}秒")
    print(f"加速比: {serial_time/parallel_time:.2f}x")
    print(f"结果一致性: {np.allclose(serial_results, parallel_results)}")

# multiprocessing在Windows/macOS(spawn启动方式)下必须放在main保护块内执行
if __name__ == '__main__':
    parallel_processing()

第七部分:总结与进阶学习路径

7.1 核心技能回顾

通过本课程的学习,我们系统掌握了以下进阶技能:

  1. 数据处理:高级缺失值处理、异常值检测、高性能操作
  2. 统计分析:多变量分析、回归分析、假设检验校正
  3. 机器学习:特征工程、模型评估、交叉验证
  4. 大数据处理:Dask并行计算、内存优化
  5. 实战项目:完整的销售分析系统,从数据到洞察

7.2 进阶学习路径建议

短期目标(1-3个月)

  • 深入学习Scikit-learn的高级功能
  • 掌握时间序列预测方法(如statsmodels、Prophet)
  • 学习SQL与Python结合的数据分析

中期目标(3-6个月)

  • 学习深度学习基础(TensorFlow/PyTorch)
  • 掌握大数据框架(Spark)
  • 学习数据可视化高级库(Plotly, Bokeh)

长期目标(6个月以上)

  • 机器学习工程化(MLflow, Kubeflow)
  • 云平台数据分析(AWS, Azure, GCP)
  • 领域专业知识(金融、医疗、电商等)

7.3 常用工具与资源推荐

核心库

  • Pandas, NumPy(数据处理)
  • Scikit-learn(机器学习)
  • Statsmodels(统计分析)
  • Dask(大数据)
  • Matplotlib, Seaborn, Plotly(可视化)

学习资源

  • 官方文档(永远是第一选择)
  • Kaggle竞赛(实战练习)
  • GitHub开源项目(学习最佳实践)
  • Medium技术博客(了解前沿技术)

7.4 最后的建议

  1. 实践为王:理论学习必须配合大量实践
  2. 代码审查:定期回顾和优化自己的代码
  3. 社区参与:加入数据分析社区,分享和学习
  4. 业务理解:技术服务于业务,理解业务才能创造价值
  5. 持续学习:技术更新迭代快,保持学习热情

记住,成为数据分析专家是一个持续的过程。希望本指南能为你的进阶之路提供清晰的指引和实用的工具。祝你在数据分析的道路上越走越远!