引言:为什么选择Python进行数据分析?

Python已经成为数据科学领域事实上的通用语言。根据Kaggle历年的数据科学与机器学习调查,绝大多数数据科学家将Python作为主要编程语言。本课程将带你从入门走向精通,涵盖从基础数据处理到高级机器学习建模的全过程,并为你规划清晰的职业发展路径。

学习目标

  • 掌握Python数据分析核心库(Pandas、NumPy、Matplotlib/Seaborn)
  • 理解数据清洗、探索性分析(EDA)和特征工程
  • 学习使用Scikit-learn构建机器学习模型
  • 掌握时间序列分析和大数据处理技术
  • 了解数据可视化和仪表板开发
  • 规划数据分析师/科学家的职业路径

第一章:Python数据分析基础(入门阶段)

1.1 环境搭建与工具链

推荐工具组合:

  • Anaconda:一站式数据科学环境(包含Jupyter、Pandas等)
  • Jupyter Notebook/Lab:交互式编程环境
  • VS Code:轻量级编辑器(适合生产环境)
  • Git/GitHub:版本控制和项目展示
# 安装Anaconda(推荐)
wget https://repo.anaconda.com/archive/Anaconda3-2023.09-0-Linux-x86_64.sh
bash Anaconda3-2023.09-0-Linux-x86_64.sh

# 创建专用环境
conda create -n data_analysis python=3.10
conda activate data_analysis

# 安装核心库
conda install pandas numpy matplotlib seaborn scikit-learn
pip install jupyterlab

1.2 NumPy:科学计算基础

NumPy是Python科学生态系统的基石,提供高性能的多维数组对象。

import numpy as np

# 创建数组
arr = np.array([[1, 2, 3], [4, 5, 6]])
print(f"数组形状: {arr.shape}")  # (2, 3)

# 基本运算(向量化)
arr2 = arr * 2  # 所有元素乘以2
arr3 = arr + np.array([10, 20, 30])  # 广播机制

# 索引与切片
print(arr[0, 1])  # 第一行第二列:2
print(arr[:, 1])  # 所有行第二列:[2 5]

# 常用函数
mean = np.mean(arr)  # 平均值
std = np.std(arr)    # 标准差
normalized = (arr - mean) / std  # 标准化

# 随机数生成
np.random.seed(42)  # 固定随机种子
random_data = np.random.normal(0, 1, (1000, 5))  # 1000行5列正态分布

性能对比示例:

import time

# Python原生列表
python_list = list(range(1000000))
start = time.time()
result = [x * 2 for x in python_list]
print(f"Python列表耗时: {time.time() - start:.4f}秒")

# NumPy数组
numpy_array = np.arange(1000000)
start = time.time()
result = numpy_array * 2
print(f"NumPy数组耗时: {time.time() - start:.4f}秒")
# NumPy通常比原生Python快10-100倍

1.3 Pandas:数据处理核心库

Pandas提供了DataFrame这一强大的表格数据结构,是数据分析的瑞士军刀。

import pandas as pd

# 创建DataFrame
data = {
    '姓名': ['张三', '李四', '王五', '赵六'],
    '年龄': [25, 30, 35, 28],
    '城市': ['北京', '上海', '广州', '深圳'],
    '薪资': [15000, 20000, 25000, 18000]
}
df = pd.DataFrame(data)

# 基础探索
print(df.head())  # 前5行
print(df.info())  # 数据类型和缺失值
print(df.describe())  # 统计摘要

# 数据筛选
# 筛选年龄大于28的记录
senior = df[df['年龄'] > 28]
# 多条件筛选:年龄>28且薪资>20000
condition = (df['年龄'] > 28) & (df['薪资'] > 20000)
senior_high = df[condition]

# 数据分组聚合
# 按城市计算平均薪资
city_salary = df.groupby('城市')['薪资'].agg(['mean', 'count', 'std'])
print(city_salary)

# 处理缺失值
df_with_nan = df.copy()
df_with_nan.loc[1, '薪资'] = np.nan
# 填充缺失值
df_filled = df_with_nan.fillna({'薪资': df_with_nan['薪资'].median()})
# 删除缺失值
df_dropped = df_with_nan.dropna()

# 数据合并
df1 = df.iloc[:2]
df2 = df.iloc[2:]
merged = pd.concat([df1, df2], ignore_index=True)

# 时间序列处理
date_range = pd.date_range('2023-01-01', periods=5, freq='D')
df_time = pd.DataFrame({
    'date': date_range,
    'value': np.random.randn(5)
})
df_time['year'] = df_time['date'].dt.year
df_time['day_of_week'] = df_time['date'].dt.dayofweek

Pandas性能优化技巧:

# 1. 使用向量化操作替代循环
# 慢:for循环
# 快:df['new'] = df['col1'] * df['col2']

# 2. 使用Categorical类型处理重复字符串
df['城市'] = df['城市'].astype('category')  # 节省内存,加速分组

# 3. 使用query()方法进行高效筛选
# 慢:df[(df['年龄'] > 25) & (df['薪资'] < 20000)]
# 快:df.query('年龄 > 25 and 薪资 < 20000')

# 4. 使用eval()进行高效计算
df['总成本'] = df.eval('薪资 * 1.5 + 5000')
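
上面的技巧2提到用Categorical类型处理重复字符串,下面给出一个最小示意,直接用memory_usage对比object与category两种dtype的内存占用(示例数据为随机生成,数值仅供参考):

import numpy as np
import pandas as pd

# 构造一个包含大量重复字符串的列(模拟"城市"这类低基数字段)
cities = pd.Series(np.random.choice(['北京', '上海', '广州', '深圳'], size=100_000))

mem_object = cities.memory_usage(deep=True) / 1024**2                        # object类型占用(MB)
mem_category = cities.astype('category').memory_usage(deep=True) / 1024**2   # category类型占用(MB)
print(f"object: {mem_object:.2f} MB, category: {mem_category:.2f} MB")
# 重复值越多,category的内存优势越明显,分组聚合也会更快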

1.4 数据可视化基础

import matplotlib.pyplot as plt
import seaborn as sns

# 设置中文字体(解决中文显示问题)
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# 1. 折线图
plt.figure(figsize=(10, 6))
plt.plot(df['年龄'], df['薪资'], marker='o', linestyle='--', color='blue')
plt.title('年龄与薪资关系', fontsize=14)
plt.xlabel('年龄')
plt.ylabel('薪资')
plt.grid(True, alpha=0.3)
plt.show()

# 2. 柱状图
plt.figure(figsize=(10, 6))
plt.bar(df['城市'], df['薪资'], color=['red', 'green', 'blue', 'orange'])
plt.title('各城市薪资对比')
plt.xlabel('城市')
plt.ylabel('薪资')
plt.show()

# 3. 散点图(Seaborn)
sns.set_style("whitegrid")
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df, x='年龄', y='薪资', hue='城市', size='年龄', sizes=(100, 300))
plt.title('年龄与薪资关系(按城市着色)')
plt.show()

# 4. 箱线图
plt.figure(figsize=(10, 6))
sns.boxplot(data=df, x='城市', y='薪资')
plt.title('各城市薪资分布')
plt.show()

# 5. 相关性热力图
corr_matrix = df[['年龄', '薪资']].corr()
plt.figure(figsize=(8, 6))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('相关性热力图')
plt.show()

1.5 实战案例:电商销售数据分析

# 生成模拟数据
np.random.seed(42)
n = 1000
data = {
    'order_id': range(1000, 1000+n),
    'customer_id': np.random.randint(1, 200, n),
    'product': np.random.choice(['手机', '电脑', '耳机', '平板'], n),
    'category': np.random.choice(['电子', '配件', '家电'], n),
    'quantity': np.random.randint(1, 5, n),
    'price': np.random.uniform(100, 1000, n),
    'order_date': pd.date_range('2023-01-01', periods=n, freq='H'),
    'city': np.random.choice(['北京', '上海', '广州', '深圳', '杭州'], n)
}
df_ecom = pd.DataFrame(data)
df_ecom['revenue'] = df_ecom['quantity'] * df_ecom['price']

# 1. 销售总额
total_revenue = df_ecom['revenue'].sum()
print(f"总销售额: {total_revenue:,.2f}")

# 2. 按产品统计
product_stats = df_ecom.groupby('product').agg({
    'revenue': ['sum', 'mean', 'count'],
    'quantity': 'sum'
}).round(2)
print(product_stats)

# 3. 按月份统计
df_ecom['month'] = df_ecom['order_date'].dt.month
monthly_sales = df_ecom.groupby('month')['revenue'].sum()
print(monthly_sales)

# 4. 热销产品TOP5
top_products = df_ecom.groupby('product')['revenue'].sum().nlargest(5)
print("热销产品TOP5:\n", top_products)

# 5. 可视化:月度销售趋势
plt.figure(figsize=(12, 6))
monthly_sales.plot(kind='bar', color='skyblue')
plt.title('2023年月度销售趋势')
plt.xlabel('月份')
plt.ylabel('销售额')
plt.xticks(rotation=0)
plt.show()

第二章:数据清洗与探索性分析(EDA)

2.1 数据质量评估

# 模拟脏数据
df_dirty = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eva', None, 'Grace'],
    'age': [25, 30, np.nan, 35, 28, 40, 29],
    'salary': [50000, 60000, 70000, 80000, 55000, 65000, np.nan],
    'email': ['alice@company.com', 'bob@company.com', 'invalid-email', 'david@company.com', 'eva@company.com', 'grace@company.com', None],
    'join_date': ['2020-01-15', '2019-03-20', '2021-06-10', '2018-11-05', '2020-09-12', '2019-07-08', '2021-02-28']
})

# 1. 缺失值分析
def analyze_missing(df):
    missing = df.isnull().sum()
    missing_percent = (missing / len(df)) * 100
    missing_df = pd.DataFrame({'缺失数量': missing, '缺失率(%)': missing_percent.round(2)})
    return missing_df.sort_values('缺失数量', ascending=False)

print("缺失值分析:")
print(analyze_missing(df_dirty))

# 2. 数据类型检查
print("\n数据类型:")
print(df_dirty.dtypes)

# 3. 重复值检查
df_duplicate = pd.DataFrame({
    'A': [1, 2, 2, 3],
    'B': ['x', 'y', 'y', 'z']
})
print(f"\n重复行数: {df_duplicate.duplicated().sum()}")
print("重复行:")
print(df_duplicate[df_duplicate.duplicated()])

# 4. 异常值检测(IQR方法)
def detect_outliers_iqr(series):
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return (series < lower_bound) | (series > upper_bound)

# 检测年龄异常值
age_outliers = detect_outliers_iqr(df_dirty['age'])
print(f"\n年龄异常值:\n{df_dirty[age_outliers]}")

2.2 数据清洗实战

# 1. 处理缺失值
# 删除缺失值
df_cleaned = df_dirty.dropna(subset=['name', 'email'])  # 删除关键字段缺失的行

# 填充缺失值
df_filled = df_dirty.copy()
df_filled['age'] = df_filled['age'].fillna(df_filled['age'].median())
df_filled['salary'] = df_filled['salary'].fillna(df_filled['salary'].median())

# 2. 处理异常值
# 年龄限制在18-65岁
df_filled['age'] = df_filled['age'].clip(18, 65)

# 3. 数据格式标准化
# 邮箱格式验证
import re
def is_valid_email(email):
    if pd.isna(email):
        return False
    pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    return re.match(pattern, email) is not None

df_filled['email_valid'] = df_filled['email'].apply(is_valid_email)
print("邮箱验证结果:")
print(df_filled[['email', 'email_valid']])

# 4. 日期格式标准化
df_filled['join_date'] = pd.to_datetime(df_filled['join_date'], errors='coerce')
df_filled['tenure_days'] = (pd.Timestamp.now() - df_filled['join_date']).dt.days

# 5. 文本清洗
df_filled['name'] = df_filled['name'].str.strip().str.title()
df_filled['name_clean'] = df_filled['name'].str.replace(r'[^a-zA-Z]', '', regex=True)

2.3 探索性数据分析(EDA)

# 完整的EDA函数
def perform_eda(df, target_col=None):
    """
    执行完整的探索性数据分析
    """
    print("="*50)
    print("数据概览")
    print("="*50)
    print(f"数据形状: {df.shape}")
    print(f"内存使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    
    print("\n" + "="*50)
    print("数据类型分布")
    print("="*50)
    print(df.dtypes.value_counts())
    
    print("\n" + "="*50)
    print("数值型变量统计")
    print("="*50)
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        print(df[numeric_cols].describe().round(2))
    
    print("\n" + "="*50)
    print("类别型变量统计")
    print("="*50)
    categorical_cols = df.select_dtypes(include=['object', 'category']).columns
    for col in categorical_cols:
        print(f"\n{col}:")
        print(df[col].value_counts().head())
    
    print("\n" + "="*50)
    print("缺失值统计")
    print("="*50)
    missing = df.isnull().sum()
    missing = missing[missing > 0]
    if len(missing) > 0:
        print(missing.sort_values(ascending=False))
    else:
        print("无缺失值")
    
    # 如果有目标变量,分析其分布
    if target_col and target_col in df.columns:
        print("\n" + "="*50)
        print(f"目标变量 {target_col} 分布")
        print("="*50)
        if pd.api.types.is_numeric_dtype(df[target_col]):
            print(df[target_col].describe())
            plt.figure(figsize=(10, 6))
            sns.histplot(df[target_col], kde=True)
            plt.title(f'{target_col} 分布直方图')
            plt.show()
        else:
            print(df[target_col].value_counts())
            plt.figure(figsize=(10, 6))
            df[target_col].value_counts().plot(kind='bar')
            plt.title(f'{target_col} 分布')
            plt.show()

# 执行EDA
perform_eda(df_ecom, target_col='revenue')

2.4 特征工程基础

# 1. 创建新特征
df_ecom['price_per_item'] = df_ecom['revenue'] / df_ecom['quantity']
df_ecom['is_high_price'] = (df_ecom['price'] > df_ecom['price'].median()).astype(int)
df_ecom['order_hour'] = df_ecom['order_date'].dt.hour
df_ecom['is_weekend'] = df_ecom['order_date'].dt.dayofweek.isin([5, 6]).astype(int)

# 2. 分箱(Binning)
# 将购买数量分箱
df_ecom['quantity_bin'] = pd.cut(df_ecom['quantity'], bins=[0, 2, 3, 5], labels=['低', '中', '高'])

# 3. 编码类别变量
# 独热编码
df_encoded = pd.get_dummies(df_ecom, columns=['product', 'city'], prefix=['prod', 'city'])

# 4. 文本特征提取
df_text = pd.DataFrame({
    'comment': ['这个产品很好', '质量不错', '物流太慢', '非常满意', '一般般']
})
from sklearn.feature_extraction.text import CountVectorizer
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(df_text['comment'])
print("文本特征:")
print(pd.DataFrame(X.toarray(), columns=vectorizer.get_feature_names_out()))

第三章:高级数据处理与性能优化

3.1 大数据处理技巧

# 1. 分块读取大文件
def process_large_file(file_path, chunksize=10000):
    """
    分块读取CSV文件,处理大数据集
    """
    chunks = []
    for chunk in pd.read_csv(file_path, chunksize=chunksize):
        # 对每个chunk进行处理
        chunk['new_col'] = chunk['col1'] * chunk['col2']
        chunks.append(chunk)
    return pd.concat(chunks, ignore_index=True)

# 2. 使用Dask处理超大数据(替代Pandas)
try:
    import dask.dataframe as dd
    # Dask延迟计算,适合内存不足的情况
    ddf = dd.read_csv('large_file.csv')
    result = ddf.groupby('category').revenue.sum().compute()
except ImportError:
    print("Dask未安装,使用Pandas替代")

# 3. 内存优化技巧
def optimize_memory(df):
    """
    优化DataFrame内存使用
    """
    start_mem = df.memory_usage(deep=True).sum() / 1024**2
    
    # 优化数值类型
    for col in df.select_dtypes(include=['int']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    
    for col in df.select_dtypes(include=['float']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    
    # 优化对象类型
    for col in df.select_dtypes(include=['object']).columns:
        num_unique = df[col].nunique()
        num_total = len(df)
        if num_unique / num_total < 0.5:
            df[col] = df[col].astype('category')
    
    end_mem = df.memory_usage(deep=True).sum() / 1024**2
    print(f"内存优化: {start_mem:.2f} MB → {end_mem:.2f} MB ({(1-end_mem/start_mem)*100:.1f}% 减少)")
    return df

# 4. 并行处理
from joblib import Parallel, delayed
import multiprocessing

def process_chunk(chunk):
    return chunk.groupby('category').revenue.sum()

def parallel_groupby(df, n_jobs=-1):
    """
    并行处理大数据分组
    """
    if n_jobs == -1:
        n_jobs = multiprocessing.cpu_count()
    
    # 将数据分块
    chunks = np.array_split(df, n_jobs)
    results = Parallel(n_jobs=n_jobs)(delayed(process_chunk)(chunk) for chunk in chunks)
    
    # 合并结果
    return pd.concat(results).groupby(level=0).sum()

3.2 时间序列分析

# 1. 创建时间序列数据
dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
ts_data = pd.DataFrame({
    'date': dates,
    'sales': np.random.normal(1000, 200, len(dates)).cumsum() + np.random.normal(0, 50, len(dates))
})
ts_data.set_index('date', inplace=True)

# 2. 时间序列重采样
# 按月汇总
monthly = ts_data.resample('M').sum()
# 按周平均
weekly = ts_data.resample('W').mean()

# 3. 滚动统计
ts_data['rolling_7d'] = ts_data['sales'].rolling(window=7).mean()
ts_data['rolling_30d'] = ts_data['sales'].rolling(window=30).mean()
ts_data['expanding_mean'] = ts_data['sales'].expanding().mean()

# 4. 季节性分解
# 需要安装statsmodels: pip install statsmodels
try:
    from statsmodels.tsa.seasonal import seasonal_decompose

    # 模拟月度数据(此处只有约12个点,示例用period=3;真实月度数据通常用period=12)
    monthly_data = ts_data.resample('M').sum()
    decomposition = seasonal_decompose(monthly_data['sales'], model='additive', period=3)
    fig = decomposition.plot()
    fig.set_size_inches(12, 8)
    plt.show()
except ImportError:
    print("statsmodels未安装,跳过季节性分解")

# 5. 时间序列可视化
plt.figure(figsize=(14, 7))
plt.plot(ts_data.index, ts_data['sales'], label='原始数据', alpha=0.7)
plt.plot(ts_data.index, ts_data['rolling_7d'], label='7日移动平均', linewidth=2)
plt.plot(ts_data.index, ts_data['rolling_30d'], label='30日移动平均', linewidth=2)
plt.title('销售时间序列分析')
plt.legend()
plt.show()

# 6. 滞后特征(用于机器学习)
ts_data['lag_1'] = ts_data['sales'].shift(1)  # 前1天
ts_data['lag_7'] = ts_data['sales'].shift(7)  # 前7天(上周同一天)
ts_data['diff_1'] = ts_data['sales'].diff(1)  # 日环比变化
ts_data['pct_change_7'] = ts_data['sales'].pct_change(7)  # 周同比变化

3.3 高级数据合并与重塑

# 1. 多表合并
df1 = pd.DataFrame({'key': ['A', 'B', 'C'], 'value1': [1, 2, 3]})
df2 = pd.DataFrame({'key': ['B', 'C', 'D'], 'value2': [4, 5, 6]})

# 合并类型
inner_join = pd.merge(df1, df2, on='key', how='inner')  # 内连接
left_join = pd.merge(df1, df2, on='key', how='left')    # 左连接
right_join = pd.merge(df1, df2, on='key', how='right')   # 右连接
outer_join = pd.merge(df1, df2, on='key', how='outer')   # 外连接

# 2. 数据透视表
pivot_data = pd.DataFrame({
    'date': ['2023-01', '2023-01', '2023-02', '2023-02'],
    'city': ['北京', '上海', '北京', '上海'],
    'sales': [100, 150, 120, 180],
    'profit': [20, 30, 25, 35]
})
pivot_table = pd.pivot_table(pivot_data, 
                             values=['sales', 'profit'], 
                             index='date', 
                             columns='city', 
                             aggfunc='sum',
                             margins=True)
print("透视表:")
print(pivot_table)

# 3. Melt和Stack
# Melt: 宽表转长表
wide_df = pd.DataFrame({
    'product': ['A', 'B'],
    'Q1': [100, 120],
    'Q2': [110, 130],
    'Q3': [120, 140],
    'Q4': [130, 150]
})
long_df = pd.melt(wide_df, id_vars=['product'], var_name='quarter', value_name='sales')
print("\nMelt结果:")
print(long_df)

# Stack: 列转行
stacked = pivot_table.stack()
print("\nStack结果:")
print(stacked.head())

第四章:机器学习入门与Scikit-learn实战

4.1 Scikit-learn基础

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import mean_squared_error, accuracy_score, classification_report
from sklearn.datasets import load_iris, fetch_california_housing
from sklearn.pipeline import Pipeline

# 1. 回归问题示例:预测房价
# 加载数据(load_boston已从新版scikit-learn中移除,这里改用加州房价数据集)
housing = fetch_california_housing()
X = pd.DataFrame(housing.data, columns=housing.feature_names)
y = housing.target

# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 创建Pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),  # 标准化
    ('regressor', LinearRegression())  # 线性回归
])

# 训练模型
pipeline.fit(X_train, y_train)

# 预测与评估
y_pred = pipeline.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
r2 = pipeline.score(X_test, y_test)
print(f"均方误差: {mse:.2f}")
print(f"R²分数: {r2:.2f}")

# 2. 分类问题示例:鸢尾花分类
iris = load_iris()
X_iris = iris.data
y_iris = iris.target

X_train, X_test, y_train, y_test = train_test_split(X_iris, y_iris, test_size=0.3, random_state=42)

# 逻辑回归
clf = LogisticRegression(max_iter=200)
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"\n分类准确率: {accuracy:.2f}")
print("\n分类报告:")
print(classification_report(y_test, y_pred, target_names=iris.target_names))

4.2 模型评估与交叉验证

from sklearn.model_selection import cross_val_score, KFold, StratifiedKFold
from sklearn.metrics import confusion_matrix, roc_curve, auc, precision_recall_curve
import seaborn as sns

# 1. 交叉验证
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(pipeline, X, y, cv=kfold, scoring='r2')
print(f"交叉验证R²分数: {cv_scores}")
print(f"平均R²: {cv_scores.mean():.2f} (+/- {cv_scores.std() * 2:.2f})")

# 2. 混淆矩阵(分类问题)
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=iris.target_names, 
            yticklabels=iris.target_names)
plt.title('混淆矩阵')
plt.ylabel('真实标签')
plt.xlabel('预测标签')
plt.show()

# 3. ROC曲线(二分类)
# 模拟二分类数据
from sklearn.datasets import make_classification
X_binary, y_binary = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X_binary, y_binary, test_size=0.3, random_state=42)

clf_binary = LogisticRegression()
clf_binary.fit(X_train, y_train)
y_proba = clf_binary.predict_proba(X_test)[:, 1]

fpr, tpr, thresholds = roc_curve(y_test, y_proba)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(10, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()

4.3 特征选择与降维

from sklearn.feature_selection import SelectKBest, f_regression, mutual_info_regression
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor

# 1. 特征选择
# 沿用4.1的加州房价回归数据,单独划分训练/测试集,避免覆盖前面分类示例的变量
Xr_train, Xr_test, yr_train, yr_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 使用随机森林评估特征重要性
rf = RandomForestRegressor(n_estimators=100, random_state=42)
rf.fit(Xr_train, yr_train)

feature_importance = pd.DataFrame({
    'feature': X.columns,
    'importance': rf.feature_importances_
}).sort_values('importance', ascending=False)

print("特征重要性(随机森林):")
print(feature_importance)

# 可视化
plt.figure(figsize=(10, 6))
sns.barplot(data=feature_importance.head(10), x='importance', y='feature')
plt.title('Top 10 Feature Importance')
plt.show()

# 2. 选择Top K特征
selector = SelectKBest(score_func=f_regression, k=5)
X_selected = selector.fit_transform(Xr_train, yr_train)
selected_features = X.columns[selector.get_support()]
print(f"\n选择的特征: {list(selected_features)}")

# 3. PCA降维
pca = PCA(n_components=2)
X_pca = pca.fit_transform(StandardScaler().fit_transform(X))

plt.figure(figsize=(10, 6))
plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y, cmap='viridis', alpha=0.6)
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%} variance)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%} variance)')
plt.title('PCA降维可视化')
plt.colorbar(label='Target')
plt.show()

4.4 超参数调优

from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier
from scipy.stats import randint

# 1. 网格搜索
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10]
}

rf = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(rf, param_grid, cv=3, scoring='accuracy', n_jobs=-1)
grid_search.fit(X_train, y_train)

print("最佳参数:", grid_search.best_params_)
print("最佳分数:", grid_search.best_score_)

# 2. 随机搜索(更高效)
param_dist = {
    'n_estimators': randint(50, 200),
    'max_depth': randint(5, 50),
    'min_samples_split': randint(2, 20)
}

random_search = RandomizedSearchCV(rf, param_dist, n_iter=20, cv=3, random_state=42, n_jobs=-1)
random_search.fit(X_train, y_train)
print("\n随机搜索最佳参数:", random_search.best_params_)

第五章:高级机器学习与集成学习

5.1 集成学习方法

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import VotingClassifier, StackingClassifier

# 1. Bagging:随机森林
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)
print(f"随机森林准确率: {rf.score(X_test, y_test):.3f}")

# 2. Boosting:梯度提升
gb = GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, random_state=42)
gb.fit(X_train, y_train)
print(f"梯度提升准确率: {gb.score(X_test, y_test):.3f}")

# 3. 投票集成(Voting)
clf1 = LogisticRegression(max_iter=200)
clf2 = RandomForestClassifier(n_estimators=100, random_state=42)
clf3 = SVC(probability=True, random_state=42)

voting_clf = VotingClassifier(
    estimators=[('lr', clf1), ('rf', clf2), ('svc', clf3)],
    voting='soft'  # 使用概率投票
)
voting_clf.fit(X_train, y_train)
print(f"投票集成准确率: {voting_clf.score(X_test, y_test):.3f}")

# 4. 堆叠集成(Stacking)
estimators = [
    ('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
    ('svc', SVC(probability=True, random_state=42))
]

stacking_clf = StackingClassifier(
    estimators=estimators,
    final_estimator=LogisticRegression()
)
stacking_clf.fit(X_train, y_train)
print(f"堆叠集成准确率: {stacking_clf.score(X_test, y_test):.3f}")

5.2 XGBoost与LightGBM(工业级工具)

# 需要安装:pip install xgboost lightgbm
try:
    from xgboost import XGBClassifier, XGBRegressor
    from lightgbm import LGBMClassifier, LGBMRegressor
    
    # XGBoost分类
    xgb_clf = XGBClassifier(
        n_estimators=100,
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
        eval_metric='logloss'
    )
    xgb_clf.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)
    print(f"XGBoost准确率: {xgb_clf.score(X_test, y_test):.3f}")
    
    # LightGBM分类
    lgb_clf = LGBMClassifier(
        n_estimators=100,
        max_depth=6,
        learning_rate=0.1,
        random_state=42
    )
    lgb_clf.fit(X_train, y_train)
    print(f"LightGBM准确率: {lgb_clf.score(X_test, y_test):.3f}")
    
    # 特征重要性对比(训练数据来自make_classification,特征没有名称,这里用序号代替)
    feature_names = [f'f{i}' for i in range(X_train.shape[1])]
    importance_df = pd.DataFrame({
        'feature': feature_names,
        'xgb_importance': xgb_clf.feature_importances_,
        'lgb_importance': lgb_clf.feature_importances_
    }).sort_values('xgb_importance', ascending=False)
    
    print("\nXGBoost vs LightGBM 特征重要性:")
    print(importance_df.head())
    
except ImportError:
    print("XGBoost/LightGBM未安装,跳过该部分")

5.3 聚类分析

from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.metrics import silhouette_score

# 1. K-Means聚类
# 生成数据
X_cluster, _ = make_classification(n_samples=500, n_features=2, n_informative=2, 
                                   n_redundant=0, n_clusters_per_class=1, random_state=42)

# 确定最佳K值(肘部法则)
inertias = []
K_range = range(2, 11)
for k in K_range:
    kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
    kmeans.fit(X_cluster)
    inertias.append(kmeans.inertia_)

plt.figure(figsize=(10, 6))
plt.plot(K_range, inertias, marker='o')
plt.title('肘部法则确定最佳K值')
plt.xlabel('K值')
plt.ylabel('Inertia')
plt.show()

# 使用K=3
kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
clusters = kmeans.fit_predict(X_cluster)

# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_cluster[:, 0], X_cluster[:, 1], c=clusters, cmap='viridis', alpha=0.6)
plt.scatter(kmeans.cluster_centers_[:, 0], kmeans.cluster_centers_[:, 1], 
            s=300, c='red', marker='X', label='Centroids')
plt.title('K-Means聚类结果')
plt.legend()
plt.show()

# 2. DBSCAN(密度聚类)
dbscan = DBSCAN(eps=0.5, min_samples=5)
clusters_dbscan = dbscan.fit_predict(X_cluster)
print(f"DBSCAN发现{len(set(clusters_dbscan)) - (1 if -1 in clusters_dbscan else 0)}个簇")

# 3. 层次聚类
agg = AgglomerativeClustering(n_clusters=3)
clusters_agg = agg.fit_predict(X_cluster)

# 4. 聚类评估
silhouette_kmeans = silhouette_score(X_cluster, clusters)
print(f"K-Means轮廓系数: {silhouette_kmeans:.3f}")

5.4 异常检测

from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope

# 生成包含异常值的数据
np.random.seed(42)
X_normal = np.random.normal(0, 1, (1000, 2))
X_outliers = np.random.uniform(-4, 4, (50, 2))
X = np.vstack([X_normal, X_outliers])
y_true = np.array([0] * 1000 + [1] * 50)  # 0=正常, 1=异常

# 1. 孤立森林
iso_forest = IsolationForest(contamination=0.05, random_state=42)
y_pred_iso = iso_forest.fit_predict(X)
y_pred_iso = [1 if x == -1 else 0 for x in y_pred_iso]  # 转换为0/1

# 2. One-Class SVM
oc_svm = OneClassSVM(nu=0.05, kernel='rbf', gamma='scale')
y_pred_svm = oc_svm.fit_predict(X)
y_pred_svm = [1 if x == -1 else 0 for x in y_pred_svm]

# 3. 评估
from sklearn.metrics import classification_report, confusion_matrix

print("孤立森林结果:")
print(classification_report(y_true, y_pred_iso, target_names=['正常', '异常']))

print("\nOne-Class SVM结果:")
print(classification_report(y_true, y_pred_svm, target_names=['正常', '异常']))

# 可视化
plt.figure(figsize=(14, 6))
plt.subplot(1, 2, 1)
plt.scatter(X[:, 0], X[:, 1], c=y_pred_iso, cmap='coolwarm', alpha=0.6)
plt.title('孤立森林异常检测')
plt.subplot(1, 2, 2)
plt.scatter(X[:, 0], X[:, 1], c=y_pred_svm, cmap='coolwarm', alpha=0.6)
plt.title('One-Class SVM异常检测')
plt.show()

第六章:数据可视化与仪表板开发

6.1 高级可视化库

import plotly.express as px
import plotly.graph_objects as go
import plotly.offline as pyo

# 1. Plotly交互式图表
# 散点图
fig = px.scatter(df_ecom, x='price', y='revenue', color='city', 
                 size='quantity', hover_data=['product', 'price'],
                 title='销售散点图(交互式)')
fig.show()

# 2. Plotly高级图表
# 箱线图
fig = px.box(df_ecom, x='product', y='revenue', color='city',
             title='产品-城市销售分布')
fig.show()

# 3. 时间序列图
ts_sample = ts_data.head(100)
fig = go.Figure()
fig.add_trace(go.Scatter(x=ts_sample.index, y=ts_sample['sales'],
                         mode='lines+markers', name='销售额'))
fig.add_trace(go.Scatter(x=ts_sample.index, y=ts_sample['rolling_7d'],
                         mode='lines', name='7日移动平均',
                         line=dict(color='red', width=2)))
fig.update_layout(title='销售时间序列(Plotly)', xaxis_title='日期', yaxis_title='销售额')
fig.show()

# 4. 热力图
pivot_sample = df_ecom.pivot_table(values='revenue', index='order_hour', columns='city', aggfunc='mean')
fig = px.imshow(pivot_sample, title='下单小时-城市平均销售额热力图')
fig.show()

6.2 Seaborn高级可视化

# 1. 分布图组合
plt.figure(figsize=(12, 8))
sns.set_style("whitegrid")

# 子图1:直方图+KDE
plt.subplot(2, 2, 1)
sns.histplot(df_ecom['revenue'], kde=True, bins=30)
plt.title('收入分布')

# 子图2:箱线图
plt.subplot(2, 2, 2)
sns.boxplot(data=df_ecom, x='product', y='revenue')
plt.title('产品收入箱线图')
plt.xticks(rotation=45)

# 子图3:小提琴图
plt.subplot(2, 2, 3)
sns.violinplot(data=df_ecom, x='city', y='revenue', inner='quartile')
plt.title('城市收入小提琴图')

# 子图4:蜂群图
plt.subplot(2, 2, 4)
sns.swarmplot(data=df_ecom.head(100), x='product', y='revenue', hue='city')
plt.title('蜂群图(前100条)')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

# 2. 关系图矩阵
sns.pairplot(df_ecom[['revenue', 'quantity', 'price', 'order_hour']], diag_kind='kde', corner=True)
plt.suptitle('关系图矩阵', y=1.02)
plt.show()

# 3. 分面网格
g = sns.FacetGrid(df_ecom, col='city', hue='product', col_wrap=3, height=4)
g.map(sns.scatterplot, 'price', 'revenue', alpha=0.6)
g.add_legend()
plt.suptitle('分面散点图', y=1.02)
plt.show()

6.3 仪表板开发(Streamlit)

# 需要安装:pip install streamlit
# 运行:streamlit run your_script.py

"""
# 数据分析仪表板示例

import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 页面配置
st.set_page_config(page_title="销售分析仪表板", layout="wide")

# 侧边栏
st.sidebar.header("数据配置")
sample_size = st.sidebar.slider("样本大小", 100, 1000, 500)
show_raw_data = st.sidebar.checkbox("显示原始数据")

# 生成数据
@st.cache_data  # 缓存数据
def generate_data(n):
    np.random.seed(42)
    data = {
        'city': np.random.choice(['北京', '上海', '广州', '深圳'], n),
        'product': np.random.choice(['手机', '电脑', '耳机'], n),
        'sales': np.random.normal(1000, 200, n),
        'profit': np.random.normal(200, 50, n)
    }
    return pd.DataFrame(data)

df = generate_data(sample_size)

# 主标题
st.title("📊 销售分析仪表板")

# 关键指标
col1, col2, col3 = st.columns(3)
with col1:
    st.metric("总销售额", f"¥{df['sales'].sum():,.0f}")
with col2:
    st.metric("平均利润", f"¥{df['profit'].mean():.0f}")
with col3:
    st.metric("订单数", len(df))

# 图表区域
st.subheader("可视化分析")

tab1, tab2, tab3 = st.tabs(["销售分布", "城市对比", "产品分析"])

with tab1:
    fig, ax = plt.subplots(figsize=(10, 6))
    sns.histplot(data=df, x='sales', kde=True, ax=ax)
    st.pyplot(fig)

with tab2:
    fig, ax = plt.subplots(figsize=(10, 6))
    sns.boxplot(data=df, x='city', y='sales', ax=ax)
    st.pyplot(fig)

with tab3:
    fig, ax = plt.subplots(figsize=(10, 6))
    product_summary = df.groupby('product')['sales'].sum()
    product_summary.plot(kind='bar', ax=ax)
    st.pyplot(fig)

# 原始数据
if show_raw_data:
    st.subheader("原始数据")
    st.dataframe(df)

# 数据下载
st.sidebar.download_button(
    label="下载数据CSV",
    data=df.to_csv(index=False).encode('utf-8'),
    file_name="sales_data.csv",
    mime="text/csv"
)
"""

6.4 自动化报告生成

# 使用Jupyter Notebook自动生成报告
def generate_analysis_report(df, output_path="analysis_report.html"):
    """
    生成HTML分析报告
    """
    from nbconvert import HTMLExporter
    import nbformat as nbf
    
    # 创建Notebook对象
    nb = nbf.v4.new_notebook()
    
    # 添加Markdown单元格
    markdown_text = f"""
# 数据分析报告

## 数据概览
- 数据形状: {df.shape}
- 内存使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB

## 关键指标
- 总记录数: {len(df)}
- 缺失值: {df.isnull().sum().sum()}
- 重复值: {df.duplicated().sum()}
"""
    
    # 添加代码单元格
    code_cells = [
        "import pandas as pd\nimport matplotlib.pyplot as plt\nimport seaborn as sns\n\n# 加载数据\n# df = pd.read_csv('your_data.csv')",
        "df.head()",
        "df.describe()",
        "plt.figure(figsize=(10, 6))\nsns.heatmap(df.corr(), annot=True, cmap='coolwarm')\nplt.title('相关性热力图')\nplt.show()"
    ]
    
    nb['cells'] = [
        nbf.v4.new_markdown_cell(markdown_text),
        nbf.v4.new_code_cell(code_cells[0]),
        nbf.v4.new_code_cell(code_cells[1]),
        nbf.v4.new_code_cell(code_cells[2]),
        nbf.v4.new_code_cell(code_cells[3])
    ]
    
    # 导出为HTML
    with open('temp_notebook.ipynb', 'w') as f:
        nbf.write(nb, f)
    
    # 转换为HTML
    html_exporter = HTMLExporter()
    body, resources = html_exporter.from_filename('temp_notebook.ipynb')
    
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(body)
    
    print(f"报告已生成: {output_path}")
    import os
    os.remove('temp_notebook.ipynb')

# 使用示例
# generate_analysis_report(df_ecom)

第七章:大数据处理与分布式计算

7.1 Dask:Pandas的分布式替代品

# 需要安装:pip install dask[complete]
try:
    import dask.dataframe as dd
    import dask.array as da
    from dask.distributed import Client, LocalCluster
    
    # 启动本地集群
    cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit='2GB')
    client = Client(cluster)
    print(f"Dask Dashboard: {client.dashboard_link}")
    
    # 创建Dask DataFrame
    # 模拟大数据
    df_large = pd.DataFrame({
        'id': range(1000000),
        'value': np.random.randn(1000000),
        'category': np.random.choice(['A', 'B', 'C'], 1000000)
    })
    df_large.to_csv('large_data.csv', index=False)
    
    # 使用Dask读取
    ddf = dd.read_csv('large_data.csv')
    print(f"Dask DataFrame形状: {ddf.shape}")
    print(f"分区数: {ddf.npartitions}")
    
    # 延迟计算
    result = ddf.groupby('category')['value'].mean()
    print("延迟计算对象:", result)
    
    # 触发计算
    computed_result = result.compute()
    print("计算结果:\n", computed_result)
    
    # 复杂操作
    result_complex = ddf[
        ddf['value'] > 0
    ].groupby('category').agg({
        'value': ['mean', 'std', 'count']
    }).compute()
    print("复杂计算结果:\n", result_complex)
    
    # 关闭集群
    client.close()
    cluster.close()
    
except ImportError:
    print("Dask未安装,跳过该部分")

7.2 PySpark基础

# 需要安装:pip install pyspark
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sum, avg, count, when
    
    # 创建Spark会话
    spark = SparkSession.builder \
        .appName("DataAnalysis") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()
    
    # 创建DataFrame
    data = [(1, "Alice", 25, "Engineer", 80000),
            (2, "Bob", 30, "Manager", 100000),
            (3, "Charlie", 35, "Director", 150000),
            (4, "David", 28, "Engineer", 85000)]
    
    columns = ["id", "name", "age", "title", "salary"]
    df_spark = spark.createDataFrame(data, columns)
    
    # 基本操作
    df_spark.show()
    df_spark.printSchema()
    
    # SQL查询
    df_spark.createOrReplaceTempView("employees")
    result = spark.sql("""
        SELECT title, 
               AVG(salary) as avg_salary,
               COUNT(*) as count
        FROM employees
        GROUP BY title
        ORDER BY avg_salary DESC
    """)
    result.show()
    
    # DataFrame API
    df_processed = df_spark \
        .filter(col("age") > 25) \
        .groupBy("title") \
        .agg(
            avg("salary").alias("avg_salary"),
            count("*").alias("count")
        ) \
        .orderBy("avg_salary", ascending=False)
    
    df_processed.show()
    
    # 转换为Pandas(小数据量)
    if df_processed.count() < 10000:
        pandas_df = df_processed.toPandas()
        print("转换为Pandas DataFrame:")
        print(pandas_df)
    
    spark.stop()
    
except ImportError:
    print("PySpark未安装,跳过该部分")
except Exception as e:
    print(f"Spark启动失败: {e}")

7.3 数据库连接与查询

import sqlite3
import sqlalchemy
from sqlalchemy import create_engine, text

# 1. SQLite示例
conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 创建表
cursor.execute("""
CREATE TABLE IF NOT EXISTS sales (
    id INTEGER PRIMARY KEY,
    product TEXT,
    city TEXT,
    revenue REAL,
    sale_date DATE
)
""")

# 插入数据
sample_data = [
    ('手机', '北京', 15000, '2023-01-01'),
    ('电脑', '上海', 25000, '2023-01-02'),
    ('耳机', '广州', 8000, '2023-01-03')
]
cursor.executemany("INSERT INTO sales (product, city, revenue, sale_date) VALUES (?, ?, ?, ?)", sample_data)
conn.commit()

# 查询
cursor.execute("SELECT * FROM sales WHERE revenue > 10000")
results = cursor.fetchall()
print("SQLite查询结果:")
for row in results:
    print(row)

conn.close()

# 2. SQLAlchemy(支持多种数据库)
# 创建引擎(SQLite)
engine = create_engine('sqlite:///example.db')

# 使用Pandas读取
df_from_db = pd.read_sql("SELECT * FROM sales", engine)
print("\nSQLAlchemy读取结果:")
print(df_from_db)

# 使用Pandas写入
df_ecom_sample = df_ecom.head(100)
df_ecom_sample.to_sql('ecom_sales', engine, if_exists='replace', index=False)

# 使用SQL查询
with engine.connect() as conn:
    result = conn.execute(text("SELECT city, AVG(revenue) as avg_rev FROM ecom_sales GROUP BY city"))
    print("\nSQL查询结果:")
    for row in result:
        print(row)

# 3. 连接PostgreSQL(需要安装psycopg2)
# engine = create_engine('postgresql://user:password@localhost:5432/mydb')

第八章:时间序列预测与高级分析

8.1 时间序列预测基础

from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore')

# 1. 创建时间序列数据
np.random.seed(42)
dates = pd.date_range('2020-01-01', '2023-12-31', freq='M')
# 包含趋势、季节性和噪声
trend = np.linspace(100, 200, len(dates))
seasonality = 20 * np.sin(2 * np.pi * np.arange(len(dates)) / 12)
noise = np.random.normal(0, 5, len(dates))
sales = trend + seasonality + noise

ts_data = pd.DataFrame({'date': dates, 'sales': sales})
ts_data.set_index('date', inplace=True)

# 2. 划分训练测试集
train_size = int(len(ts_data) * 0.8)
train, test = ts_data.iloc[:train_size], ts_data.iloc[train_size:]

# 3. ARIMA模型
# 自动选择参数(简化版)
def find_best_arima(train, test, p_range=3, d_range=2, q_range=3):
    best_score = float('inf')
    best_params = None
    
    for p in range(p_range):
        for d in range(d_range):
            for q in range(q_range):
                try:
                    model = ARIMA(train, order=(p, d, q))
                    model_fit = model.fit()
                    forecast = model_fit.forecast(steps=len(test))
                    mse = mean_squared_error(test, forecast)
                    if mse < best_score:
                        best_score = mse
                        best_params = (p, d, q)
                except Exception:
                    continue
    
    return best_params, best_score

best_params, best_score = find_best_arima(train['sales'], test['sales'])
print(f"最佳ARIMA参数: {best_params}, MSE: {best_score:.2f}")

# 使用最佳参数训练
model = ARIMA(train['sales'], order=best_params)
model_fit = model.fit()
print(model_fit.summary())

# 预测
forecast = model_fit.forecast(steps=len(test))
forecast_df = pd.DataFrame({'forecast': forecast}, index=test.index)

# 评估
mse = mean_squared_error(test['sales'], forecast)
mae = mean_absolute_error(test['sales'], forecast)
print(f"\nARIMA评估:")
print(f"均方误差: {mse:.2f}")
print(f"平均绝对误差: {mae:.2f}")

# 可视化
plt.figure(figsize=(12, 6))
plt.plot(train.index, train['sales'], label='训练集')
plt.plot(test.index, test['sales'], label='真实值', color='orange')
plt.plot(forecast_df.index, forecast_df['forecast'], label='ARIMA预测', color='red', linestyle='--')
plt.title('ARIMA时间序列预测')
plt.legend()
plt.show()

8.2 Prophet:Facebook时间序列预测

# 需要安装:pip install prophet
try:
    from prophet import Prophet
    
    # 准备数据(Prophet需要ds/y两列;只用训练集拟合,测试集留作评估)
    prophet_df = train.reset_index()
    prophet_df.columns = ['ds', 'y']
    
    # 训练模型
    model_prophet = Prophet(
        yearly_seasonality=True,
        weekly_seasonality=False,
        daily_seasonality=False,
        changepoint_prior_scale=0.05
    )
    model_prophet.fit(prophet_df)
    
    # 创建未来数据框
    future = model_prophet.make_future_dataframe(periods=len(test), freq='M')
    
    # 预测
    forecast_prophet = model_prophet.predict(future)
    
    # 可视化
    fig1 = model_prophet.plot(forecast_prophet)
    plt.title('Prophet预测')
    plt.show()
    
    fig2 = model_prophet.plot_components(forecast_prophet)
    plt.show()
    
    # 评估(在测试集上)
    test_prophet = forecast_prophet.tail(len(test))
    mse_prophet = mean_squared_error(test['sales'], test_prophet['yhat'])
    print(f"Prophet MSE: {mse_prophet:.2f}")
    
except ImportError:
    print("Prophet未安装,跳过该部分")

8.3 高级时间序列分析

# 1. 自相关分析
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

plt.figure(figsize=(12, 8))
plt.subplot(2, 1, 1)
plot_acf(ts_data['sales'], lags=24, ax=plt.gca())
plt.title('自相关函数(ACF)')

plt.subplot(2, 1, 2)
plot_pacf(ts_data['sales'], lags=20, ax=plt.gca())  # PACF的lags需小于样本量的一半
plt.title('偏自相关函数(PACF)')
plt.tight_layout()
plt.show()

# 2. 季节性分解
from statsmodels.tsa.seasonal import seasonal_decompose
decomposition = seasonal_decompose(ts_data['sales'], model='additive', period=12)
fig = decomposition.plot()
fig.set_size_inches(12, 8)
plt.show()

# 3. 平稳性检验(ADF检验)
from statsmodels.tsa.stattools import adfuller
result = adfuller(ts_data['sales'])
print('ADF检验结果:')
print(f'统计量: {result[0]}')
print(f'p值: {result[1]}')
print('临界值:')
for key, value in result[4].items():
    print(f'   {key}: {value}')
if result[1] < 0.05:
    print("序列是平稳的")
else:
    print("序列是非平稳的,需要差分")

# 4. 差分处理
ts_data['sales_diff1'] = ts_data['sales'].diff(1)
ts_data['sales_diff12'] = ts_data['sales'].diff(12)

# 5. 协整检验(多变量)
def cointegration_test(df, col1, col2):
    from statsmodels.tsa.stattools import coint
    score, p_value, _ = coint(df[col1], df[col2])
    print(f"协整检验 p值: {p_value:.4f}")
    if p_value < 0.05:
        print(f"{col1} 和 {col2} 存在协整关系")
    else:
        print(f"{col1} 和 {col2} 不存在协整关系")

# 示例:如果有多个时间序列
# cointegration_test(ts_data, 'sales', 'sales_diff1')

第九章:机器学习工程化与部署

9.1 模型持久化

import joblib
import pickle
import os

# 1. 使用joblib(推荐用于sklearn模型)
def save_model_joblib(model, filename):
    """保存模型"""
    joblib.dump(model, filename)
    print(f"模型已保存: {filename}")

def load_model_joblib(filename):
    """加载模型"""
    if os.path.exists(filename):
        model = joblib.load(filename)
        print(f"模型已加载: {filename}")
        return model
    else:
        print("模型文件不存在")
        return None

# 训练并保存模型
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
save_model_joblib(model, 'random_forest_model.joblib')

# 加载并使用
loaded_model = load_model_joblib('random_forest_model.joblib')
if loaded_model:
    predictions = loaded_model.predict(X_test)
    print(f"加载模型准确率: {accuracy_score(y_test, predictions):.3f}")

# 2. 使用pickle(通用)
def save_model_pickle(model, filename):
    with open(filename, 'wb') as f:
        pickle.dump(model, f)
    print(f"模型已保存: {filename}")

def load_model_pickle(filename):
    with open(filename, 'rb') as f:
        model = pickle.load(f)
    return model

# 3. 保存预处理管道
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('model', RandomForestClassifier())
])
pipeline.fit(X_train, y_train)
save_model_joblib(pipeline, 'full_pipeline.joblib')

# 4. 模型版本管理
def save_model_versioned(model, prefix='model', version='1.0'):
    """版本化保存模型"""
    filename = f"{prefix}_v{version}.joblib"
    save_model_joblib(model, filename)
    return filename

# 保存多个版本
save_model_versioned(model, prefix='rf', version='1.0')
save_model_versioned(model, prefix='rf', version='1.1')

9.2 模型API开发(FastAPI)

# 需要安装:pip install fastapi uvicorn
"""
# 保存为 api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd

app = FastAPI(title="机器学习模型API", version="1.0")

# 加载模型
model = joblib.load('random_forest_model.joblib')

# 定义输入数据模型
class PredictionInput(BaseModel):
    features: list[float]
    feature_names: list[str] = None

# 定义输出模型
class PredictionOutput(BaseModel):
    prediction: int
    probability: float

@app.get("/")
def read_root():
    return {"message": "机器学习模型API", "version": "1.0"}

@app.post("/predict", response_model=PredictionOutput)
def predict(input_data: PredictionInput):
    try:
        # 转换为DataFrame
        if input_data.feature_names:
            X = pd.DataFrame([input_data.features], columns=input_data.feature_names)
        else:
            X = np.array([input_data.features])
        
        # 预测
        prediction = model.predict(X)[0]
        probability = model.predict_proba(X)[0][prediction]
        
        return PredictionOutput(prediction=int(prediction), probability=float(probability))
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict_batch")
def predict_batch(inputs: list[PredictionInput]):
    try:
        features_list = []
        for input_data in inputs:
            features_list.append(input_data.features)
        
        X = np.array(features_list)
        predictions = model.predict(X)
        probabilities = model.predict_proba(X)
        
        results = []
        for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
            results.append({
                "index": i,
                "prediction": int(pred),
                "probability": float(prob[pred])
            })
        
        return {"predictions": results}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

# 运行命令: uvicorn api:app --reload
"""

9.3 模型监控与评估

# 1. 持续评估指标
def calculate_drift_metrics(old_data, new_data, threshold=0.05):
    """
    检测数据漂移
    """
    from scipy import stats
    
    drift_results = {}
    for col in old_data.columns:
        if pd.api.types.is_numeric_dtype(old_data[col]):
            # KS检验
            ks_stat, p_value = stats.ks_2samp(old_data[col], new_data[col])
            drift_results[col] = {
                'ks_statistic': ks_stat,
                'p_value': p_value,
                'drift': p_value < threshold
            }
    return drift_results

# 2. 模型性能监控
def monitor_model_performance(y_true, y_pred, y_proba, threshold=0.8):
    """
    监控模型性能指标
    """
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    
    metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred, average='weighted'),
        'recall': recall_score(y_true, y_pred, average='weighted'),
        'f1': f1_score(y_true, y_pred, average='weighted')
    }
    
    # 检查是否需要重新训练
    needs_retraining = metrics['accuracy'] < threshold
    
    return metrics, needs_retraining

# 3. 日志记录
import logging
logging.basicConfig(
    filename='model_monitoring.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def log_prediction(features, prediction, probability):
    logging.info(f"Features: {features}, Prediction: {prediction}, Probability: {probability}")

# 4. A/B测试框架
class ABTestFramework:
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results = {'A': [], 'B': []}
    
    def predict(self, X):
        import random
        if random.random() < self.traffic_split:
            model = self.model_a
            version = 'A'
        else:
            model = self.model_b
            version = 'B'
        
        pred = model.predict(X)
        prob = model.predict_proba(X)
        
        self.results[version].append({
            'prediction': pred[0],
            'probability': prob[0][pred[0]]
        })
        
        return pred, prob, version
    
    def get_stats(self):
        stats = {}
        for version in ['A', 'B']:
            if self.results[version]:
                avg_prob = np.mean([r['probability'] for r in self.results[version]])
                stats[version] = {
                    'count': len(self.results[version]),
                    'avg_probability': avg_prob
                }
        return stats

# 使用示例
# ab_test = ABTestFramework(model_v1, model_v2)
# pred, prob, version = ab_test.predict(X_test[:1])

第十章:职业发展路径与实战项目

10.1 数据分析师职业路径

阶段1:初级数据分析师(0-2年)

  • 核心技能
    • 精通SQL和Excel
    • 掌握Python基础(Pandas、NumPy)
    • 数据可视化(Matplotlib、Seaborn)
    • 统计学基础(描述统计、假设检验)
  • 典型工作
    • 制作日报/周报
    • 基础数据提取和清洗
    • 描述性分析报告
  • 薪资范围:8-15万/年(国内)

阶段2:中级数据分析师(2-5年)

  • 新增技能
    • 掌握机器学习基础(Scikit-learn)
    • A/B测试设计
    • 因果推断
    • 数据仓库基础
  • 典型工作
    • 深入业务分析
    • 构建分析框架
    • 预测模型开发
  • 薪资范围:15-30万/年

阶段3:高级数据分析师/数据科学家(5年+)

  • 新增技能
    • 高级统计建模
    • 深度学习
    • 大数据技术(Spark、Hadoop)
    • 工程化能力(API开发、模型部署)
  • 典型工作
    • 复杂建模
    • 战略级分析
    • 技术架构设计
  • 薪资范围:30-60万/年,部分可达80万+

10.2 数据科学家核心能力矩阵

能力维度 | 初级 | 中级 | 高级 | 专家
编程能力 | Python基础 | 熟练Pandas | 熟练Spark | 系统架构
统计学 | 描述统计 | 假设检验 | 贝叶斯方法 | 理论创新
机器学习 | 基础算法 | 熟练调参 | 集成学习 | 深度学习
业务理解 | 理解指标 | 洞察驱动 | 战略思维 | 商业闭环
工程能力 | Jupyter | Git/Docker | CI/CD | MLOps
可视化 | 基础图表 | 交互式仪表板 | 自动化报告 | 数据产品

10.3 实战项目推荐

项目1:电商用户行为分析

  • 技术栈:Pandas + Seaborn + SQL
  • 数据:用户浏览、购买、评价数据
  • 输出:用户画像、RFM模型、转化漏斗
  • 亮点:业务价值明确,适合简历
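
针对项目1中的RFM模型,下面是一个最小化的计算示意:假设订单明细包含user_id、order_date、amount三列(列名为举例,需按实际数据调整),按四分位数给R、F、M打分。

import pandas as pd

def rfm_score(orders: pd.DataFrame, snapshot_date=None) -> pd.DataFrame:
    """基于订单明细计算RFM并按四分位打分(示意版)"""
    if snapshot_date is None:
        snapshot_date = orders['order_date'].max()
    rfm = orders.groupby('user_id').agg(
        recency=('order_date', lambda s: (snapshot_date - s.max()).days),  # 最近一次购买距今天数
        frequency=('order_date', 'count'),                                  # 购买次数
        monetary=('amount', 'sum')                                          # 消费总额
    )
    # 分位数打分:recency越小越好,frequency/monetary越大越好;先rank避免分位点重复
    rfm['R'] = pd.qcut(rfm['recency'].rank(method='first'), 4, labels=[4, 3, 2, 1]).astype(int)
    rfm['F'] = pd.qcut(rfm['frequency'].rank(method='first'), 4, labels=[1, 2, 3, 4]).astype(int)
    rfm['M'] = pd.qcut(rfm['monetary'].rank(method='first'), 4, labels=[1, 2, 3, 4]).astype(int)
    rfm['RFM'] = rfm['R'].astype(str) + rfm['F'].astype(str) + rfm['M'].astype(str)
    return rfm

# 用法示例(假设df_orders是订单明细):rfm = rfm_score(df_orders)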

项目2:销售预测系统

  • 技术栈:Prophet/XGBoost + Streamlit + Docker
  • 数据:历史销售数据
  • 输出:预测模型 + 交互式仪表板
  • 亮点:端到端项目,展示工程能力

项目3:用户流失预测

  • 技术栈:Scikit-learn + 特征工程 + 模型解释
  • 数据:用户行为数据
  • 输出:流失预测模型 + 特征重要性分析
  • 亮点:分类问题,业务价值高
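
项目3中提到的“模型解释”环节,可以用scikit-learn的permutation_importance做一个简单示意(下面的数据用make_classification合成来模拟流失标签,特征名均为假设):

import pandas as pd
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.inspection import permutation_importance
from sklearn.model_selection import train_test_split

# 合成一个"是否流失"的二分类数据集
X_churn, y_churn = make_classification(n_samples=2000, n_features=8, n_informative=5, random_state=42)
feature_names = [f'feat_{i}' for i in range(X_churn.shape[1])]  # 假设的特征名
X_tr, X_te, y_tr, y_te = train_test_split(X_churn, y_churn, test_size=0.3, random_state=42)

clf = GradientBoostingClassifier(random_state=42).fit(X_tr, y_tr)

# 置换重要性:在测试集上逐个打乱特征,看模型分数下降多少
perm = permutation_importance(clf, X_te, y_te, n_repeats=10, random_state=42)
importance = pd.Series(perm.importances_mean, index=feature_names).sort_values(ascending=False)
print(importance)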

项目4:实时推荐系统

  • 技术栈:PySpark + Kafka + Flask/FastAPI
  • 数据:用户行为日志
  • 输出:实时推荐API
  • 亮点:大数据处理,实时计算

项目5:异常检测系统

  • 技术栈:Isolation Forest + DBSCAN + 可视化
  • 数据:监控数据、交易数据
  • 输出:异常检测模型 + 告警系统
  • 亮点:算法深度,工程实践

10.4 简历与面试准备

简历要点:

  1. 量化成果:不要写“优化了模型”,要写“将模型准确率从85%提升到92%,减少业务损失200万/年”
  2. 项目结构:STAR法则(情境、任务、行动、结果)
  3. 技术关键词:根据JD调整,突出匹配技能
  4. GitHub:展示代码质量,有README和文档

面试准备:

  • 技术面试

    • SQL手写(窗口函数、CTE)
    • Python手写(Pandas操作、算法题)
    • 统计学(假设检验、置信区间)
    • 机器学习(过拟合、特征工程、模型评估)
  • 业务面试

    • A/B测试设计(示意代码见本节末尾)
    • 指标体系搭建
    • 异常分析思路
    • 业务场景建模
  • 行为面试

    • 项目难点与解决
    • 跨部门协作
    • 持续学习能力
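
针对上面“业务面试”中提到的A/B测试设计,下面是用statsmodels做两样本比例检验的最小示意(转化数与样本量均为虚构):

from statsmodels.stats.proportion import proportions_ztest

# 假设的实验数据:对照组与实验组的转化用户数和曝光用户数
conversions = [480, 530]
samples = [10000, 10000]

z_stat, p_value = proportions_ztest(count=conversions, nobs=samples)
print(f"z = {z_stat:.3f}, p = {p_value:.4f}")
if p_value < 0.05:
    print("两组转化率差异显著")
else:
    print("差异不显著,不能拒绝原假设")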

10.5 持续学习资源

在线课程:

  • Coursera: Andrew Ng机器学习
  • Kaggle Learn: 免费实战课程
  • DataCamp: 交互式编程

书籍推荐:

  • 《利用Python进行数据分析》
  • 《Python数据科学手册》
  • 《统计学习方法》
  • 《机器学习》(周志华)

社区与竞赛:

  • Kaggle: 参加比赛,学习优秀方案
  • 天池大赛: 国内竞赛平台
  • Kaggle Discussion: 学习讨论
  • GitHub: 关注热门项目

技术博客:

  • Towards Data Science
  • 机器之心
  • 量子位
  • 个人博客(如李沐、吴恩达)

10.6 薪资谈判技巧

市场调研:

  • 使用Glassdoor、拉勾网、Boss直聘调研目标公司薪资
  • 了解行业平均薪资水平
  • 考虑城市差异(一线城市高30-50%)

谈判策略:

  1. 不要先报价:让HR先给范围
  2. 基于价值:强调你能为公司创造的价值
  3. 总包概念:base + 奖金 + 股票 + 福利
  4. 备选方案:有其他offer时更有议价权
  5. 职业发展:考虑长期成长而非短期薪资

常见误区:

  • 只看base,忽略奖金和股票
  • 忽视公司成长性
  • 过早暴露底线
  • 缺乏备选方案

附录:常用工具与快捷键

A.1 Jupyter Notebook快捷键

  • Shift + Enter: 运行单元格
  • A: 在上方插入单元格
  • B: 在下方插入单元格
  • D + D: 删除单元格
  • M: 切换到Markdown模式
  • Y: 切换到代码模式
  • H: 显示快捷键帮助

A.2 Pandas常用操作速查

# 读取数据
df = pd.read_csv('file.csv')
df = pd.read_excel('file.xlsx')
df = pd.read_json('file.json')

# 查看数据
df.head()
df.info()
df.describe()
df.shape
df.dtypes

# 选择数据
df['column']           # 单列
df[['col1', 'col2']]  # 多列
df.iloc[0:5, 0:3]     # 位置索引
df.loc[0:5, ['col1', 'col2']]  # 标签索引

# 筛选
df[df['col'] > 100]
df.query('col > 100')
df[df['col'].isin([1, 2, 3])]

# 处理缺失值
df.dropna()
df.fillna(value)
df.interpolate()

# 分组聚合
df.groupby('col')['value'].agg(['mean', 'sum', 'count'])

# 合并
pd.concat([df1, df2])
pd.merge(df1, df2, on='key', how='inner')

# 时间序列
df['date'] = pd.to_datetime(df['date'])
df.resample('M').sum()
df.rolling(7).mean()

# 处理文本
df['col'].str.upper()
df['col'].str.contains('pattern')
df['col'].str.replace('old', 'new')

# 处理类别
df['col'] = df['col'].astype('category')
pd.get_dummies(df, columns=['col'])

A.3 常用库安装命令

# 基础环境
conda create -n data_analysis python=3.10
conda activate data_analysis

# 核心库
conda install pandas numpy matplotlib seaborn scikit-learn
pip install jupyterlab

# 高级分析
pip install statsmodels
pip install xgboost lightgbm
pip install plotly
pip install streamlit

# 大数据
pip install dask[complete]
pip install pyspark

# 部署
pip install fastapi uvicorn
pip install joblib

# 开发工具
pip install black flake8 pytest

A.4 代码风格与最佳实践

# 1. 使用函数封装重复逻辑
def load_and_clean_data(filepath):
    """加载并清洗数据"""
    df = pd.read_csv(filepath)
    df = df.dropna(subset=['key_column'])
    df['date'] = pd.to_datetime(df['date'])
    return df

# 2. 使用类型提示
from typing import List, Optional, Dict
import numpy as np
import pandas as pd

def calculate_metrics(df: pd.DataFrame, 
                     target_col: str, 
                     group_col: Optional[str] = None) -> Dict[str, float]:
    """计算指标"""
    if group_col:
        result = df.groupby(group_col)[target_col].mean().to_dict()
    else:
        result = {target_col: df[target_col].mean()}
    return result

# 3. 使用配置文件
import yaml
def load_config(config_path='config.yaml'):
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)

# config.yaml
# data:
#   filepath: "data.csv"
#   test_size: 0.2
# model:
#   random_state: 42

# 4. 使用日志
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_data(df):
    logger.info(f"Processing data with shape {df.shape}")
    # ... processing
    logger.info("Processing complete")
    return df

# 5. 使用if __name__ == "__main__"
if __name__ == "__main__":
    # 只在直接运行脚本时执行
    main()

结语

本课程从Python基础到高级机器学习,从数据处理到模型部署,涵盖了数据分析师/科学家的完整知识体系。记住,理论学习 + 项目实践 + 持续学习 是成长的关键。

下一步行动:

  1. 立即开始:选择一个项目,用本课程的代码模板开始实践
  2. 建立习惯:每天至少1小时编码,每周至少1个Kaggle Notebook
  3. 社区参与:在GitHub上贡献代码,在Kaggle上分享方案
  4. 职业规划:根据当前水平,制定6个月学习计划

最后建议:

  • 不要追求完美,先完成再优化
  • 代码能跑通是第一步,然后是效率,最后是优雅
  • 业务理解比技术更重要
  • 保持好奇心,持续学习新技术

祝你在数据分析的道路上越走越远,成为真正的数据驱动决策者!