引言:为什么需要Python数据分析进阶技能?

在当今数据驱动的商业环境中,Python已经成为数据分析领域的标准工具。然而,许多初学者在掌握了基础的Pandas和Matplotlib后,往往在面对真实工作场景时感到力不从心。本课程将带你从入门到精通,掌握高级技巧,解决实际工作难题,从而显著提升职场竞争力。

学习目标

  • 掌握Python数据分析的核心高级技巧
  • 学会处理大规模数据集和复杂数据结构
  • 提升数据清洗、转换和可视化的效率
  • 学会使用高级库解决实际业务问题
  • 构建可复用的数据分析工作流

第一部分:Pandas高级操作与性能优化

1.1 高效处理大规模数据集

向量化操作 vs 循环

在处理大规模数据时,向量化操作是提升性能的关键。向量化操作利用了底层的C语言实现,比Python循环快得多。

错误示例(使用循环):

import pandas as pd
import numpy as np

# 创建一个包含100万行的DataFrame
df = pd.DataFrame({'A': np.random.randint(0, 100, 1000000),
                   'B': np.random.randint(0, 100, 1000000)})

# 使用循环计算两列之和(慢)
def sum_with_loop(df):
    result = []
    for i in range(len(df)):
        result.append(df['A'].iloc[i] + df['B'].iloc[i])
    return result

# 这将非常慢,可能需要几秒钟甚至更长时间
# sum_with_loop(df)

正确示例(向量化操作):

# 使用向量化操作计算两列之和(快)
df['sum'] = df['A'] + df['B']

# 或者使用NumPy的向量化函数
df['sum'] = np.add(df['A'], df['B'])

使用query()方法进行高效过滤

query()方法可以使用字符串表达式进行过滤,比传统的布尔索引更简洁且性能更好。

# 传统方法
filtered_df = df[(df['A'] > 50) & (df['B'] < 80)]

# 使用query方法
filtered_df = df.query('A > 50 and B < 100')

使用eval()进行高效计算

eval()方法可以使用字符串表达式进行计算,避免创建中间DataFrame。

# 传统方法(创建中间DataFrame)
df['C'] = df['A'] + df['B']
df['D'] = df['A'] - df['B']
df['E'] = df['C'] * df['D']

# 使用eval方法(避免中间DataFrame)
df.eval('''
    C = A + B
    D = A - B
    E = C * D
''', inplace=True)

1.2 内存优化技巧

选择合适的数据类型

Pandas默认使用64位数据类型,但很多时候我们可以使用更小的类型来节省内存。

# 查看当前内存使用情况
print(df.info())

# 优化数据类型
df['A'] = df['A'].astype('int32')  # 从int64转换为int32
df['B'] = df['B'].astype('int32')

# 对于分类数据,使用category类型
df['category'] = pd.Categorical(['A', 'B', 'C'] * 333333)

# 对于浮点数,如果精度要求不高,可以使用float32
df['float_col'] = df['A'].astype('float32')

使用select_dtypes()批量转换

# 批量转换整数列
int_cols = df.select_dtypes(include=['int64']).columns
df[int_cols] = df[int_cols].astype('int32')

# 批量转换浮点列
float_cols = df.select_dtypes(include=['float64']).columns
df[float_cols] = df[float_cols].astype('float32')

1.3 高级数据合并与连接

多条件合并

# 创建示例数据
left = pd.DataFrame({'key': ['A', 'B', 'C', 'D'],
                     'value': [1, 2, 3, 4],
                     'category': ['X', 'Y', 'X', 'Y']})

right = pd.DataFrame({'key': ['A', 'B', 'C', 'D'],
                      'value': [5, 6, 7, 8],
                      'category': ['X', 'Y', 'X', 'Z']})

# 多条件合并
merged = pd.merge(left, right, on=['key', 'category'], how='inner')
print(merged)

使用join()进行索引合并

# 设置索引
left.set_index('key', inplace=True)
right.set_index('key', inplace=True)

# 使用join合并
joined = left.join(right, lsuffix='_left', rsuffix='_right')
print(joined)

使用concat()进行纵向合并

# 创建多个DataFrame
df1 = pd.DataFrame({'A': [1, 2], 'B': [3, 4]})
df2 = pd.DataFrame({'A': [5, 6], 'B': [7, 8]})
df3 = pd.DataFrame({'A': [9, 10], 'B': [11, 12]})

# 纵向合并
combined = pd.concat([df1, df2, df3], ignore_index=True)
print(combined)

第二部分:高级数据清洗与转换

2.1 处理复杂缺失值

多策略填充

# 创建包含复杂缺失值的DataFrame
df = pd.DataFrame({
    'A': [1, 2, np.nan, 4, 5],
    'B': [np.nan, 2, 3, np.nan, 5],
    'C': [1, 2, 3, 4, 5],
    'D': ['cat', 'dog', np.nan, 'bird', 'fish']
})

# 1. 使用不同策略填充不同列
df['A'].fillna(df['A'].mean(), inplace=True)  # 数值列用均值填充
df['B'].fillna(method='ffill', inplace=True)  # 用前一个值填充
df['D'].fillna('unknown', inplace=True)  # 分类列用特定值填充

# 2. 使用插值方法
df['A'] = df['A'].interpolate(method='linear')  # 线性插值

# 3. 使用KNN填充(需要sklearn)
from sklearn.impute import KNNImputer
imputer = KNNImputer(n_neighbors=2)
df[['A', 'B', 'C']] = imputer.fit_transform(df[['A', 'B', 'C']])

2.2 高级数据转换

使用transform()进行分组转换

# 创建示例数据
df = pd.DataFrame({
    'group': ['A', 'A', 'B', 'B', 'C', 'C'],
    'value': [10, 20, 30, 40, 50, 60]
})

# 计算每组的均值并减去均值(中心化)
df['value_centered'] = df.groupby('group')['value'].transform(lambda x: x - x.mean())

# 计算每组的百分比
df['value_pct'] = df.groupby('group')['value'].transform(lambda x: x / x.sum() * 100)

print(df)

使用apply()进行复杂转换

# 复杂条件转换
def complex_transform(row):
    if row['value'] > 30:
        return 'High'
    elif row['value'] > 15:
        'Medium'
    else:
        return 'Low'

df['category'] = df.apply(complex_transform, axis=1)

2.3 时间序列处理

时间特征提取

# 创建时间序列数据
dates = pd.date_range('2023-01-01', periods=10, freq='D')
df = pd.DataFrame({
    'date': dates,
    'value': np.random.randint(10, 100, 10)
})

# 提取时间特征
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['weekday'] = df['date'].dt.weekday  # 0=周一, 6=周日
df['is_weekend'] = df['weekday'].isin([5, 6])
df['quarter'] = df['date'].dt.quarter
df['day_of_year'] = df['date'].dt.dayofyear

# 计算时间差
df['days_since_start'] = (df['date'] - df['date'].min()).dt.days

print(df)

时间重采样

# 创建分钟级数据
minute_df = pd.DataFrame({
    'datetime': pd.date_range('2023-01-01', periods=100, freq='T'),
    'value': np.random.randint(10, 100, 100)
})
minute_df.set_index('datetime', inplace=True)

# 重采样为小时数据
hourly = minute_df.resample('H').mean()

# 重采样为天数据
daily = minute_df.resample('D').sum()

# 重采样为周数据,使用自定义的周起始
weekly = minute_df.resample('W-MON').agg({'value': ['mean', 'sum', 'std']})

第三部分:高级数据可视化

3.1 Matplotlib高级技巧

子图网格与复杂布局

import matplotlib.pyplot as plt
import seaborn as sns

# 创建复杂布局
fig = plt.figure(figsize=(12, 8))

# 创建子图网格
gs = fig.add_gridspec(3, 3)

# 添加不同大小的子图
ax1 = fig.add_subplot(gs[0, :])  # 第一行,占满
ax2 = fig.add_subplot(gs[1, 0])  # 第二行第一列
ax3 = fig.add_subplot(gs[1, 1:])  # 第二行,后两列
ax4 = fig.add_subplot(gs[2, :2])  # 第三行,前两列
ax5 = fig.add_subplot(gs[2, 2])  # 第三行第三列

# 绘制数据
data = np.random.randn(100, 4)
ax1.plot(data[:, 0])
ax2.scatter(data[:, 1], data[:, 2])
ax3.hist(data[:, 3], bins=20)
ax4.boxplot(data)
ax5.violinplot(data)

plt.tight_layout()
plt.show()

自定义颜色映射

from matplotlib.colors import LinearSegmentedColormap

# 创建自定义颜色映射
colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7']
custom_cmap = LinearSegmentedColormap.from_list('custom', colors)

# 使用自定义颜色映射
plt.imshow(np.random.rand(10, 10), cmap=custom_cmap)
plt.colorbar()
plt.show()

3.2 Seaborn高级可视化

复杂关系图

# 创建示例数据
tips = sns.load_dataset('tips')
tips['tip_pct'] = tips['tip'] / tips['total_bill'] * 100

# 1. PairGrid用于自定义关系图
g = sns.PairGrid(tips, vars=['total_bill', 'tip', 'tip_pct'], hue='time')
g.map_upper(sns.scatterplot)
g.map_lower(sns.kdeplot, fill=True)
g.map_diag(sns.histplot, kde=True)
g.add_legend()

# 2. 复杂热力图
corr = tips[['total_bill', 'tip', 'size']].corr()
mask = np.triu(np.ones_like(corr, dtype=bool))
sns.heatmap(corr, mask=mask, annot=True, cmap='coolwarm', center=0)
plt.show()

分类数据可视化

# 创建分类数据
data = pd.DataFrame({
    'category': ['A', 'B', 'C'] * 100,
    'subcat': ['X', 'Y'] * 150,
    'value': np.random.randn(300)
})

# 使用catplot进行复杂分类可视化
sns.catplot(data=data, x='category', y='value', hue='subcat',
            kind='box', height=5, aspect=2)
plt.show()

3.3 交互式可视化

Plotly基础

import plotly.express as px
import plotly.graph_objects as go

# 创建交互式散点图
fig = px.scatter(
    tips,
    x='total_bill',
    y='tip',
    color='time',
    size='size',
    hover_data=['day', 'sex'],
    title='小费与账单金额关系',
    template='plotly_white'
)

fig.show()

# 创建交互式折线图
fig = go.Figure()
fig.add_trace(go.Scatter(
    x=df['date'],
    y=df['value'],
    mode='lines+markers',
    name='数值趋势',
    hovertemplate='%{x}<br>数值: %{y}<extra></extra>'
))

fig.update_layout(
    title='时间序列趋势',
    xaxis_title='日期',
    yaxis_title='数值',
    hovermode='x unified'
)

fig.show()

高级Plotly图表

# 创建复杂的组合图表
fig = go.Figure()

# 添加柱状图
fig.add_trace(go.Bar(
    x=df['date'],
    y=df['value'],
    name='每日数值',
    marker_color='rgba(55, 83, 109, 0.7)'
))

# 添加折线图(移动平均)
df['MA7'] = df['value'].rolling(window=7).mean()
fig.add_trace(go.Scatter(
    x=df['date'],
    y=df['MA7'],
    mode='lines',
    name='7日移动平均',
    line=dict(color='red', width=3)
))

# 添加范围滑块和选择器
fig.update_layout(
    xaxis=dict(
        rangeselector=dict(
            buttons=list([
                dict(count=7, label="1w", step="day", stepmode="backward"),
                dict(count=1, label="1m", step="month", stepmode="backward"),
                dict(step="all")
            ])
        ),
        rangeslider=dict(visible=True),
        type="date"
    )
)

fig.show()

第四部分:高级数据处理技巧

4.1 处理大数据集

分块处理(Chunking)

# 处理大CSV文件(无法一次性读入内存)
chunk_size = 100000
results = []

for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    # 对每个块进行处理
    processed = chunk.groupby('category').agg({
        'value': ['mean', 'sum', 'count']
    })
    results.append(processed)

# 合并结果
final_result = pd.concat(results).groupby(level=0).sum()

使用Dask进行并行处理

import dask.dataframe as dd

# 创建Dask DataFrame(类似于Pandas,但可以并行处理)
ddf = dd.read_csv('large_file.csv')

# 执行计算(惰性求值)
result = ddf.groupby('category').value.mean().compute()  # compute()触发实际计算

# 处理大于内存的数据集
ddf = dd.read_csv('large_file.csv', blocksize=25e6)  # 25MB一块

# 复杂操作
result = (ddf[ddf['value'] > 100]
          .groupby('category')
          .agg({'value': ['mean', 'sum', 'count']})
          .compute())

4.2 高级字符串处理

正则表达式应用

# 创建包含文本数据的DataFrame
text_df = pd.DataFrame({
    'email': ['user1@example.com', 'user2@test.org', 'user3@company.co.uk'],
    'phone': ['(123) 456-7890', '123.456.7890', '123-456-7890'],
    'address': ['123 Main St, City, ST 12345', '456 Oak Ave, Town, ST 67890']
})

# 提取域名
text_df['domain'] = text_df['email'].str.extract(r'@(.+)')

# 标准化电话号码
text_df['phone_clean'] = text_df['phone'].str.replace(r'[^\d]', '', regex=True)

# 提取邮政编码
text_df['zipcode'] = text_df['address'].str.extract(r'(\d{5})')

# 复杂模式匹配
text_df['is_valid_email'] = text_df['email'].str.match(r'^[\w\.-]+@[\w\.-]+\.\w+$')

print(text_df)

文本向量化

from sklearn.feature_extraction.text import TfidfVectorizer

# 创建文本数据
documents = [
    "Python is great for data analysis",
    "Pandas is a powerful library",
    "Data analysis requires Python skills",
    "Machine learning with Python"
]

# 创建TF-IDF向量化器
vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
X = vectorizer.fit_transform(documents)

# 转换为DataFrame查看
tfidf_df = pd.DataFrame(X.toarray(), columns=vectorizer.get_feature_names_out())
print(tfidf_df)

4.3 高级分组与聚合

多级索引操作

# 创建多级索引数据
arrays = [['A', 'A', 'B', 'B', 'C', 'C'],
          ['one', 'two', 'one', 'two', 'one', 'two']]
index = pd.MultiIndex.from_arrays(arrays, names=['first', 'second'])
df = pd.DataFrame({'value': [10, 20, 30, 40, 50, 60]}, index=index)

# 多级索引查询
print(df.loc[('A', 'one')])  # 查询特定元素
print(df.loc['A'])  # 查询第一级
print(df.xs('two', level='second'))  # 跨选择

# 多级索引聚合
print(df.groupby(level=0).sum())  # 按第一级分组
print(df.groupby(level=1).mean())  # 按第二级分组

自定义聚合函数

# 创建示例数据
df = pd.DataFrame({
    'group': ['A', 'A', 'B', 'B', 'C', 'C'],
    'value': [10, 20, 30, 40, 50, 60],
    'weight': [1, 2, 1, 2, 1, 2]
})

# 定义自定义聚合函数
def weighted_mean(x, w):
    return np.average(x, weights=w)

# 使用apply进行复杂分组计算
result = df.groupby('group').apply(
    lambda x: pd.Series({
        'weighted_mean': weighted_mean(x['value'], x['weight']),
        'std': x['value'].std(),
        'count': x['value'].count()
    })
)

print(result)

第五部分:实际工作场景解决方案

5.1 销售数据分析案例

场景描述

假设你是一家零售公司的数据分析师,需要分析销售数据,找出最佳销售员、最畅销产品,并预测下季度销售趋势。

# 创建模拟销售数据
np.random.seed(42)
sales_data = pd.DataFrame({
    'date': pd.date_range('2023-01-01', periods=365, freq='D'),
    'salesperson': np.random.choice(['Alice', 'Bob', 'Charlie', 'David'], 365),
    'product': np.random.choice(['Laptop', 'Phone', 'Tablet', 'Monitor'], 365),
    'quantity': np.random.randint(1, 10, 365),
    'unit_price': np.random.randint(500, 2000, 365)
})

# 计算总销售额
sales_data['total_sales'] = sales_data['quantity'] * sales_data['unit_price']

# 1. 最佳销售员分析
top_salesperson = sales_data.groupby('salesperson')['total_sales'].sum().sort_values(ascending=False)
print("最佳销售员:")
print(top_salesperson)

# 2. 最畅销产品
top_product = sales_data.groupby('product')['quantity'].sum().sort_values(ascending=False)
print("\n最畅销产品:")
print(top_product)

# 3. 月度销售趋势
sales_data['month'] = sales_data['date'].dt.month
monthly_sales = sales_data.groupby('month')['total_sales'].sum()
print("\n月度销售趋势:")
print(monthly_sales)

# 4. 销售员-产品矩阵
pivot_table = pd.pivot_table(sales_data, 
                             values='total_sales', 
                             index='salesperson', 
                             columns='product', 
                             aggfunc='sum',
                             fill_value=0)
print("\n销售员-产品矩阵:")
print(pivot_table)

# 5. 预测下季度销售(简单移动平均)
sales_data['MA7'] = sales_data['total_sales'].rolling(window=7).mean()
sales_data['MA30'] = sales_data['total_sales'].rolling(window=30).mean()
last_30_days_avg = sales_data['MA30'].iloc[-1]
predicted_next_month = last_30_days_avg * 30  # 简单预测
print(f"\n下月预测销售额:{predicted_next_month:.2f}")

5.2 用户行为分析案例

场景描述

分析用户在网站上的行为数据,找出高价值用户,分析用户流失原因。

# 创建模拟用户行为数据
user_data = pd.DataFrame({
    'user_id': np.random.randint(1000, 2000, 1000),
    'session_date': pd.date_range('2023-01-01', periods=1000, freq='H'),
    'page_views': np.random.randint(1, 20, 1000),
    'time_on_site': np.random.randint(10, 600, 1000),
    'purchases': np.random.randint(0, 5, 1000),
    'purchase_amount': np.random.randint(0, 500, 1000)
})

# 1. RFM分析(最近购买时间、购买频率、购买金额)
# 计算每个用户的RFM指标
user_rfm = user_data.groupby('user_id').agg({
    'session_date': lambda x: (pd.Timestamp.now() - x.max()).days,  # Recency
    'user_id': 'count',  # Frequency
    'purchase_amount': 'sum'  # Monetary
}).rename(columns={
    'session_date': 'recency',
    'user_id': 'frequency',
    'purchase_amount': 'monetary'
})

# RFM评分(1-5分)
user_rfm['R_score'] = pd.qcut(user_rfm['recency'], 5, labels=[5,4,3,2,1])  # 越小越好
user_rfm['F_score'] = pd.qcut(user_rfm['frequency'].rank(method='first'), 5, labels=[1,2,3,4,5])
user_rfm['M_score'] = pd.qcut(user_rfm['monetary'], 5, labels=[1,2,3,4,5])

# 计算RFM总分
user_rfm['RFM_score'] = user_rfm['R_score'].astype(str) + user_rfm['F_score'].astype(str) + user_rfm['M_score'].astype(str)

# 2. 用户分群
def segment_user(row):
    score = int(row['RFM_score'])
    if score >= 555:
        return 'VIP'
    elif score >= 444:
        return 'High Value'
    elif score >= 333:
        return 'Medium Value'
    else:
        return 'Low Value'

user_rfm['segment'] = user_rfm.apply(segment_user, axis=1)

print("用户分群结果:")
print(user_rfm['segment'].value_counts())

# 3. 分析用户流失(长时间未访问)
current_date = pd.Timestamp.now()
user_data['days_since_last_visit'] = (current_date - user_data['session_date']).dt.days
churned_users = user_data[user_data['days_since_last_visit'] > 30]['user_id'].unique()
print(f"\n流失用户数量:{len(churned_users)}")

# 4. 高价值用户特征分析
vip_users = user_rfm[user_rfm['segment'] == 'VIP'].index
vip_behavior = user_data[user_data['user_id'].isin(vip_users)].agg({
    'page_views': 'mean',
    'time_on_site': 'mean',
    'purchases': 'mean'
})
print("\nVIP用户平均行为:")
print(vip_behavior)

5.3 A/B测试分析案例

场景描述

分析网站改版A/B测试结果,判断新版本是否优于旧版本。

# 创建模拟A/B测试数据
np.random.seed(42)
ab_test_data = pd.DataFrame({
    'user_id': range(10000),
    'group': np.random.choice(['control', 'treatment'], 10000, p=[0.5, 0.5]),
    'converted': 0
})

# 模拟转化率:对照组2%,实验组2.5%
control_mask = ab_test_data['group'] == 'control'
treatment_mask = ab_test_data['group'] == 'treatment'

ab_test_data.loc[control_mask, 'converted'] = np.random.binomial(1, 0.02, control_mask.sum())
ab_test_data.loc[treatment_mask, 'converted'] = np.random.binomial(1, 0.025, treatment_mask.sum())

# 1. 基础统计
summary = ab_test_data.groupby('group')['converted'].agg(['count', 'sum', 'mean'])
print("A/B测试结果摘要:")
print(summary)

# 2. 统计显著性检验(卡方检验)
from scipy.stats import chi2_contingency

# 创建列联表
contingency_table = pd.crosstab(ab_test_data['group'], ab_test_data['converted'])
chi2, p_value, dof, expected = chi2_contingency(contingency_table)

print(f"\n卡方检验结果:")
print(f"Chi2: {chi2:.4f}")
print(f"P-value: {p_value:.4f}")
print(f"显著性水平: 0.05")
print(f"结果: {'显著' if p_value < 0.05 else '不显著'}")

# 3. 计算提升度和置信区间
control_rate = summary.loc['control', 'mean']
treatment_rate = summary.loc['treatment', 'mean']
uplift = (treatment_rate - control_rate) / control_rate * 100

# 计算置信区间(使用正态近似)
from scipy.stats import norm
import math

def confidence_interval(successes, total, confidence=0.95):
    p = successes / total
    z = norm.ppf(1 - (1 - confidence) / 2)
    se = math.sqrt(p * (1 - p) / total)
    return (p - z * se, p + z * se)

control_ci = confidence_interval(
    summary.loc['control', 'sum'], 
    summary.loc['control', 'count']
)
treatment_ci = confidence_interval(
    summary.loc['treatment', 'sum'], 
    summary.loc['treatment', 'count']
)

print(f"\n转化率提升: {uplift:.2f}%")
print(f"对照组95%置信区间: [{control_ci[0]:.4f}, {control_ci[1]:.4f}]")
print(f"实验组95%置信区间: [{treatment_ci[0]:.4f}, {treatment_ci[1]:.4f}]")

# 4. 功效分析(需要statsmodels)
try:
    from statsmodels.stats.power import zt_ind_solve_power
    from statsmodels.stats.proportion import proportion_effectsize
    
    effect_size = proportion_effectsize(0.025, 0.02)
    power = zt_ind_solve_power(
        effect_size=effect_size,
        nobs1=5000,
        alpha=0.05,
        ratio=1.0
    )
    print(f"\n统计功效: {power:.4f}")
except ImportError:
    print("\n需要安装statsmodels进行功效分析")

第六部分:性能优化与最佳实践

6.1 代码性能分析

使用cProfile分析性能瓶颈

import cProfile
import pstats

def slow_function():
    # 模拟慢函数
    df = pd.DataFrame({'A': np.random.randint(0, 100, 100000)})
    df['B'] = df['A'].apply(lambda x: x**2 + np.sin(x))
    return df

# 分析性能
profiler = cProfile.Profile()
profiler.enable()
slow_function()
profiler.disable()

# 输出结果
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative').print_stats(10)

使用%timeit魔法命令

# 在Jupyter中使用
# %timeit df['A'] + df['B']
# %timeit df.apply(lambda row: row['A'] + row['B'], axis=1)

6.2 内存优化最佳实践

使用astype()优化内存

def optimize_memory(df):
    """优化DataFrame内存使用"""
    start_mem = df.memory_usage().sum() / 1024**2
    
    for col in df.columns:
        col_type = df[col].dtype
        
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
            else:
                if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
    
    end_mem = df.memory_usage().sum() / 1024**2
    print(f"内存使用从 {start_mem:.2f} MB 降低到 {end_mem:.2f} MB")
    return df

# 使用示例
df = pd.DataFrame({
    'A': np.random.randint(0, 100, 100000),
    'B': np.random.randn(100000)
})
df = optimize_memory(df)

6.3 代码组织与可复用性

创建可复用的分析函数

def sales_analysis_pipeline(data_path, output_path=None):
    """
    销售数据分析管道
    
    参数:
        data_path: 数据文件路径
        output_path: 输出路径(可选)
    
    返回:
        分析结果DataFrame
    """
    # 1. 数据加载
    df = pd.read_csv(data_path)
    
    # 2. 数据清洗
    df = df.dropna(subset=['sales', 'date'])
    df['date'] = pd.to_datetime(df['date'])
    
    # 3. 特征工程
    df['month'] = df['date'].dt.month
    df['quarter'] = df['date'].dt.quarter
    df['sales'] = df['sales'].astype('float32')
    
    # 4. 分析计算
    monthly_sales = df.groupby('month')['sales'].sum()
    top_products = df.groupby('product')['sales'].sum().nlargest(5)
    
    # 5. 结果输出
    result = {
        'monthly_sales': monthly_sales,
        'top_products': top_products,
        'total_sales': df['sales'].sum()
    }
    
    if output_path:
        monthly_sales.to_csv(f"{output_path}/monthly_sales.csv")
        top_products.to_csv(f"{output_path}/top_products.csv")
    
    return result

# 使用示例
# result = sales_analysis_pipeline('sales_data.csv', 'output/')

第七部分:进阶库与工具

7.1 使用Polars进行高性能分析

Polars基础

import polars as pl

# 创建Polars DataFrame(比Pandas更快)
df = pl.DataFrame({
    "a": [1, 2, 3, 4, 5],
    "b": ["A", "B", "A", "B", "A"],
    "c": [3.1, 2.4, 5.6, 2.7, 8.8]
})

# 链式操作(惰性求值)
result = (df.filter(pl.col("a") > 2)
          .groupby("b")
          .agg(pl.col("c").mean())
          .collect())  # collect()触发计算

print(result)

# 与Pandas互操作
pandas_df = df.to_pandas()
polars_from_pandas = pl.from_pandas(pandas_df)

Polars性能优势

# 大数据集性能对比
import time

# Pandas方式
start = time.time()
pandas_df = pd.DataFrame({
    'A': np.random.randint(0, 100, 1000000),
    'B': np.random.randint(0, 100, 1000000)
})
pandas_result = pandas_df.groupby('A').agg({'B': ['mean', 'sum']})
pandas_time = time.time() - start

# Polars方式
start = time.time()
polars_df = pl.DataFrame({
    'A': np.random.randint(0, 100, 1000000),
    'B': np.random.randint(0, 100, 1000000)
})
polars_result = (polars_df.groupby('A')
                 .agg(pl.col("B").mean().alias("B_mean"),
                      pl.col("B").sum().alias("B_sum"))
                 .collect())
polars_time = time.time() - start

print(f"Pandas时间: {pandas_time:.2f}秒")
print(f"Polars时间: {polars_time:.2f}秒")
print(f"Polars比Pandas快 {pandas_time/polars_time:.1f}倍")

7.2 使用PySpark处理超大规模数据

PySpark基础

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count

# 创建Spark会话
spark = SparkSession.builder \
    .appName("DataAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# 创建DataFrame
data = [(1, "Alice", 1000),
        (2, "Bob", 1500),
        (3, "Charlie", 2000)]
df = spark.createDataFrame(data, ["id", "name", "salary"])

# 执行聚合操作
result = df.groupBy() \
    .agg(
        sum("salary").alias("total_salary"),
        avg("salary").alias("avg_salary"),
        count("id").alias("employee_count")
    )

result.show()

# 读取大文件
large_df = spark.read.csv("large_file.csv", header=True, inferSchema=True)
# 执行分布式计算
summary = large_df.groupBy("category").agg(
    sum("value").alias("total"),
    avg("value").alias("average")
)
summary.show()

# 停止Spark会话
spark.stop()

7.3 使用SQL进行数据分析

在Python中使用SQL

import sqlite3
import pandas as pd

# 创建内存数据库
conn = sqlite3.connect(':memory:')

# 创建表
conn.execute('''
    CREATE TABLE sales (
        id INTEGER PRIMARY KEY,
        date TEXT,
        product TEXT,
        salesperson TEXT,
        amount REAL
    )
''')

# 插入数据
sales_data = pd.DataFrame({
    'date': pd.date_range('2023-01-01', periods=100, freq='D'),
    'product': np.random.choice(['Laptop', 'Phone', 'Tablet'], 100),
    'salesperson': np.random.choice(['Alice', 'Bob', 'Charlie'], 100),
    'amount': np.random.randint(500, 2000, 100)
})

sales_data.to_sql('sales', conn, if_exists='append', index=False)

# 使用SQL查询
query = '''
    SELECT 
        salesperson,
        product,
        SUM(amount) as total_sales,
        COUNT(*) as transaction_count
    FROM sales
    WHERE date >= '2023-02-01'
    GROUP BY salesperson, product
    ORDER BY total_sales DESC
'''

result = pd.read_sql(query, conn)
print(result)

# 复杂SQL分析
complex_query = '''
    WITH monthly_sales AS (
        SELECT 
            strftime('%Y-%m', date) as month,
            salesperson,
            SUM(amount) as monthly_total
        FROM sales
        GROUP BY month, salesperson
    ),
    ranked_sales AS (
        SELECT 
            month,
            salesperson,
            monthly_total,
            RANK() OVER (PARTITION BY month ORDER BY monthly_total DESC) as rank
        FROM monthly_sales
    )
    SELECT * FROM ranked_sales WHERE rank <= 3
'''

top_performers = pd.read_sql(complex_query, conn)
print(top_performers)

conn.close()

第八部分:机器学习基础集成

8.1 特征工程自动化

使用Featuretools

import featuretools as ft

# 创建实体集
es = ft.EntitySet(id="sales_data")

# 添加数据
data = pd.DataFrame({
    'customer_id': [1, 1, 2, 2, 3, 3],
    'transaction_id': [1, 2, 3, 4, 5, 6],
    'amount': [100, 200, 150, 250, 300, 400],
    'date': pd.date_range('2023-01-01', periods=6, freq='D')
})

es = es.add_dataframe(
    dataframe_name="transactions",
    dataframe=data,
    index="transaction_id",
    time_index="date"
)

es = es.add_dataframe(
    dataframe_name="customers",
    dataframe=pd.DataFrame({'customer_id': [1, 2, 3]}),
    index="customer_id"
)

# 自动特征生成
feature_matrix, feature_defs = ft.dfs(
    entityset=es,
    target_dataframe_name="customers",
    max_depth=2
)

print("生成的特征:")
print(feature_matrix.head())

8.2 预测模型集成

简单预测示例

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

# 准备数据
df = pd.DataFrame({
    'feature1': np.random.randn(1000),
    'feature2': np.random.randn(1000),
    'feature3': np.random.randn(1000),
    'target': np.random.randn(1000) + 0.5 * np.random.randn(1000)
})

X = df[['feature1', 'feature2', 'feature3']]
y = df['target']

# 划分训练测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 训练模型
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# 预测
y_pred = model.predict(X_test)

# 评估
mse = mean_squared_error(y_test, y_pred)
print(f"均方误差: {mse:.4f}")

# 特征重要性
feature_importance = pd.DataFrame({
    'feature': X.columns,
    'importance': model.feature_importances_
}).sort_values('importance', ascending=False)

print("\n特征重要性:")
print(feature_importance)

第九部分:项目实战与职场应用

9.1 构建自动化分析报告

自动化报告生成

import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

def generate_monthly_report(data_path, output_dir):
    """
    自动生成月度分析报告
    """
    # 1. 数据加载与清洗
    df = pd.read_csv(data_path)
    df['date'] = pd.to_datetime(df['date'])
    df = df[df['date'] >= df['date'].max() - pd.DateOffset(months=1)]
    
    # 2. 计算关键指标
    metrics = {
        '总销售额': df['sales'].sum(),
        '平均订单价值': df['sales'].mean(),
        '订单数量': len(df),
        '同比增长': ((df['sales'].sum() - df['sales'].iloc[:-30].sum()) / df['sales'].iloc[:-30].sum() * 100)
    }
    
    # 3. 生成图表
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    # 销售趋势
    daily_sales = df.groupby('date')['sales'].sum()
    axes[0, 0].plot(daily_sales.index, daily_sales.values)
    axes[0, 0].set_title('每日销售趋势')
    axes[0, 0].tick_params(axis='x', rotation=45)
    
    # 产品分布
    product_sales = df.groupby('product')['sales'].sum()
    axes[0, 1].pie(product_sales.values, labels=product_sales.index, autopct='%1.1f%%')
    axes[0, 1].set_title('产品销售占比')
    
    # 销售员表现
    salesperson_sales = df.groupby('salesperson')['sales'].sum().sort_values(ascending=False)
    axes[1, 0].bar(salesperson_sales.index, salesperson_sales.values)
    axes[1, 0].set_title('销售员业绩')
    axes[1, 0].tick_params(axis='x', rotation=45)
    
    # 订单分布
    axes[1, 1].hist(df['sales'], bins=20, alpha=0.7)
    axes[1, 1].set_title('订单金额分布')
    
    plt.tight_layout()
    
    # 4. 保存报告
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_path = f"{output_dir}/monthly_report_{timestamp}.png"
    plt.savefig(report_path, dpi=300, bbox_inches='tight')
    plt.close()
    
    # 5. 生成文本报告
    report_text = f"""
    月度销售分析报告
    生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    
    关键指标:
    - 总销售额: {metrics['总销售额']:,.2f}
    - 平均订单价值: {metrics['平均订单价值']:,.2f}
    - 订单数量: {metrics['订单数量']}
    - 同比增长: {metrics['同比增长']:.2f}%
    
    可视化图表已保存至: {report_path}
    """
    
    with open(f"{output_dir}/monthly_report_{timestamp}.txt", 'w') as f:
        f.write(report_text)
    
    return metrics, report_path

# 使用示例
# metrics, chart_path = generate_monthly_report('sales_data.csv', './reports')

9.2 数据质量监控

数据质量检查框架

def data_quality_check(df, rules):
    """
    数据质量检查框架
    
    参数:
        df: DataFrame
        rules: 质量检查规则字典
    """
    quality_report = {}
    
    # 1. 完整性检查
    quality_report['completeness'] = {
        col: 1 - df[col].isnull().sum() / len(df)
        for col in rules.get('required_columns', [])
    }
    
    # 2. 唯一性检查
    for col in rules.get('unique_columns', []):
        quality_report[f'unique_{col}'] = {
            'unique_ratio': df[col].nunique() / len(df),
            'duplicates': df[col].duplicated().sum()
        }
    
    # 3. 有效性检查
    for col, condition in rules.get('value_constraints', {}).items():
        valid = df[col].apply(condition)
        quality_report[f'valid_{col}'] = {
            'valid_ratio': valid.mean(),
            'invalid_count': (~valid).sum()
        }
    
    # 4. 范围检查
    for col, (min_val, max_val) in rules.get('range_constraints', {}).items():
        in_range = (df[col] >= min_val) & (df[col] <= max_val)
        quality_report[f'range_{col}'] = {
            'in_range_ratio': in_range.mean(),
            'out_of_range': (~in_range).sum()
        }
    
    return quality_report

# 使用示例
rules = {
    'required_columns': ['sales', 'date', 'product'],
    'unique_columns': ['transaction_id'],
    'value_constraints': {
        'sales': lambda x: x > 0,
        'product': lambda x: x.isin(['Laptop', 'Phone', 'Tablet'])
    },
    'range_constraints': {
        'sales': (0, 10000)
    }
}

# 质量检查
# quality = data_quality_check(your_df, rules)
# print(quality)

9.3 职场竞争力提升建议

1. 建立个人作品集

  • 将完成的项目整理成Jupyter Notebook
  • 使用GitHub托管代码
  • 撰写项目文档说明业务价值

2. 持续学习

  • 关注Pandas、NumPy、Scikit-learn的更新
  • 参加Kaggle竞赛
  • 阅读数据分析相关博客和论文

3. 业务理解能力

  • 学习所在行业的业务知识
  • 理解数据背后的业务逻辑
  • 能够将数据洞察转化为业务建议

4. 沟通能力

  • 学会用数据讲故事
  • 制作清晰的可视化图表
  • 向非技术人员解释技术概念

结语

通过本课程的学习,你已经掌握了Python数据分析的高级技巧,能够解决实际工作中的复杂问题。记住,数据分析不仅仅是写代码,更重要的是理解业务、发现问题并提供解决方案。

关键要点回顾

  1. 性能优化:向量化操作、内存优化、分块处理
  2. 数据清洗:高级缺失值处理、复杂转换、时间序列处理
  3. 可视化:交互式图表、复杂布局、自定义样式
  4. 实际应用:销售分析、用户行为分析、A/B测试
  5. 工具进阶:Polars、PySpark、SQL集成
  6. 机器学习:特征工程、预测模型

下一步行动

  1. 在实际工作中应用这些技巧
  2. 持续挑战更复杂的数据问题
  3. 建立个人数据分析项目集
  4. 参与开源项目或Kaggle竞赛
  5. 考虑获得相关认证(如Pandas认证)

掌握这些高级技能将显著提升你的职场竞争力,让你在数据驱动的决策中发挥关键作用。祝你在数据分析的道路上取得成功!