引言:为什么选择Python进行数据分析?
Python已经成为数据科学领域的王者语言。根据2023年Kaggle调查,超过90%的数据科学家使用Python作为主要编程语言。本课程将带你从入门走向精通,涵盖从基础数据处理到高级机器学习建模的全过程,并为你规划清晰的职业发展路径。
学习目标
- 掌握Python数据分析核心库(Pandas、NumPy、Matplotlib/Seaborn)
- 理解数据清洗、探索性分析(EDA)和特征工程
- 学习使用Scikit-learn构建机器学习模型
- 掌握时间序列分析和大数据处理技术
- 了解数据可视化和仪表板开发
- 规划数据分析师/科学家的职业路径
第一章:Python数据分析基础(入门阶段)
1.1 环境搭建与工具链
推荐工具组合:
- Anaconda:一站式数据科学环境(包含Jupyter、Pandas等)
- Jupyter Notebook/Lab:交互式编程环境
- VS Code:轻量级编辑器(适合生产环境)
- Git/GitHub:版本控制和项目展示
# 安装Anaconda(推荐)
wget https://repo.anaconda.com/archive/Anaconda3-2023.09-0-Linux-x86_64.sh
bash Anaconda3-2023.09-0-Linux-x86_64.sh
# 创建专用环境
conda create -n data_analysis python=3.10
conda activate data_analysis
# 安装核心库
conda install pandas numpy matplotlib seaborn scikit-learn
pip install jupyterlab
1.2 NumPy:科学计算基础
NumPy是Python科学生态系统的基石,提供高性能的多维数组对象。
import numpy as np
# 创建数组
arr = np.array([[1, 2, 3], [4, 5, 6]])
print(f"数组形状: {arr.shape}") # (2, 3)
# 基本运算(向量化)
arr2 = arr * 2 # 所有元素乘以2
arr3 = arr + np.array([10, 20, 30]) # 广播机制
# 索引与切片
print(arr[0, 1]) # 第一行第二列:2
print(arr[:, 1]) # 所有行第二列:[2 5]
# 常用函数
mean = np.mean(arr) # 平均值
std = np.std(arr) # 标准差
normalized = (arr - mean) / std # 标准化
# 随机数生成
np.random.seed(42) # 固定随机种子
random_data = np.random.normal(0, 1, (1000, 5)) # 1000行5列正态分布
性能对比示例:
import time
# Python原生列表
python_list = list(range(1000000))
start = time.time()
result = [x * 2 for x in python_list]
print(f"Python列表耗时: {time.time() - start:.4f}秒")
# NumPy数组
numpy_array = np.arange(1000000)
start = time.time()
result = numpy_array * 2
print(f"NumPy数组耗时: {time.time() - start:.4f}秒")
# NumPy通常比原生Python快10-100倍
1.3 Pandas:数据处理核心库
Pandas提供了DataFrame这一强大的表格数据结构,是数据分析的瑞士军刀。
import pandas as pd
# 创建DataFrame
data = {
'姓名': ['张三', '李四', '王五', '赵六'],
'年龄': [25, 30, 35, 28],
'城市': ['北京', '上海', '广州', '深圳'],
'薪资': [15000, 20000, 25000, 18000]
}
df = pd.DataFrame(data)
# 基础探索
print(df.head()) # 前5行
print(df.info()) # 数据类型和缺失值
print(df.describe()) # 统计摘要
# 数据筛选
# 筛选年龄大于28的记录
senior = df[df['年龄'] > 28]
# 多条件筛选:年龄>28且薪资>20000
condition = (df['年龄'] > 28) & (df['薪资'] > 20000)
senior_high = df[condition]
# 数据分组聚合
# 按城市计算平均薪资
city_salary = df.groupby('城市')['薪资'].agg(['mean', 'count', 'std'])
print(city_salary)
# 处理缺失值
df_with_nan = df.copy()
df_with_nan.loc[1, '薪资'] = np.nan
# 填充缺失值
df_filled = df_with_nan.fillna({'薪资': df_with_nan['薪资'].median()})
# 删除缺失值
df_dropped = df_with_nan.dropna()
# 数据合并
df1 = df.iloc[:2]
df2 = df.iloc[2:]
merged = pd.concat([df1, df2], ignore_index=True)
# 时间序列处理
date_range = pd.date_range('2023-01-01', periods=5, freq='D')
df_time = pd.DataFrame({
'date': date_range,
'value': np.random.randn(5)
})
df_time['year'] = df_time['date'].dt.year
df_time['day_of_week'] = df_time['date'].dt.dayofweek
Pandas性能优化技巧:
# 1. 使用向量化操作替代循环
# 慢:for循环
# 快:df['new'] = df['col1'] * df['col2']
# 2. 使用Categorical类型处理重复字符串
df['城市'] = df['城市'].astype('category') # 节省内存,加速分组
# 3. 使用query()方法进行高效筛选
# 慢:df[(df['年龄'] > 25) & (df['薪资'] < 20000)]
# 快:df.query('年龄 > 25 and 薪资 < 20000')
# 4. 使用eval()进行高效计算
df['总成本'] = df.eval('薪资 * 1.5 + 5000')
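下面用一段可运行的小基准直观对比逐行apply与向量化的差异(示例数据为随机生成,具体耗时因机器而异,仅作参考):
import time
import numpy as np
import pandas as pd

# 构造10万行的随机示例数据
df_bench = pd.DataFrame({
    'col1': np.random.rand(100_000),
    'col2': np.random.rand(100_000)
})

# 方式1:逐行apply(慢)
start = time.time()
slow_result = df_bench.apply(lambda row: row['col1'] * row['col2'], axis=1)
print(f"apply逐行耗时: {time.time() - start:.4f}秒")

# 方式2:向量化列运算(快,通常快一到两个数量级)
start = time.time()
fast_result = df_bench['col1'] * df_bench['col2']
print(f"向量化耗时: {time.time() - start:.4f}秒")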
1.4 数据可视化基础
import matplotlib.pyplot as plt
import seaborn as sns
# 设置中文字体(解决中文显示问题)
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# 1. 折线图
plt.figure(figsize=(10, 6))
plt.plot(df['年龄'], df['薪资'], marker='o', linestyle='--', color='blue')
plt.title('年龄与薪资关系', fontsize=14)
plt.xlabel('年龄')
plt.ylabel('薪资')
plt.grid(True, alpha=0.3)
plt.show()
# 2. 柱状图
plt.figure(figsize=(10, 6))
plt.bar(df['城市'], df['薪资'], color=['red', 'green', 'blue', 'orange'])
plt.title('各城市薪资对比')
plt.xlabel('城市')
plt.ylabel('薪资')
plt.show()
# 3. 散点图(Seaborn)
sns.set_style("whitegrid")
plt.figure(figsize=(10, 6))
sns.scatterplot(data=df, x='年龄', y='薪资', hue='城市', size='年龄', sizes=(100, 300))
plt.title('年龄与薪资关系(按城市着色)')
plt.show()
# 4. 箱线图
plt.figure(figsize=(10, 6))
sns.boxplot(data=df, x='城市', y='薪资')
plt.title('各城市薪资分布')
plt.show()
# 5. 相关性热力图
corr_matrix = df[['年龄', '薪资']].corr()
plt.figure(figsize=(8, 6))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('相关性热力图')
plt.show()
1.5 实战案例:电商销售数据分析
# 生成模拟数据
np.random.seed(42)
n = 1000
data = {
'order_id': range(1000, 1000+n),
'customer_id': np.random.randint(1, 200, n),
'product': np.random.choice(['手机', '电脑', '耳机', '平板'], n),
'category': np.random.choice(['电子', '配件', '家电'], n),
'quantity': np.random.randint(1, 5, n),
'price': np.random.uniform(100, 1000, n),
'order_date': pd.date_range('2023-01-01', periods=n, freq='H'),
'city': np.random.choice(['北京', '上海', '广州', '深圳', '杭州'], n)
}
df_ecom = pd.DataFrame(data)
df_ecom['revenue'] = df_ecom['quantity'] * df_ecom['price']
# 1. 销售总额
total_revenue = df_ecom['revenue'].sum()
print(f"总销售额: {total_revenue:,.2f}")
# 2. 按产品统计
product_stats = df_ecom.groupby('product').agg({
'revenue': ['sum', 'mean', 'count'],
'quantity': 'sum'
}).round(2)
print(product_stats)
# 3. 按月份统计
df_ecom['month'] = df_ecom['order_date'].dt.month
monthly_sales = df_ecom.groupby('month')['revenue'].sum()
print(monthly_sales)
# 4. 热销产品TOP5
top_products = df_ecom.groupby('product')['revenue'].sum().nlargest(5)
print("热销产品TOP5:\n", top_products)
# 5. 可视化:月度销售趋势
plt.figure(figsize=(12, 6))
monthly_sales.plot(kind='bar', color='skyblue')
plt.title('2023年月度销售趋势')
plt.xlabel('月份')
plt.ylabel('销售额')
plt.xticks(rotation=0)
plt.show()
第二章:数据清洗与探索性分析(EDA)
2.1 数据质量评估
# 模拟脏数据
df_dirty = pd.DataFrame({
'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eva', None, 'Grace'],
'age': [25, 30, np.nan, 35, 28, 40, 29],
'salary': [50000, 60000, 70000, 80000, 55000, 65000, np.nan],
'email': ['alice@company.com', 'bob@company.com', 'invalid-email', 'david@company.com', 'eva@company.com', 'grace@company.com', None],
'join_date': ['2020-01-15', '2019-03-20', '2021-06-10', '2018-11-05', '2020-09-12', '2019-07-08', '2021-02-28']
})
# 1. 缺失值分析
def analyze_missing(df):
missing = df.isnull().sum()
missing_percent = (missing / len(df)) * 100
missing_df = pd.DataFrame({'缺失数量': missing, '缺失率(%)': missing_percent.round(2)})
return missing_df.sort_values('缺失数量', ascending=False)
print("缺失值分析:")
print(analyze_missing(df_dirty))
# 2. 数据类型检查
print("\n数据类型:")
print(df_dirty.dtypes)
# 3. 重复值检查
df_duplicate = pd.DataFrame({
'A': [1, 2, 2, 3],
'B': ['x', 'y', 'y', 'z']
})
print(f"\n重复行数: {df_duplicate.duplicated().sum()}")
print("重复行:")
print(df_duplicate[df_duplicate.duplicated()])
# 4. 异常值检测(IQR方法)
def detect_outliers_iqr(series):
Q1 = series.quantile(0.25)
Q3 = series.quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
return (series < lower_bound) | (series > upper_bound)
# 检测年龄异常值
age_outliers = detect_outliers_iqr(df_dirty['age'])
print(f"\n年龄异常值:\n{df_dirty[age_outliers]}")
2.2 数据清洗实战
# 1. 处理缺失值
# 删除缺失值
df_cleaned = df_dirty.dropna(subset=['name', 'email']) # 删除关键字段缺失的行
# 填充缺失值
df_filled = df_dirty.copy()
df_filled['age'] = df_filled['age'].fillna(df_filled['age'].median())
df_filled['salary'] = df_filled['salary'].fillna(df_filled['salary'].median())
# 2. 处理异常值
# 年龄限制在18-65岁
df_filled['age'] = df_filled['age'].clip(18, 65)
# 3. 数据格式标准化
# 邮箱格式验证
import re
def is_valid_email(email):
if pd.isna(email):
return False
pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
return re.match(pattern, email) is not None
df_filled['email_valid'] = df_filled['email'].apply(is_valid_email)
print("邮箱验证结果:")
print(df_filled[['email', 'email_valid']])
# 4. 日期格式标准化
df_filled['join_date'] = pd.to_datetime(df_filled['join_date'], errors='coerce')
df_filled['tenure_days'] = (pd.Timestamp.now() - df_filled['join_date']).dt.days
# 5. 文本清洗
df_filled['name'] = df_filled['name'].str.strip().str.title()
df_filled['name_clean'] = df_filled['name'].str.replace(r'[^a-zA-Z]', '', regex=True)
2.3 探索性数据分析(EDA)
# 完整的EDA函数
def perform_eda(df, target_col=None):
"""
执行完整的探索性数据分析
"""
print("="*50)
print("数据概览")
print("="*50)
print(f"数据形状: {df.shape}")
print(f"内存使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print("\n" + "="*50)
print("数据类型分布")
print("="*50)
print(df.dtypes.value_counts())
print("\n" + "="*50)
print("数值型变量统计")
print("="*50)
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
print(df[numeric_cols].describe().round(2))
print("\n" + "="*50)
print("类别型变量统计")
print("="*50)
categorical_cols = df.select_dtypes(include=['object', 'category']).columns
for col in categorical_cols:
print(f"\n{col}:")
print(df[col].value_counts().head())
print("\n" + "="*50)
print("缺失值统计")
print("="*50)
missing = df.isnull().sum()
missing = missing[missing > 0]
if len(missing) > 0:
print(missing.sort_values(ascending=False))
else:
print("无缺失值")
# 如果有目标变量,分析其分布
if target_col and target_col in df.columns:
print("\n" + "="*50)
print(f"目标变量 {target_col} 分布")
print("="*50)
if pd.api.types.is_numeric_dtype(df[target_col]):
print(df[target_col].describe())
plt.figure(figsize=(10, 6))
sns.histplot(df[target_col], kde=True)
plt.title(f'{target_col} 分布直方图')
plt.show()
else:
print(df[target_col].value_counts())
plt.figure(figsize=(10, 6))
df[target_col].value_counts().plot(kind='bar')
plt.title(f'{target_col} 分布')
plt.show()
# 执行EDA
perform_eda(df_ecom, target_col='revenue')
2.4 特征工程基础
# 1. 创建新特征
df_ecom['price_per_item'] = df_ecom['revenue'] / df_ecom['quantity']
df_ecom['is_high_price'] = (df_ecom['price'] > df_ecom['price'].median()).astype(int)
df_ecom['order_hour'] = df_ecom['order_date'].dt.hour
df_ecom['is_weekend'] = df_ecom['order_date'].dt.dayofweek.isin([5, 6]).astype(int)
# 2. 分箱(Binning)
# 将购买数量分箱
df_ecom['quantity_bin'] = pd.cut(df_ecom['quantity'], bins=[0, 2, 3, 5], labels=['低', '中', '高'])
# 3. 编码类别变量
# 独热编码
df_encoded = pd.get_dummies(df_ecom, columns=['product', 'city'], prefix=['prod', 'city'])
# 4. 文本特征提取
df_text = pd.DataFrame({
'comment': ['这个产品很好', '质量不错', '物流太慢', '非常满意', '一般般']
})
from sklearn.feature_extraction.text import CountVectorizer
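# 注意:CountVectorizer默认按空格/单词边界切分,无法正确切分中文,实际使用前需先用jieba等工具分词,这里仅演示API用法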
vectorizer = CountVectorizer()
X = vectorizer.fit_transform(df_text['comment'])
print("文本特征:")
print(pd.DataFrame(X.toarray(), columns=vectorizer.get_feature_names_out()))
第三章:高级数据处理与性能优化
3.1 大数据处理技巧
# 1. 分块读取大文件
def process_large_file(file_path, chunksize=10000):
"""
分块读取CSV文件,处理大数据集
"""
chunks = []
for chunk in pd.read_csv(file_path, chunksize=chunksize):
# 对每个chunk进行处理
chunk['new_col'] = chunk['col1'] * chunk['col2']
chunks.append(chunk)
return pd.concat(chunks, ignore_index=True)
# 2. 使用Dask处理超大数据(替代Pandas)
try:
import dask.dataframe as dd
# Dask延迟计算,适合内存不足的情况
ddf = dd.read_csv('large_file.csv')
result = ddf.groupby('category').revenue.sum().compute()
except ImportError:
print("Dask未安装,使用Pandas替代")
# 3. 内存优化技巧
def optimize_memory(df):
"""
优化DataFrame内存使用
"""
start_mem = df.memory_usage(deep=True).sum() / 1024**2
# 优化数值类型
for col in df.select_dtypes(include=['int']).columns:
df[col] = pd.to_numeric(df[col], downcast='integer')
for col in df.select_dtypes(include=['float']).columns:
df[col] = pd.to_numeric(df[col], downcast='float')
# 优化对象类型
for col in df.select_dtypes(include=['object']).columns:
num_unique = df[col].nunique()
num_total = len(df)
if num_unique / num_total < 0.5:
df[col] = df[col].astype('category')
end_mem = df.memory_usage(deep=True).sum() / 1024**2
print(f"内存优化: {start_mem:.2f} MB → {end_mem:.2f} MB ({(1-end_mem/start_mem)*100:.1f}% 减少)")
return df
# 4. 并行处理
from joblib import Parallel, delayed
import multiprocessing
def process_chunk(chunk):
return chunk.groupby('category').revenue.sum()
def parallel_groupby(df, n_jobs=-1):
"""
并行处理大数据分组
"""
if n_jobs == -1:
n_jobs = multiprocessing.cpu_count()
# 将数据分块
chunks = np.array_split(df, n_jobs)
results = Parallel(n_jobs=n_jobs)(delayed(process_chunk)(chunk) for chunk in chunks)
# 合并结果
return pd.concat(results).groupby(level=0).sum()
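以下是上面两个函数的一个简单用法示例(假设已生成第一章的df_ecom并安装了joblib,输出数值仅作演示):
# 内存优化:在副本上操作,避免影响原数据
df_ecom_opt = optimize_memory(df_ecom.copy())

# 并行分组:按category汇总revenue,结果应与直接groupby一致
parallel_result = parallel_groupby(df_ecom_opt, n_jobs=2)
print(parallel_result)
print(df_ecom_opt.groupby('category')['revenue'].sum())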
3.2 时间序列分析
# 1. 创建时间序列数据
dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
ts_data = pd.DataFrame({
'date': dates,
'sales': np.random.normal(1000, 200, len(dates)).cumsum() + np.random.normal(0, 50, len(dates))
})
ts_data.set_index('date', inplace=True)
# 2. 时间序列重采样
# 按月汇总
monthly = ts_data.resample('M').sum()
# 按周平均
weekly = ts_data.resample('W').mean()
# 3. 滚动统计
ts_data['rolling_7d'] = ts_data['sales'].rolling(window=7).mean()
ts_data['rolling_30d'] = ts_data['sales'].rolling(window=30).mean()
ts_data['expanding_mean'] = ts_data['sales'].expanding().mean()
# 4. 季节性分解(需要安装statsmodels: pip install statsmodels)
# 按月汇总得到月度数据
monthly_data = ts_data.resample('M').sum()
try:
    from statsmodels.tsa.seasonal import seasonal_decompose
    # 本例只有12个月数据,这里用period=3做演示;完整的年度季节性需要period=12且至少两个完整周期
    decomposition = seasonal_decompose(monthly_data['sales'], model='additive', period=3)
    fig = decomposition.plot()
    fig.set_size_inches(12, 8)
    plt.show()
except ImportError:
    print("statsmodels未安装,跳过季节性分解")
# 5. 时间序列可视化
plt.figure(figsize=(14, 7))
plt.plot(ts_data.index, ts_data['sales'], label='原始数据', alpha=0.7)
plt.plot(ts_data.index, ts_data['rolling_7d'], label='7日移动平均', linewidth=2)
plt.plot(ts_data.index, ts_data['rolling_30d'], label='30日移动平均', linewidth=2)
plt.title('销售时间序列分析')
plt.legend()
plt.show()
# 6. 滞后特征(用于机器学习)
ts_data['lag_1'] = ts_data['sales'].shift(1) # 前1天
ts_data['lag_7'] = ts_data['sales'].shift(7) # 前7天(上周同一天)
ts_data['diff_1'] = ts_data['sales'].diff(1) # 日环比变化
ts_data['pct_change_7'] = ts_data['sales'].pct_change(7) # 周同比变化
3.3 高级数据合并与重塑
# 1. 多表合并
df1 = pd.DataFrame({'key': ['A', 'B', 'C'], 'value1': [1, 2, 3]})
df2 = pd.DataFrame({'key': ['B', 'C', 'D'], 'value2': [4, 5, 6]})
# 合并类型
inner_join = pd.merge(df1, df2, on='key', how='inner') # 内连接
left_join = pd.merge(df1, df2, on='key', how='left') # 左连接
right_join = pd.merge(df1, df2, on='key', how='right') # 右连接
outer_join = pd.merge(df1, df2, on='key', how='outer') # 外连接
# 2. 数据透视表
pivot_data = pd.DataFrame({
'date': ['2023-01', '2023-01', '2023-02', '2023-02'],
'city': ['北京', '上海', '北京', '上海'],
'sales': [100, 150, 120, 180],
'profit': [20, 30, 25, 35]
})
pivot_table = pd.pivot_table(pivot_data,
values=['sales', 'profit'],
index='date',
columns='city',
aggfunc='sum',
margins=True)
print("透视表:")
print(pivot_table)
# 3. Melt和Stack
# Melt: 宽表转长表
wide_df = pd.DataFrame({
'product': ['A', 'B'],
'Q1': [100, 120],
'Q2': [110, 130],
'Q3': [120, 140],
'Q4': [130, 150]
})
long_df = pd.melt(wide_df, id_vars=['product'], var_name='quarter', value_name='sales')
print("\nMelt结果:")
print(long_df)
# Stack: 列转行
stacked = pivot_table.stack()
print("\nStack结果:")
print(stacked.head())
第四章:机器学习入门与Scikit-learn实战
4.1 Scikit-learn基础
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.metrics import mean_squared_error, accuracy_score, classification_report
from sklearn.datasets import load_iris, fetch_california_housing
from sklearn.pipeline import Pipeline
# 1. 回归问题示例:预测房价
# 加载加州房价数据集(波士顿房价数据集已从新版scikit-learn中移除)
housing = fetch_california_housing(as_frame=True)
X = housing.data
y = housing.target
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 创建Pipeline
pipeline = Pipeline([
('scaler', StandardScaler()), # 标准化
('regressor', LinearRegression()) # 线性回归
])
# 训练模型
pipeline.fit(X_train, y_train)
# 预测与评估
y_pred = pipeline.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
r2 = pipeline.score(X_test, y_test)
print(f"均方误差: {mse:.2f}")
print(f"R²分数: {r2:.2f}")
# 2. 分类问题示例:鸢尾花分类
iris = load_iris()
X_iris = iris.data
y_iris = iris.target
X_train, X_test, y_train, y_test = train_test_split(X_iris, y_iris, test_size=0.3, random_state=42)
# 逻辑回归
clf = LogisticRegression(max_iter=200)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"\n分类准确率: {accuracy:.2f}")
print("\n分类报告:")
print(classification_report(y_test, y_pred, target_names=iris.target_names))
4.2 模型评估与交叉验证
from sklearn.model_selection import cross_val_score, KFold, StratifiedKFold
from sklearn.metrics import confusion_matrix, roc_curve, auc, precision_recall_curve
import seaborn as sns
# 1. 交叉验证
kfold = KFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(pipeline, X, y, cv=kfold, scoring='r2')
print(f"交叉验证R²分数: {cv_scores}")
print(f"平均R²: {cv_scores.mean():.2f} (+/- {cv_scores.std() * 2:.2f})")
# 2. 混淆矩阵(分类问题)
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=iris.target_names,
yticklabels=iris.target_names)
plt.title('混淆矩阵')
plt.ylabel('真实标签')
plt.xlabel('预测标签')
plt.show()
# 3. ROC曲线(二分类)
# 模拟二分类数据
from sklearn.datasets import make_classification
X_binary, y_binary = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X_binary, y_binary, test_size=0.3, random_state=42)
clf_binary = LogisticRegression()
clf_binary.fit(X_train, y_train)
y_proba = clf_binary.predict_proba(X_test)[:, 1]
fpr, tpr, thresholds = roc_curve(y_test, y_proba)
roc_auc = auc(fpr, tpr)
plt.figure(figsize=(10, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()
4.3 特征选择与降维
from sklearn.feature_selection import SelectKBest, f_regression, mutual_info_regression
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor
# 1. 特征选择
# 使用随机森林评估特征重要性
# 注意:上一节的X_train/y_train已被二分类数据覆盖,这里在回归数据(X, y)上重新划分
X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(X, y, test_size=0.2, random_state=42)
rf = RandomForestRegressor(n_estimators=100, random_state=42)
rf.fit(X_train_reg, y_train_reg)
feature_importance = pd.DataFrame({
'feature': X.columns,
'importance': rf.feature_importances_
}).sort_values('importance', ascending=False)
print("特征重要性(随机森林):")
print(feature_importance)
# 可视化
plt.figure(figsize=(10, 6))
sns.barplot(data=feature_importance.head(10), x='importance', y='feature')
plt.title('Top 10 Feature Importance')
plt.show()
# 2. 选择Top K特征
selector = SelectKBest(score_func=f_regression, k=5)
X_selected = selector.fit_transform(X_train_reg, y_train_reg)
selected_features = X.columns[selector.get_support()]
print(f"\n选择的特征: {list(selected_features)}")
# 3. PCA降维
pca = PCA(n_components=2)
X_pca = pca.fit_transform(StandardScaler().fit_transform(X))
plt.figure(figsize=(10, 6))
plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y, cmap='viridis', alpha=0.6)
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%} variance)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%} variance)')
plt.title('PCA降维可视化')
plt.colorbar(label='Target')
plt.show()
4.4 超参数调优
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier
from scipy.stats import randint
# 1. 网格搜索
param_grid = {
'n_estimators': [50, 100, 200],
'max_depth': [None, 10, 20, 30],
'min_samples_split': [2, 5, 10]
}
rf = RandomForestClassifier(random_state=42)
grid_search = GridSearchCV(rf, param_grid, cv=3, scoring='accuracy', n_jobs=-1)
grid_search.fit(X_train, y_train)
print("最佳参数:", grid_search.best_params_)
print("最佳分数:", grid_search.best_score_)
# 2. 随机搜索(更高效)
param_dist = {
'n_estimators': randint(50, 200),
'max_depth': randint(5, 50),
'min_samples_split': randint(2, 20)
}
random_search = RandomizedSearchCV(rf, param_dist, n_iter=20, cv=3, random_state=42, n_jobs=-1)
random_search.fit(X_train, y_train)
print("\n随机搜索最佳参数:", random_search.best_params_)
第五章:高级机器学习与集成学习
5.1 集成学习方法
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import VotingClassifier, StackingClassifier
# 1. Bagging:随机森林
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)
print(f"随机森林准确率: {rf.score(X_test, y_test):.3f}")
# 2. Boosting:梯度提升
gb = GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, random_state=42)
gb.fit(X_train, y_train)
print(f"梯度提升准确率: {gb.score(X_test, y_test):.3f}")
# 3. 投票集成(Voting)
clf1 = LogisticRegression(max_iter=200)
clf2 = RandomForestClassifier(n_estimators=100, random_state=42)
clf3 = SVC(probability=True, random_state=42)
voting_clf = VotingClassifier(
estimators=[('lr', clf1), ('rf', clf2), ('svc', clf3)],
voting='soft' # 使用概率投票
)
voting_clf.fit(X_train, y_train)
print(f"投票集成准确率: {voting_clf.score(X_test, y_test):.3f}")
# 4. 堆叠集成(Stacking)
estimators = [
('rf', RandomForestClassifier(n_estimators=100, random_state=42)),
('svc', SVC(probability=True, random_state=42))
]
stacking_clf = StackingClassifier(
estimators=estimators,
final_estimator=LogisticRegression()
)
stacking_clf.fit(X_train, y_train)
print(f"堆叠集成准确率: {stacking_clf.score(X_test, y_test):.3f}")
5.2 XGBoost与LightGBM(工业级工具)
# 需要安装:pip install xgboost lightgbm
try:
from xgboost import XGBClassifier, XGBRegressor
from lightgbm import LGBMClassifier, LGBMRegressor
# XGBoost分类
xgb_clf = XGBClassifier(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
subsample=0.8,
colsample_bytree=0.8,
random_state=42,
eval_metric='logloss'
)
xgb_clf.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)
print(f"XGBoost准确率: {xgb_clf.score(X_test, y_test):.3f}")
# LightGBM分类
lgb_clf = LGBMClassifier(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
random_state=42
)
lgb_clf.fit(X_train, y_train)
print(f"LightGBM准确率: {lgb_clf.score(X_test, y_test):.3f}")
# 特征重要性对比
# 注意:模型是在make_classification生成的X_train上训练的,这里用序号作为特征名
importance_df = pd.DataFrame({
'feature': [f'feature_{i}' for i in range(X_train.shape[1])],
'xgb_importance': xgb_clf.feature_importances_,
'lgb_importance': lgb_clf.feature_importances_
}).sort_values('xgb_importance', ascending=False)
print("\nXGBoost vs LightGBM 特征重要性:")
print(importance_df.head())
except ImportError:
print("XGBoost/LightGBM未安装,跳过该部分")
5.3 聚类分析
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.metrics import silhouette_score
# 1. K-Means聚类
# 生成数据
X_cluster, _ = make_classification(n_samples=500, n_features=2, n_informative=2,
n_redundant=0, n_clusters_per_class=1, random_state=42)
# 确定最佳K值(肘部法则)
inertias = []
K_range = range(2, 11)
for k in K_range:
kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
kmeans.fit(X_cluster)
inertias.append(kmeans.inertia_)
plt.figure(figsize=(10, 6))
plt.plot(K_range, inertias, marker='o')
plt.title('肘部法则确定最佳K值')
plt.xlabel('K值')
plt.ylabel('Inertia')
plt.show()
# 使用K=3
kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
clusters = kmeans.fit_predict(X_cluster)
# 可视化
plt.figure(figsize=(10, 6))
plt.scatter(X_cluster[:, 0], X_cluster[:, 1], c=clusters, cmap='viridis', alpha=0.6)
plt.scatter(kmeans.cluster_centers_[:, 0], kmeans.cluster_centers_[:, 1],
s=300, c='red', marker='X', label='Centroids')
plt.title('K-Means聚类结果')
plt.legend()
plt.show()
# 2. DBSCAN(密度聚类)
dbscan = DBSCAN(eps=0.5, min_samples=5)
clusters_dbscan = dbscan.fit_predict(X_cluster)
print(f"DBSCAN发现{len(set(clusters_dbscan)) - (1 if -1 in clusters_dbscan else 0)}个簇")
# 3. 层次聚类
agg = AgglomerativeClustering(n_clusters=3)
clusters_agg = agg.fit_predict(X_cluster)
# 4. 聚类评估
silhouette_kmeans = silhouette_score(X_cluster, clusters)
print(f"K-Means轮廓系数: {silhouette_kmeans:.3f}")
5.4 异常检测
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope
# 生成包含异常值的数据
np.random.seed(42)
X_normal = np.random.normal(0, 1, (1000, 2))
X_outliers = np.random.uniform(-4, 4, (50, 2))
X = np.vstack([X_normal, X_outliers])
y_true = np.array([0] * 1000 + [1] * 50) # 0=正常, 1=异常
# 1. 孤立森林
iso_forest = IsolationForest(contamination=0.05, random_state=42)
y_pred_iso = iso_forest.fit_predict(X)
y_pred_iso = [1 if x == -1 else 0 for x in y_pred_iso] # 转换为0/1
# 2. One-Class SVM
oc_svm = OneClassSVM(nu=0.05, kernel='rbf', gamma='scale')
y_pred_svm = oc_svm.fit_predict(X)
y_pred_svm = [1 if x == -1 else 0 for x in y_pred_svm]
# 3. 评估
from sklearn.metrics import classification_report, confusion_matrix
print("孤立森林结果:")
print(classification_report(y_true, y_pred_iso, target_names=['正常', '异常']))
print("\nOne-Class SVM结果:")
print(classification_report(y_true, y_pred_svm, target_names=['正常', '异常']))
# 可视化
plt.figure(figsize=(14, 6))
plt.subplot(1, 2, 1)
plt.scatter(X[:, 0], X[:, 1], c=y_pred_iso, cmap='coolwarm', alpha=0.6)
plt.title('孤立森林异常检测')
plt.subplot(1, 2, 2)
plt.scatter(X[:, 0], X[:, 1], c=y_pred_svm, cmap='coolwarm', alpha=0.6)
plt.title('One-Class SVM异常检测')
plt.show()
第六章:数据可视化与仪表板开发
6.1 高级可视化库
import plotly.express as px
import plotly.graph_objects as go
import plotly.offline as pyo
# 1. Plotly交互式图表
# 散点图
fig = px.scatter(df_ecom, x='price', y='revenue', color='city',
size='quantity', hover_data=['product', 'category'],
title='销售散点图(交互式)')
fig.show()
# 2. Plotly高级图表
# 箱线图
fig = px.box(df_ecom, x='product', y='revenue', color='city',
title='产品-城市销售分布')
fig.show()
# 3. 时间序列图
ts_sample = ts_data.head(100)
fig = go.Figure()
fig.add_trace(go.Scatter(x=ts_sample.index, y=ts_sample['sales'],
mode='lines+markers', name='销售额'))
fig.add_trace(go.Scatter(x=ts_sample.index, y=ts_sample['rolling_7d'],
mode='lines', name='7日移动平均',
line=dict(color='red', width=2)))
fig.update_layout(title='销售时间序列(Plotly)', xaxis_title='日期', yaxis_title='销售额')
fig.show()
# 4. 热力图
pivot_sample = df_ecom.pivot_table(values='revenue', index='product', columns='city', aggfunc='mean')
fig = px.imshow(pivot_sample, title='产品-城市平均销售额热力图')
fig.show()
6.2 Seaborn高级可视化
# 1. 分布图组合
plt.figure(figsize=(12, 8))
sns.set_style("whitegrid")
# 子图1:直方图+KDE
plt.subplot(2, 2, 1)
sns.histplot(df_ecom['revenue'], kde=True, bins=30)
plt.title('收入分布')
# 子图2:箱线图
plt.subplot(2, 2, 2)
sns.boxplot(data=df_ecom, x='product', y='revenue')
plt.title('产品收入箱线图')
plt.xticks(rotation=45)
# 子图3:小提琴图
plt.subplot(2, 2, 3)
sns.violinplot(data=df_ecom, x='city', y='revenue', inner='quartile')
plt.title('城市收入小提琴图')
# 子图4:蜂群图
plt.subplot(2, 2, 4)
sns.swarmplot(data=df_ecom.head(100), x='product', y='revenue', hue='city')
plt.title('蜂群图(前100条)')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# 2. 关系图矩阵
sns.pairplot(df_ecom[['revenue', 'quantity', 'price']], diag_kind='kde', corner=True)
plt.suptitle('关系图矩阵', y=1.02)
plt.show()
# 3. 分面网格
g = sns.FacetGrid(df_ecom, col='city', hue='product', col_wrap=3, height=4)
g.map(sns.scatterplot, 'price', 'revenue', alpha=0.6)
g.add_legend()
plt.suptitle('分面散点图', y=1.02)
plt.show()
6.3 仪表板开发(Streamlit)
# 需要安装:pip install streamlit
# 运行:streamlit run your_script.py
"""
# 数据分析仪表板示例
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# 页面配置
st.set_page_config(page_title="销售分析仪表板", layout="wide")
# 侧边栏
st.sidebar.header("数据配置")
sample_size = st.sidebar.slider("样本大小", 100, 1000, 500)
show_raw_data = st.sidebar.checkbox("显示原始数据")
# 生成数据
@st.cache_data # 缓存数据
def generate_data(n):
np.random.seed(42)
data = {
'city': np.random.choice(['北京', '上海', '广州', '深圳'], n),
'product': np.random.choice(['手机', '电脑', '耳机'], n),
'sales': np.random.normal(1000, 200, n),
'profit': np.random.normal(200, 50, n)
}
return pd.DataFrame(data)
df = generate_data(sample_size)
# 主标题
st.title("📊 销售分析仪表板")
# 关键指标
col1, col2, col3 = st.columns(3)
with col1:
st.metric("总销售额", f"¥{df['sales'].sum():,.0f}")
with col2:
st.metric("平均利润", f"¥{df['profit'].mean():.0f}")
with col3:
st.metric("订单数", len(df))
# 图表区域
st.subheader("可视化分析")
tab1, tab2, tab3 = st.tabs(["销售分布", "城市对比", "产品分析"])
with tab1:
fig, ax = plt.subplots(figsize=(10, 6))
sns.histplot(data=df, x='sales', kde=True, ax=ax)
st.pyplot(fig)
with tab2:
fig, ax = plt.subplots(figsize=(10, 6))
sns.boxplot(data=df, x='city', y='sales', ax=ax)
st.pyplot(fig)
with tab3:
fig, ax = plt.subplots(figsize=(10, 6))
product_summary = df.groupby('product')['sales'].sum()
product_summary.plot(kind='bar', ax=ax)
st.pyplot(fig)
# 原始数据
if show_raw_data:
st.subheader("原始数据")
st.dataframe(df)
# 数据下载
st.sidebar.download_button(
label="下载数据CSV",
data=df.to_csv(index=False).encode('utf-8'),
file_name="sales_data.csv",
mime="text/csv"
)
"""
6.4 自动化报告生成
# 使用Jupyter Notebook自动生成报告
def generate_analysis_report(df, output_path="analysis_report.html"):
"""
生成HTML分析报告
"""
from nbconvert import HTMLExporter
import nbformat as nbf
# 创建Notebook对象
nb = nbf.v4.new_notebook()
# 添加Markdown单元格
markdown_text = f"""
# 数据分析报告
## 数据概览
- 数据形状: {df.shape}
- 内存使用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB
## 关键指标
- 总记录数: {len(df)}
- 缺失值: {df.isnull().sum().sum()}
- 重复值: {df.duplicated().sum()}
"""
# 添加代码单元格
code_cells = [
"import pandas as pd\nimport matplotlib.pyplot as plt\nimport seaborn as sns\n\n# 加载数据\n# df = pd.read_csv('your_data.csv')",
"df.head()",
"df.describe()",
"plt.figure(figsize=(10, 6))\nsns.heatmap(df.corr(), annot=True, cmap='coolwarm')\nplt.title('相关性热力图')\nplt.show()"
]
nb['cells'] = [
nbf.v4.new_markdown_cell(markdown_text),
nbf.v4.new_code_cell(code_cells[0]),
nbf.v4.new_code_cell(code_cells[1]),
nbf.v4.new_code_cell(code_cells[2]),
nbf.v4.new_code_cell(code_cells[3])
]
# 导出为HTML
with open('temp_notebook.ipynb', 'w', encoding='utf-8') as f:
nbf.write(nb, f)
# 转换为HTML
html_exporter = HTMLExporter()
body, resources = html_exporter.from_filename('temp_notebook.ipynb')
with open(output_path, 'w', encoding='utf-8') as f:
f.write(body)
print(f"报告已生成: {output_path}")
import os
os.remove('temp_notebook.ipynb')
# 使用示例
# generate_analysis_report(df_ecom)
第七章:大数据处理与分布式计算
7.1 Dask:Pandas的分布式替代品
# 需要安装:pip install dask[complete]
try:
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client, LocalCluster
# 启动本地集群
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit='2GB')
client = Client(cluster)
print(f"Dask Dashboard: {client.dashboard_link}")
# 创建Dask DataFrame
# 模拟大数据
df_large = pd.DataFrame({
'id': range(1000000),
'value': np.random.randn(1000000),
'category': np.random.choice(['A', 'B', 'C'], 1000000)
})
df_large.to_csv('large_data.csv', index=False)
# 使用Dask读取
ddf = dd.read_csv('large_data.csv')
print(f"Dask DataFrame形状: {ddf.shape}")
print(f"分区数: {ddf.npartitions}")
# 延迟计算
result = ddf.groupby('category')['value'].mean()
print("延迟计算对象:", result)
# 触发计算
computed_result = result.compute()
print("计算结果:\n", computed_result)
# 复杂操作
result_complex = ddf[
ddf['value'] > 0
].groupby('category').agg({
'value': ['mean', 'std', 'count']
}).compute()
print("复杂计算结果:\n", result_complex)
# 关闭集群
client.close()
cluster.close()
except ImportError:
print("Dask未安装,跳过该部分")
7.2 PySpark基础
# 需要安装:pip install pyspark
try:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when
# 创建Spark会话
spark = SparkSession.builder \
.appName("DataAnalysis") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# 创建DataFrame
data = [(1, "Alice", 25, "Engineer", 80000),
(2, "Bob", 30, "Manager", 100000),
(3, "Charlie", 35, "Director", 150000),
(4, "David", 28, "Engineer", 85000)]
columns = ["id", "name", "age", "title", "salary"]
df_spark = spark.createDataFrame(data, columns)
# 基本操作
df_spark.show()
df_spark.printSchema()
# SQL查询
df_spark.createOrReplaceTempView("employees")
result = spark.sql("""
SELECT title,
AVG(salary) as avg_salary,
COUNT(*) as count
FROM employees
GROUP BY title
ORDER BY avg_salary DESC
""")
result.show()
# DataFrame API
df_processed = df_spark \
.filter(col("age") > 25) \
.groupBy("title") \
.agg(
avg("salary").alias("avg_salary"),
count("*").alias("count")
) \
.orderBy("avg_salary", ascending=False)
df_processed.show()
# 转换为Pandas(小数据量)
if df_processed.count() < 10000:
pandas_df = df_processed.toPandas()
print("转换为Pandas DataFrame:")
print(pandas_df)
spark.stop()
except ImportError:
print("PySpark未安装,跳过该部分")
except Exception as e:
print(f"Spark启动失败: {e}")
7.3 数据库连接与查询
import sqlite3
import sqlalchemy
from sqlalchemy import create_engine, text
# 1. SQLite示例
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 创建表
cursor.execute("""
CREATE TABLE IF NOT EXISTS sales (
id INTEGER PRIMARY KEY,
product TEXT,
city TEXT,
revenue REAL,
sale_date DATE
)
""")
# 插入数据
sample_data = [
('手机', '北京', 15000, '2023-01-01'),
('电脑', '上海', 25000, '2023-01-02'),
('耳机', '广州', 8000, '2023-01-03')
]
cursor.executemany("INSERT INTO sales (product, city, revenue, sale_date) VALUES (?, ?, ?, ?)", sample_data)
conn.commit()
# 查询
cursor.execute("SELECT * FROM sales WHERE revenue > 10000")
results = cursor.fetchall()
print("SQLite查询结果:")
for row in results:
print(row)
conn.close()
# 2. SQLAlchemy(支持多种数据库)
# 创建引擎(SQLite)
engine = create_engine('sqlite:///example.db')
# 使用Pandas读取
df_from_db = pd.read_sql("SELECT * FROM sales", engine)
print("\nSQLAlchemy读取结果:")
print(df_from_db)
# 使用Pandas写入
df_ecom_sample = df_ecom.head(100)
df_ecom_sample.to_sql('ecom_sales', engine, if_exists='replace', index=False)
# 使用SQL查询
with engine.connect() as conn:
result = conn.execute(text("SELECT city, AVG(revenue) as avg_rev FROM ecom_sales GROUP BY city"))
print("\nSQL查询结果:")
for row in result:
print(row)
# 3. 连接PostgreSQL(需要安装psycopg2)
# engine = create_engine('postgresql://user:password@localhost:5432/mydb')
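如果要换成PostgreSQL,思路与SQLite完全一致,只需更换连接串。下面是一个示意(连接串中的用户名、密码、库名和表名均为占位,需要本机有可用的PostgreSQL并安装psycopg2-binary才能运行):
# 示意:用SQLAlchemy连接PostgreSQL,再用Pandas查询
# pip install psycopg2-binary
pg_engine = create_engine('postgresql+psycopg2://user:password@localhost:5432/mydb')
df_pg = pd.read_sql(
    "SELECT city, AVG(revenue) AS avg_rev FROM ecom_sales GROUP BY city",  # ecom_sales为示例表名
    pg_engine
)
print(df_pg)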
第八章:时间序列预测与高级分析
8.1 时间序列预测基础
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore')
# 1. 创建时间序列数据
np.random.seed(42)
dates = pd.date_range('2020-01-01', '2023-12-31', freq='M')
# 包含趋势、季节性和噪声
trend = np.linspace(100, 200, len(dates))
seasonality = 20 * np.sin(2 * np.pi * np.arange(len(dates)) / 12)
noise = np.random.normal(0, 5, len(dates))
sales = trend + seasonality + noise
ts_data = pd.DataFrame({'date': dates, 'sales': sales})
ts_data.set_index('date', inplace=True)
# 2. 划分训练测试集
train_size = int(len(ts_data) * 0.8)
train, test = ts_data.iloc[:train_size], ts_data.iloc[train_size:]
# 3. ARIMA模型
# 自动选择参数(简化版)
def find_best_arima(train, test, p_range=3, d_range=2, q_range=3):
best_score = float('inf')
best_params = None
for p in range(p_range):
for d in range(d_range):
for q in range(q_range):
try:
model = ARIMA(train, order=(p, d, q))
model_fit = model.fit()
forecast = model_fit.forecast(steps=len(test))
mse = mean_squared_error(test, forecast)
if mse < best_score:
best_score = mse
best_params = (p, d, q)
except:
continue
return best_params, best_score
best_params, best_score = find_best_arima(train['sales'], test['sales'])
print(f"最佳ARIMA参数: {best_params}, MSE: {best_score:.2f}")
# 使用最佳参数训练
model = ARIMA(train['sales'], order=best_params)
model_fit = model.fit()
print(model_fit.summary())
# 预测
forecast = model_fit.forecast(steps=len(test))
forecast_df = pd.DataFrame({'forecast': forecast}, index=test.index)
# 评估
mse = mean_squared_error(test['sales'], forecast)
mae = mean_absolute_error(test['sales'], forecast)
print(f"\nARIMA评估:")
print(f"均方误差: {mse:.2f}")
print(f"平均绝对误差: {mae:.2f}")
# 可视化
plt.figure(figsize=(12, 6))
plt.plot(train.index, train['sales'], label='训练集')
plt.plot(test.index, test['sales'], label='真实值', color='orange')
plt.plot(forecast_df.index, forecast_df['forecast'], label='ARIMA预测', color='red', linestyle='--')
plt.title('ARIMA时间序列预测')
plt.legend()
plt.show()
8.2 Prophet:Facebook时间序列预测
# 需要安装:pip install prophet
try:
from prophet import Prophet
# 准备数据(Prophet需要ds、y两列)
prophet_df = ts_data.reset_index()
prophet_df.columns = ['ds', 'y']
# 只用训练集拟合,便于在测试集上评估
prophet_train = prophet_df.iloc[:train_size]
# 训练模型
model_prophet = Prophet(
yearly_seasonality=True,
weekly_seasonality=False,
daily_seasonality=False,
changepoint_prior_scale=0.05
)
model_prophet.fit(prophet_train)
# 创建未来数据框(向后延伸,覆盖测试集区间)
future = model_prophet.make_future_dataframe(periods=len(test), freq='M')
# 预测
forecast_prophet = model_prophet.predict(future)
# 可视化
fig1 = model_prophet.plot(forecast_prophet)
plt.title('Prophet预测')
plt.show()
fig2 = model_prophet.plot_components(forecast_prophet)
plt.show()
# 评估(在测试集上)
test_prophet = forecast_prophet.tail(len(test))
mse_prophet = mean_squared_error(test['sales'], test_prophet['yhat'])
print(f"Prophet MSE: {mse_prophet:.2f}")
except ImportError:
print("Prophet未安装,跳过该部分")
8.3 高级时间序列分析
# 1. 自相关分析
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
plt.figure(figsize=(12, 8))
plt.subplot(2, 1, 1)
plot_acf(ts_data['sales'], lags=24, ax=plt.gca())
plt.title('自相关函数(ACF)')
plt.subplot(2, 1, 2)
plot_pacf(ts_data['sales'], lags=24, ax=plt.gca())
plt.title('偏自相关函数(PACF)')
plt.tight_layout()
plt.show()
# 2. 季节性分解
from statsmodels.tsa.seasonal import seasonal_decompose
decomposition = seasonal_decompose(ts_data['sales'], model='additive', period=12)
fig = decomposition.plot()
fig.set_size_inches(12, 8)
plt.show()
# 3. 平稳性检验(ADF检验)
from statsmodels.tsa.stattools import adfuller
result = adfuller(ts_data['sales'])
print('ADF检验结果:')
print(f'统计量: {result[0]}')
print(f'p值: {result[1]}')
print('临界值:')
for key, value in result[4].items():
print(f' {key}: {value}')
if result[1] < 0.05:
print("序列是平稳的")
else:
print("序列是非平稳的,需要差分")
# 4. 差分处理
ts_data['sales_diff1'] = ts_data['sales'].diff(1)
ts_data['sales_diff12'] = ts_data['sales'].diff(12)
# 5. 协整检验(多变量)
def cointegration_test(df, col1, col2):
from statsmodels.tsa.stattools import coint
score, p_value, _ = coint(df[col1], df[col2])
print(f"协整检验 p值: {p_value:.4f}")
if p_value < 0.05:
print(f"{col1} 和 {col2} 存在协整关系")
else:
print(f"{col1} 和 {col2} 不存在协整关系")
# 示例:如果有多个时间序列
# cointegration_test(ts_data, 'sales', 'sales_diff1')
第九章:机器学习工程化与部署
9.1 模型持久化
import joblib
import pickle
import os
# 1. 使用joblib(推荐用于sklearn模型)
def save_model_joblib(model, filename):
"""保存模型"""
joblib.dump(model, filename)
print(f"模型已保存: {filename}")
def load_model_joblib(filename):
"""加载模型"""
if os.path.exists(filename):
model = joblib.load(filename)
print(f"模型已加载: {filename}")
return model
else:
print("模型文件不存在")
return None
# 训练并保存模型
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
save_model_joblib(model, 'random_forest_model.joblib')
# 加载并使用
loaded_model = load_model_joblib('random_forest_model.joblib')
if loaded_model:
predictions = loaded_model.predict(X_test)
print(f"加载模型准确率: {accuracy_score(y_test, predictions):.3f}")
# 2. 使用pickle(通用)
def save_model_pickle(model, filename):
with open(filename, 'wb') as f:
pickle.dump(model, f)
print(f"模型已保存: {filename}")
def load_model_pickle(filename):
with open(filename, 'rb') as f:
model = pickle.load(f)
return model
# 3. 保存预处理管道
pipeline = Pipeline([
('scaler', StandardScaler()),
('model', RandomForestClassifier())
])
pipeline.fit(X_train, y_train)
save_model_joblib(pipeline, 'full_pipeline.joblib')
# 4. 模型版本管理
def save_model_versioned(model, prefix='model', version='1.0'):
"""版本化保存模型"""
filename = f"{prefix}_v{version}.joblib"
save_model_joblib(model, filename)
return filename
# 保存多个版本
save_model_versioned(model, prefix='rf', version='1.0')
save_model_versioned(model, prefix='rf', version='1.1')
9.2 模型API开发(FastAPI)
# 需要安装:pip install fastapi uvicorn
"""
# 保存为 api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import pandas as pd
app = FastAPI(title="机器学习模型API", version="1.0")
# 加载模型
model = joblib.load('random_forest_model.joblib')
# 定义输入数据模型
class PredictionInput(BaseModel):
features: list[float]
feature_names: list[str] | None = None
# 定义输出模型
class PredictionOutput(BaseModel):
prediction: int
probability: float
@app.get("/")
def read_root():
return {"message": "机器学习模型API", "version": "1.0"}
@app.post("/predict", response_model=PredictionOutput)
def predict(input_data: PredictionInput):
try:
# 转换为DataFrame
if input_data.feature_names:
X = pd.DataFrame([input_data.features], columns=input_data.feature_names)
else:
X = np.array([input_data.features])
# 预测
prediction = model.predict(X)[0]
probability = model.predict_proba(X)[0][prediction]
return PredictionOutput(prediction=int(prediction), probability=float(probability))
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/predict_batch")
def predict_batch(inputs: list[PredictionInput]):
try:
features_list = []
for input_data in inputs:
features_list.append(input_data.features)
X = np.array(features_list)
predictions = model.predict(X)
probabilities = model.predict_proba(X)
results = []
for i, (pred, prob) in enumerate(zip(predictions, probabilities)):
results.append({
"index": i,
"prediction": int(pred),
"probability": float(prob[pred])
})
return {"predictions": results}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# 运行命令: uvicorn api:app --reload
"""
9.3 模型监控与评估
# 1. 持续评估指标
def calculate_drift_metrics(old_data, new_data, threshold=0.05):
"""
检测数据漂移
"""
from scipy import stats
drift_results = {}
for col in old_data.columns:
if pd.api.types.is_numeric_dtype(old_data[col]):
# KS检验
ks_stat, p_value = stats.ks_2samp(old_data[col], new_data[col])
drift_results[col] = {
'ks_statistic': ks_stat,
'p_value': p_value,
'drift': p_value < threshold
}
return drift_results
# 2. 模型性能监控
def monitor_model_performance(y_true, y_pred, y_proba, threshold=0.8):
"""
监控模型性能指标
"""
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
metrics = {
'accuracy': accuracy_score(y_true, y_pred),
'precision': precision_score(y_true, y_pred, average='weighted'),
'recall': recall_score(y_true, y_pred, average='weighted'),
'f1': f1_score(y_true, y_pred, average='weighted')
}
# 检查是否需要重新训练
needs_retraining = metrics['accuracy'] < threshold
return metrics, needs_retraining
# 3. 日志记录
import logging
logging.basicConfig(
filename='model_monitoring.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def log_prediction(features, prediction, probability):
logging.info(f"Features: {features}, Prediction: {prediction}, Probability: {probability}")
# 4. A/B测试框架
class ABTestFramework:
def __init__(self, model_a, model_b, traffic_split=0.5):
self.model_a = model_a
self.model_b = model_b
self.traffic_split = traffic_split
self.results = {'A': [], 'B': []}
def predict(self, X):
import random
if random.random() < self.traffic_split:
model = self.model_a
version = 'A'
else:
model = self.model_b
version = 'B'
pred = model.predict(X)
prob = model.predict_proba(X)
self.results[version].append({
'prediction': pred[0],
'probability': prob[0][pred[0]]
})
return pred, prob, version
def get_stats(self):
stats = {}
for version in ['A', 'B']:
if self.results[version]:
avg_prob = np.mean([r['probability'] for r in self.results[version]])
stats[version] = {
'count': len(self.results[version]),
'avg_probability': avg_prob
}
return stats
# 使用示例
# ab_test = ABTestFramework(model_v1, model_v2)
# pred, prob, version = ab_test.predict(X_test[:1])
第十章:职业发展路径与实战项目
10.1 数据分析师职业路径
阶段1:初级数据分析师(0-2年)
- 核心技能:
- 精通SQL和Excel
- 掌握Python基础(Pandas、NumPy)
- 数据可视化(Matplotlib、Seaborn)
- 统计学基础(描述统计、假设检验)
- 典型工作:
- 制作日报/周报
- 基础数据提取和清洗
- 描述性分析报告
- 薪资范围:8-15万/年(国内)
阶段2:中级数据分析师(2-5年)
- 新增技能:
- 掌握机器学习基础(Scikit-learn)
- A/B测试设计
- 因果推断
- 数据仓库基础
- 典型工作:
- 深入业务分析
- 构建分析框架
- 预测模型开发
- 薪资范围:15-30万/年
阶段3:高级数据分析师/数据科学家(5年+)
- 新增技能:
- 高级统计建模
- 深度学习
- 大数据技术(Spark、Hadoop)
- 工程化能力(API开发、模型部署)
- 典型工作:
- 复杂建模
- 战略级分析
- 技术架构设计
- 薪资范围:30-60万/年,部分可达80万+
10.2 数据科学家核心能力矩阵
| 能力维度 | 初级 | 中级 | 高级 | 专家 |
|---|---|---|---|---|
| 编程能力 | Python基础 | 熟练Pandas | 熟练Spark | 系统架构 |
| 统计学 | 描述统计 | 假设检验 | 贝叶斯方法 | 理论创新 |
| 机器学习 | 基础算法 | 熟练调参 | 集成学习 | 深度学习 |
| 业务理解 | 理解指标 | 洞察驱动 | 战略思维 | 商业闭环 |
| 工程能力 | Jupyter | Git/Docker | CI/CD | MLOps |
| 可视化 | 基础图表 | 交互式仪表板 | 自动化报告 | 数据产品 |
10.3 实战项目推荐
项目1:电商用户行为分析
- 技术栈:Pandas + Seaborn + SQL
- 数据:用户浏览、购买、评价数据
- 输出:用户画像、RFM模型(计算示例见下方代码)、转化漏斗
- 亮点:业务价值明确,适合简历
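其中RFM模型的核心是按最近购买时间(Recency)、购买频次(Frequency)、消费金额(Monetary)给用户打分分层。下面给出一个基于第一章df_ecom的最小示例(打分与分层阈值均为演示用的假设):
# RFM用户分层最小示例(基于第一章的df_ecom;阈值与分层规则为演示假设)
snapshot_date = df_ecom['order_date'].max() + pd.Timedelta(days=1)

rfm = df_ecom.groupby('customer_id').agg(
    recency=('order_date', lambda x: (snapshot_date - x.max()).days),  # 最近一次购买距今天数
    frequency=('order_id', 'count'),                                   # 购买次数
    monetary=('revenue', 'sum')                                        # 消费总额
)

# 按五分位打1-5分(recency越小越好,因此标签取反向)
rfm['R'] = pd.qcut(rfm['recency'].rank(method='first'), 5, labels=[5, 4, 3, 2, 1]).astype(int)
rfm['F'] = pd.qcut(rfm['frequency'].rank(method='first'), 5, labels=[1, 2, 3, 4, 5]).astype(int)
rfm['M'] = pd.qcut(rfm['monetary'].rank(method='first'), 5, labels=[1, 2, 3, 4, 5]).astype(int)
rfm['RFM_score'] = rfm['R'] + rfm['F'] + rfm['M']

# 简单分层:分数越高价值越高
rfm['segment'] = pd.cut(rfm['RFM_score'], bins=[2, 7, 11, 15], labels=['流失风险', '一般用户', '核心用户'])
print(rfm.head())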
项目2:销售预测系统
- 技术栈:Prophet/XGBoost + Streamlit + Docker
- 数据:历史销售数据
- 输出:预测模型 + 交互式仪表板
- 亮点:端到端项目,展示工程能力
项目3:用户流失预测
- 技术栈:Scikit-learn + 特征工程 + 模型解释
- 数据:用户行为数据
- 输出:流失预测模型 + 特征重要性分析
- 亮点:分类问题,业务价值高
项目4:实时推荐系统
- 技术栈:PySpark + Kafka + Flask/FastAPI
- 数据:用户行为日志
- 输出:实时推荐API
- 亮点:大数据处理,实时计算
项目5:异常检测系统
- 技术栈:Isolation Forest + DBSCAN + 可视化
- 数据:监控数据、交易数据
- 输出:异常检测模型 + 告警系统
- 亮点:算法深度,工程实践
10.4 简历与面试准备
简历要点:
- 量化成果:不要写"优化了模型",要写"将模型准确率从85%提升到92%,减少业务损失200万/年"
- 项目结构:STAR法则(情境、任务、行动、结果)
- 技术关键词:根据JD调整,突出匹配技能
- GitHub:展示代码质量,有README和文档
面试准备:
技术面试:
- SQL手写(窗口函数、CTE)
- Python手写(Pandas操作、算法题)
- 统计学(假设检验、置信区间)
- 机器学习(过拟合、特征工程、模型评估)
业务面试:
- A/B测试设计
- 指标体系搭建
- 异常分析思路
- 业务场景建模
行为面试:
- 项目难点与解决
- 跨部门协作
- 持续学习能力
10.5 持续学习资源
在线课程:
- Coursera: Andrew Ng机器学习
- Kaggle Learn: 免费实战课程
- DataCamp: 交互式编程
书籍推荐:
- 《利用Python进行数据分析》
- 《Python数据科学手册》
- 《统计学习方法》
- 《机器学习》(周志华)
社区与竞赛:
- Kaggle: 参加比赛,学习优秀方案
- 天池大赛: 国内竞赛平台
- Kaggle Discussion: 学习讨论
- GitHub: 关注热门项目
技术博客:
- Towards Data Science
- 机器之心
- 量子位
- 个人博客(如李沐、吴恩达)
10.6 薪资谈判技巧
市场调研:
- 使用Glassdoor、拉勾网、Boss直聘调研目标公司薪资
- 了解行业平均薪资水平
- 考虑城市差异(一线城市高30-50%)
谈判策略:
- 不要先报价:让HR先给范围
- 基于价值:强调你能为公司创造的价值
- 总包概念:base + 奖金 + 股票 + 福利
- 备选方案:有其他offer时更有议价权
- 职业发展:考虑长期成长而非短期薪资
常见误区:
- 只看base,忽略奖金和股票
- 忽视公司成长性
- 过早暴露底线
- 缺乏备选方案
附录:常用工具与快捷键
A.1 Jupyter Notebook快捷键
- Shift + Enter: 运行单元格
- A: 在上方插入单元格
- B: 在下方插入单元格
- D + D: 删除单元格
- M: 切换到Markdown模式
- Y: 切换到代码模式
- H: 显示快捷键帮助
A.2 Pandas常用操作速查
# 读取数据
df = pd.read_csv('file.csv')
df = pd.read_excel('file.xlsx')
df = pd.read_json('file.json')
# 查看数据
df.head()
df.info()
df.describe()
df.shape
df.dtypes
# 选择数据
df['column'] # 单列
df[['col1', 'col2']] # 多列
df.iloc[0:5, 0:3] # 位置索引
df.loc[0:5, ['col1', 'col2']] # 标签索引
# 筛选
df[df['col'] > 100]
df.query('col > 100')
df[df['col'].isin([1, 2, 3])]
# 处理缺失值
df.dropna()
df.fillna(value)
df.interpolate()
# 分组聚合
df.groupby('col')['value'].agg(['mean', 'sum', 'count'])
# 合并
pd.concat([df1, df2])
pd.merge(df1, df2, on='key', how='inner')
# 时间序列
df['date'] = pd.to_datetime(df['date'])
df.resample('M').sum()
df.rolling(7).mean()
# 处理文本
df['col'].str.upper()
df['col'].str.contains('pattern')
df['col'].str.replace('old', 'new')
# 处理类别
df['col'] = df['col'].astype('category')
pd.get_dummies(df, columns=['col'])
A.3 常用库安装命令
# 基础环境
conda create -n data_analysis python=3.10
conda activate data_analysis
# 核心库
conda install pandas numpy matplotlib seaborn scikit-learn
pip install jupyterlab
# 高级分析
pip install statsmodels
pip install xgboost lightgbm
pip install plotly
pip install streamlit
# 大数据
pip install dask[complete]
pip install pyspark
# 部署
pip install fastapi uvicorn
pip install joblib
# 开发工具
pip install black flake8 pytest
A.4 代码风格与最佳实践
# 1. 使用函数封装重复逻辑
def load_and_clean_data(filepath):
"""加载并清洗数据"""
df = pd.read_csv(filepath)
df = df.dropna(subset=['key_column'])
df['date'] = pd.to_datetime(df['date'])
return df
# 2. 使用类型提示
from typing import List, Optional, Dict
import numpy as np
import pandas as pd
def calculate_metrics(df: pd.DataFrame,
target_col: str,
group_col: Optional[str] = None) -> Dict[str, float]:
"""计算指标"""
if group_col:
result = df.groupby(group_col)[target_col].mean().to_dict()
else:
result = {target_col: df[target_col].mean()}
return result
# 3. 使用配置文件
import yaml
def load_config(config_path='config.yaml'):
with open(config_path, 'r') as f:
return yaml.safe_load(f)
# config.yaml
# data:
# filepath: "data.csv"
# test_size: 0.2
# model:
# random_state: 42
# 4. 使用日志
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def process_data(df):
logger.info(f"Processing data with shape {df.shape}")
# ... processing
logger.info("Processing complete")
return df
# 5. 使用if __name__ == "__main__"
if __name__ == "__main__":
# 只在直接运行脚本时执行
main()
结语
本课程从Python基础到高级机器学习,从数据处理到模型部署,涵盖了数据分析师/科学家的完整知识体系。记住,理论学习 + 项目实践 + 持续学习 是成长的关键。
下一步行动:
- 立即开始:选择一个项目,用本课程的代码模板开始实践
- 建立习惯:每天至少1小时编码,每周至少1个Kaggle Notebook
- 社区参与:在GitHub上贡献代码,在Kaggle上分享方案
- 职业规划:根据当前水平,制定6个月学习计划
最后建议:
- 不要追求完美,先完成再优化
- 代码能跑通是第一步,然后是效率,最后是优雅
- 业务理解比技术更重要
- 保持好奇心,持续学习新技术
祝你在数据分析的道路上越走越远,成为真正的数据驱动决策者!
