引言:为什么Python数据分析是职场必备技能
在当今数据驱动的商业环境中,数据分析能力已经成为职场核心竞争力。Python作为数据分析的首选语言,凭借其简洁的语法、丰富的库生态系统和强大的社区支持,成为了数据分析师、数据科学家和业务分析师的标准工具。从基础的数据处理到复杂的机器学习建模,Python都能提供完整的解决方案。
掌握Python数据分析不仅能帮助你从海量数据中提取有价值的洞察,还能自动化重复性工作,提高决策效率。无论你是刚入行的新人,还是希望提升技能的职场人士,系统学习Python数据分析都将为你的职业发展带来巨大价值。
第一部分:Python基础语法精要
变量与数据类型
Python是动态类型语言,变量的类型在运行时自动确定。掌握基础数据类型是数据分析的起点。
# 数值类型
age = 25 # 整数
price = 19.99 # 浮点数
tax_rate = 0.08 # 浮点数
# 字符串类型
name = "张三" # 字符串
address = '北京市朝阳区' # 单引号或双引号都可以
# 布尔类型
is_active = True # 布尔值
is_verified = False
# 查看变量类型
print(type(age)) # <class 'int'>
print(type(price)) # <class 'float'>
print(type(name)) # <class 'str'>
数据结构核心
数据分析中最重要的四种数据结构:列表、元组、字典和集合。
# 列表:可变序列,适合存储有序数据
sales_data = [120, 150, 180, 200, 165]
print(f"销售数据: {sales_data}")
print(f"第一个月的销量: {sales_data[0]}") # 索引从0开始
# 元组:不可变序列,适合存储配置信息
coordinates = (116.4074, 39.9042) # 北京的经纬度
print(f"北京坐标: {coordinates}")
# 字典:键值对,适合存储结构化数据
customer = {
"name": "李四",
"age": 30,
"city": "上海",
"purchases": [200, 350, 180]
}
print(f"客户信息: {customer}")
print(f"客户城市: {customer['city']}")
# 集合:去重,适合存储唯一值
unique_categories = {"电子产品", "服装", "食品", "电子产品"}
print(f"去重后的类别: {unique_categories}") # 自动去重
# 列表推导式:高效的数据处理
squares = [x**2 for x in range(10)]
print(f"0-9的平方: {squares}")
# 条件过滤
even_numbers = [x for x in range(20) if x % 2 == 0]
print(f"0-19的偶数: {even_numbers}")
控制流与函数
# 条件语句
def get_discount_rate(purchase_amount):
if purchase_amount >= 1000:
return 0.15
elif purchase_amount >= 500:
return 0.10
elif purchase_amount >= 200:
return 0.05
else:
return 0
# 循环结构
def calculate_monthly_sales(sales_data):
total = 0
for month, sales in enumerate(sales_data, 1):
total += sales
print(f"第{month}个月销量: {sales}")
return total
# 函数式编程
def process_sales_data(sales_list):
# 使用map函数计算每个销售点的提成
commissions = list(map(lambda x: x * 0.02, sales_list))
# 使用filter函数筛选出高销售额
high_sales = list(filter(lambda x: x > 150, sales_list))
return commissions, high_sales
# 实际应用
sales = [120, 150, 180, 200, 165]
comms, highs = process_sales_data(sales)
print(f"提成列表: {comms}")
print(f"高销售额: {highs}")
第二部分:NumPy:科学计算基础
NumPy是Python科学计算的基础库,提供高性能的多维数组对象和工具。
ndarray数组操作
import numpy as np
# 创建数组
arr1 = np.array([1, 2, 3, 4, 5]) # 从列表创建
arr2 = np.arange(0, 10, 2) # 类似range
arr3 = np.linspace(0, 1, 5) # 等间距数列
arr4 = np.zeros((3, 3)) # 3x3零矩阵
arr5 = np.random.random((2, 3)) # 2x3随机数矩阵
print("数组1:", arr1)
print("数组2:", arr2)
print("数组3:", arr3)
print("数组4:\n", arr4)
print("数组5:\n", arr5)
# 数组属性
print(f"形状: {arr5.shape}") # (2, 3)
print(f"数据类型: {arr5.dtype}") # float64
print(f"维度: {arr5.ndim}") # 2
# 数组运算(向量化操作)
a = np.array([1, 2, 3, 4])
b = np.array([10, 20, 30, 40])
print("加法:", a + b) # [11 22 33 44]
print("乘法:", a * b) # [10 40 90 160]
print("平方:", a ** 2) # [1 4 9 16]
print("比较:", a > 2) # [False False True True]
# 索引和切片
matrix = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print("第二行:", matrix[1]) # [4 5 6]
print("第三列:", matrix[:, 2]) # [3 6 9]
print("子矩阵:", matrix[0:2, 1:3]) # [[2 3] [5 6]]
# 花式索引
indices = [0, 2]
print("指定行:", matrix[indices]) # [[1 2 3] [7 8 9]]
# 聚合函数
data = np.array([15, 22, 34, 41, 28, 33, 19, 25])
print(f"总和: {data.sum()}")
print(f"平均值: {data.mean()}")
print(f"标准差: {data.std()}")
print(f"最大值: {data.max()}")
print(f"最小值: {data.min()}")
print(f"中位数: {np.median(data)}")
# 广播机制
matrix = np.array([[1, 2, 3], [4, 5, 6]])
row_means = matrix.mean(axis=1).reshape(-1, 1)
normalized = matrix - row_means
print("原始矩阵:\n", matrix)
print("按行均值归一化:\n", np.round(normalized, 2))
实战案例:销售数据分析
# 模拟销售数据:10个销售点,3个月的数据
np.random.seed(42) # 固定随机种子,结果可复现
sales_data = np.random.randint(100, 500, size=(10, 3))
months = ['1月', '2月', '3月']
stores = [f"门店{i+1}" for i in 10(range(10))]
print("销售数据矩阵(行:门店,列:月份):")
print(sales_data)
# 计算每个门店的总销售额
total_sales_per_store = sales_data.sum(axis=1)
print("\n各门店总销售额:", total_sales_per_store)
# 计算每月平均销售额
monthly_avg = sales_data.mean(axis=0)
print("每月平均销售额:", monthly_avg)
# 找出销售额超过300的门店
high_sales_stores = np.where(sales_data > 300)
print("\n销售额超过300的门店和月份:")
for store, month in zip(high_sales_stores[0], high_sales_stores[1]):
print(f"门店{store+1} - {months[month]}: {sales_data[store, month]}")
# 计算环比增长率
growth_rates = np.diff(sales_data, axis=1) / sales_data[:, :-1] * 100
print("\n环比增长率(%):")
print(growth_rates)
第三部分:Pandas:数据处理与分析利器
Pandas是基于NumPy构建的数据分析库,提供DataFrame这一核心数据结构。
Series和DataFrame基础
import pandas as pd
# Series:带标签的一维数组
s = pd.Series([1, 3, 5, 7, 9], index=['a', 'b', 'c', 'd', 'e'])
print("Series:\n", s)
print("值:", s.values)
print("索引:", s.index)
# DataFrame:二维表格结构
data = {
'姓名': ['张三', '李四', '王五', '赵六'],
'年龄': [25, 30, 35, 28],
'城市': ['北京', '上海', '广州', '深圳'],
'薪资': [12000, 18000, 15000, 20000]
}
df = pd.DataFrame(data)
print("\nDataFrame:\n", df)
# 查看数据基本信息
print("\n数据形状:", df.shape)
print("数据类型:\n", df.dtypes)
print("前3行:\n", df.head(3))
print("后2行:\n", df.tail(2))
print("统计描述:\n", df.describe())
数据读取与写入
# 读取CSV文件
# df = pd.read_csv('data.csv', encoding='utf-8')
# 读取Excel文件
# df = pd.read_excel('data.xlsx', sheet_name='Sheet1')
# 读取JSON
# df = pd.read_json('data.json')
# 读取SQL数据库
# import sqlite3
# conn = sqlite3.connect('database.db')
# df = pd.read_sql('SELECT * FROM table', conn)
# 写入文件
# df.to_csv('output.csv', index=False, encoding='utf-8')
# df.to_excel('output.xlsx', index=False)
# df.to_json('output.json', orient='records')
# 示例:创建并保存数据
sample_df = pd.DataFrame({
'product': ['A', 'B', 'C', 'D'],
'price': [100, 200, 300, 400],
'quantity': [10, 5, 8, 3]
})
sample_df.to_csv('sample_data.csv', index=False)
print("已创建sample_data.csv文件")
数据清洗与预处理
# 创建包含问题数据的DataFrame
data = {
'姓名': ['张三', '李四', '王五', '赵六', '孙七'],
'年龄': [25, 30, -1, 28, 35], # -1是异常值
'城市': ['北京', '上海', '广州', '深圳', None], # 缺失值
'邮箱': ['zhang@company.com', 'li@company.com', 'wang@company.com', 'zhao@company.com', 'sun@company.com'],
'薪资': [12000, 18000, 15000, 20000, 16000]
}
df = pd.DataFrame(data)
print("原始数据:\n", df)
# 处理缺失值
print("\n缺失值统计:\n", df.isnull().sum())
# 删除缺失值
df_dropped = df.dropna(subset=['城市'])
# 填充缺失值
df_filled = df.copy()
df_filled['城市'] = df_filled['城市'].fillna('未知城市')
print("\n填充缺失值后:\n", df_filled)
# 处理异常值
# 年龄不能为负数,也不能超过100
df_clean = df_filled.copy()
df_clean.loc[df_clean['年龄'] <= 0, '年龄'] = np.nan
df_clean.loc[df_clean['年龄'] > 100, '年龄'] = np.nan
df_clean['年龄'] = df_clean['年龄'].fillna(df_clean['年龄'].median())
print("\n处理异常值后:\n", df_clean)
# 数据类型转换
df_clean['年龄'] = df_clean['年龄'].astype(int)
print("\n数据类型:\n", df_clean.dtypes)
# 重复值处理
df_duplicate = pd.DataFrame({
'姓名': ['张三', '李四', '张三', '王五'],
'年龄': [25, 30, 25, 35]
})
print("\n包含重复值的数据:\n", df_duplicate)
print("去重后:\n", df_duplicate.drop_duplicates())
数据筛选与查询
# 继续使用df_clean
df = df_clean
# 基本筛选
print("年龄大于28的员工:\n", df[df['年龄'] > 28])
# 多条件筛选
print("\n北京且薪资大于15000的员工:\n", df[(df['城市'] == '北京') & (df['薪资'] > 15000)])
# 使用query方法
print("\n使用query查询:\n", df.query('年龄 > 25 and 薪资 > 15000'))
# 使用isin筛选
target_cities = ['北京', '上海']
print("\n指定城市的员工:\n", df[df['城市'].isin(target_cities)])
# 字符串筛选
print("\n邮箱包含company的员工:\n", df[df['邮箱'].str.contains('company')])
# 按索引筛选
print("\n指定索引的行:\n", df.loc[[0, 2]])
print("\n指定列:\n", df[['姓名', '薪资']])
数据分组与聚合
# 创建销售数据
sales_df = pd.DataFrame({
'地区': ['华北', '华东', '华北', '华南', '华东', '华北', '华南', '华东'],
'产品': ['A', 'A', 'B', 'A', 'B', 'B', 'A', 'B'],
'销售额': [100, 150, 200, 120, 180, 220, 140, 160],
'利润': [20, 30, 40, 24, 36, 44, 28, 32]
})
print("销售数据:\n", sales_df)
# 单列分组
grouped = sales_df.groupby('地区')
print("\n按地区分组:\n", grouped)
print("\n各地区销售额总和:\n", grouped['销售额'].sum())
print("\n各地区平均利润:\n", grouped['利润'].mean())
# 多列分组
multi_group = sales_df.groupby(['地区', '产品'])
print("\n按地区和产品分组:\n", multi_group['销售额'].sum())
# 聚合多个函数
agg_result = grouped.agg({
'销售额': ['sum', 'mean', 'max'],
'利润': ['sum', 'mean']
})
print("\n多函数聚合:\n", agg_result)
# 自定义聚合函数
def profit_margin(series):
return series['利润'].sum() / series['销售额'].sum()
# apply自定义函数
margin_by_region = sales_df.groupby('地区').apply(profit_margin)
print("\n各地区利润率:\n", margin_by_region)
数据合并与连接
# 创建两个DataFrame用于演示
df1 = pd.DataFrame({
'员工ID': ['E001', 'E002', 'E003'],
'姓名': ['张三', '李四', '王五'],
'部门': ['销售', '技术', '人事']
})
df2 = pd.DataFrame({
'员工ID': ['E001', 'E002', 'E004'],
'薪资': [12000, 18000, 15000],
'城市': ['北京', '上海', '广州']
})
# 合并(类似SQL的JOIN)
print("df1:\n", df1)
print("\ndf2:\n", df2)
# 内连接
inner_join = pd.merge(df1, df2, on='员工ID', how='inner')
print("\n内连接:\n", inner_join)
# 左连接
left_join = pd.merge(df1, df2, on='员工ID', how='left')
print("\n左连接:\n", left_join)
# 外连接
outer_join = pd.merge(df1, df2, on='员工ID', how='outer')
print("\n外连接:\n", outer_join)
# 纵向合并
df3 = pd.DataFrame({
'员工ID': ['E005', 'E006'],
'姓名': ['赵六', '孙七'],
'部门': ['技术', '销售']
})
combined = pd.concat([df1, df3], ignore_index=True)
print("\n纵向合并:\n", combined)
数据透视表
# 使用之前的sales_df
pivot_table = pd.pivot_table(
sales_df,
values=['销售额', '利润'],
index='地区',
columns='产品',
aggfunc='sum',
fill_value=0
)
print("数据透视表:\n", pivot_table)
# 交叉表
cross_tab = pd.crosstab(
sales_df['地区'],
sales_df['产品'],
values=sales_df['销售额'],
aggfunc='sum'
)
print("\n交叉表:\n", cross_tab)
时间序列处理
# 创建时间序列数据
dates = pd.date_range('2024-01-01', periods=10, freq='D')
ts_df = pd.DataFrame({
'日期': dates,
'销售额': np.random.randint(100, 500, 10),
'访问量': np.random.randint(1000, 5000, 10)
})
ts_df = ts_df.set_index('日期')
print("时间序列数据:\n", ts_df)
# 时间筛选
print("\n2024年1月5日之后的数据:\n", ts_df.loc['2024-01-05':])
# 重采样(按周汇总)
weekly = ts_df.resample('W').sum()
print("\n周汇总:\n", weekly)
# 移动平均
ts_df['7日移动平均'] = ts_df['销售额'].rolling(window=7).mean()
print("\n带移动平均的数据:\n", ts_df)
第四部分:数据可视化
Matplotlib基础
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
# 折线图
x = np.linspace(0, 10, 100)
y = np.sin(x)
plt.figure(figsize=(10, 6))
plt.plot(x, y, label='sin(x)', color='blue', linewidth=2)
plt.title('正弦函数图像')
plt.xlabel('X轴')
plt.ylabel('Y轴')
plt.legend()
plt.grid(True)
plt.show()
# 柱状图
categories = ['产品A', '产品B', '产品C', '产品D']
values = [25, 40, 30, 35]
plt.figure(figsize=(8, 5))
plt.bar(categories, values, color=['red', 'green', 'blue', 'orange'])
plt.title('产品销量对比')
plt.xlabel('产品')
plt.ylabel('销量')
plt.show()
# 散点图
np.random.seed(42)
x = np.random.normal(0, 1, 100)
y = 2 * x + np.random.normal(0, 0.5, 100)
plt.figure(figsize=(8, 5))
plt.scatter(x, y, alpha=0.6)
plt.title('散点图示例')
plt.xlabel('X')
plt.ylabel('Y')
plt.show()
# 直方图
data = np.random.normal(0, 1, 1000)
plt.figure(figsize=(8, 5))
plt.hist(data, bins=30, color='skyblue', edgecolor='black')
plt.title('正态分布直方图')
plt.xlabel('值')
plt.ylabel('频数')
plt.show()
Seaborn高级可视化
import seaborn as sns
# 设置风格
sns.set_theme(style="whitegrid")
# 箱线图
plt.figure(figsize=(8, 6))
sns.boxplot(data=sales_df, x='地区', y='销售额')
plt.title('各地区销售额箱线图')
plt.show()
# 热力图
corr_matrix = sales_df[['销售额', '利润']].corr()
plt.figure(figsize=(6, 4))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('相关性热力图')
plt.show()
# 小提琴图
plt.figure(figsize=(8, 6))
sns.violinplot(data=sales_df, x='地区', y='销售额', hue='产品')
plt.title('各地区销售额小提琴图')
plt.show()
# 成对关系图
pair_plot_df = sales_df[['销售额', '利润', '地区', '产品']]
sns.pairplot(pair_plot_df, hue='地区')
plt.show()
Plotly交互式可视化
import plotly.express as px
import plotly.graph_objects as go
# 交互式散点图
fig = px.scatter(
sales_df,
x='销售额',
y='利润',
color='地区',
size='销售额',
hover_data=['产品'],
title='销售额 vs 利润'
)
fig.show()
# 交互式折线图
time_data = pd.DataFrame({
'时间': pd.date_range('2024-01-01', periods=30),
'销售额': np.cumsum(np.random.normal(0, 50, 30)) + 1000,
'访问量': np.cumsum(np.random.normal(0, 200, 30)) + 5000
})
fig = px.line(
time_data,
x='时间',
y=['销售额', '访问量'],
title='30天销售与访问趋势'
)
fig.show()
# 交互式柱状图
fig = px.bar(
sales_df,
x='地区',
y='销售额',
color='产品',
barmode='group',
title='各地区产品销售额对比'
)
fig.show()
第五部分:实战项目:电商销售数据分析
项目背景与数据准备
# 创建模拟电商数据
np.random.seed(42)
# 基础数据
n_orders = 1000
categories = ['电子产品', '服装', '家居', '食品', '美妆']
cities = ['北京', '上海', '广州', '深圳', '杭州', '成都']
# 生成订单数据
orders = pd.DataFrame({
'订单ID': [f'ORD{i:04d}' for i in range(1, n_orders+1)],
'订单日期': pd.date_range('2024-01-01', periods=n_orders, freq='H'),
'客户ID': np.random.randint(1000, 2000, n_orders),
'产品类别': np.random.choice(categories, n_orders),
'城市': np.random.choice(cities, n_orders),
'订单金额': np.random.uniform(50, 5000, n_orders).round(2),
'订单状态': np.random.choice(['已完成', '待发货', '已取消'], n_orders, p=[0.7, 0.2, 0.1])
})
# 添加一些特征
orders['年份'] = orders['订单日期'].dt.year
orders['月份'] = orders['订单日期'].dt.month
orders['星期'] = orders['订单日期'].dt.dayofweek
orders['小时'] = orders['订单日期'].dt.hour
print("电商订单数据预览:")
print(orders.head())
print(f"\n数据形状: {orders.shape}")
1. 销售额总体分析
# 基础统计
print("=== 销售额总体分析 ===")
print(f"总销售额: {orders['订单金额'].sum():,.2f}")
print(f"平均订单金额: {orders['订单金额'].mean():,.2f}")
print(f"订单总数: {len(orders)}")
print(f"客单价: {orders['订单金额'].sum() / orders['客户ID'].nunique():,.2f}")
# 按状态统计
status_summary = orders.groupby('订单状态').agg({
'订单金额': ['count', 'sum', 'mean']
}).round(2)
print("\n按订单状态统计:")
print(status_summary)
# 只统计已完成订单
completed_orders = orders[orders['订单状态'] == '已完成']
print(f"\n已完成订单总销售额: {completed_orders['订单金额'].sum():,.2f}")
2. 时间趋势分析
print("\n=== 时间趋势分析 ===")
# 按月统计
monthly_sales = completed_orders.groupby('月份').agg({
'订单金额': ['sum', 'count', 'mean']
}).round(2)
print("月度销售统计:")
print(monthly_sales)
# 可视化月度趋势
plt.figure(figsize=(12, 6))
monthly_sales_plot = completed_orders.groupby('月份')['订单金额'].sum()
plt.plot(monthly_sales_plot.index, monthly_sales_plot.values, marker='o', linewidth=2)
plt.title('2024年月度销售额趋势')
plt.xlabel('月份')
plt.ylabel('销售额')
plt.xticks(range(1, 13))
plt.grid(True)
plt.show()
# 小时趋势分析
hourly_sales = completed_orders.groupby('小时')['订单金额'].sum()
plt.figure(figsize=(12, 6))
plt.bar(hourly_sales.index, hourly_sales.values, color='skyblue')
plt.title('24小时销售额分布')
plt.xlabel('小时')
plt.ylabel('销售额')
plt.xticks(range(0, 24))
plt.grid(True, alpha=0.3)
plt.show()
3. 产品类别分析
print("\n=== 产品类别分析 ===")
# 各类别销售占比
category_sales = completed_orders.groupby('产品类别')['订单金额'].sum()
print("各类别销售额:")
print(category_sales)
# 可视化
plt.figure(figsize=(10, 8))
plt.pie(category_sales.values, labels=category_sales.index, autopct='%1.1f%%')
plt.title('产品类别销售额占比')
plt.show()
# 类别TOP3
category_top3 = completed_orders.groupby('产品类别')['订单金额'].sum().nlargest(3)
print("\n销售额TOP3类别:")
print(category_top3)
# 类别与城市交叉分析
category_city = pd.pivot_table(
completed_orders,
values='订单金额',
index='产品类别',
columns='城市',
aggfunc='sum',
fill_value=0
)
print("\n类别-城市交叉分析:")
print(category_city)
4. 城市维度分析
print("\n=== 城市维度分析 ===")
# 各城市销售情况
city_sales = completed_orders.groupby('城市').agg({
'订单金额': ['sum', 'count', 'mean'],
'客户ID': 'nunique'
}).round(2)
city_sales.columns = ['总销售额', '订单数', '平均订单金额', '客户数']
print("各城市销售统计:")
print(city_sales)
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# 总销售额
city_sales['总销售额'].plot(kind='bar', ax=axes[0,0], color='skyblue')
axes[0,0].set_title('各城市总销售额')
axes[0,0].set_ylabel('销售额')
# 订单数
city_sales['订单数'].plot(kind='bar', ax=axes[0,1], color='lightgreen')
axes[0,1].set_title('各城市订单数')
axes[0,1].set_ylabel('订单数')
# 平均订单金额
city_sales['平均订单金额'].plot(kind='bar', ax=axes[1,0], color='orange')
axes[1,0].set_title('各城市平均订单金额')
axes[1,0].set_ylabel('平均订单金额')
# 客户数
city_sales['客户数'].plot(kind='bar', ax=axes[1,1], color='pink')
axes[1,1].set_title('各城市客户数')
axes[1,1].set_ylabel('客户数')
plt.tight_layout()
plt.show()
5. 客户价值分析(RFM模型简化版)
print("\n=== 客户价值分析 ===")
# 计算每个客户的最近购买时间、购买频率、购买金额
snapshot_date = completed_orders['订单日期'].max() + pd.Timedelta(days=1)
rfm = completed_orders.groupby('客户ID').agg({
'订单日期': lambda x: (snapshot_date - x.max()).days, # 最近购买天数
'订单ID': 'count', # 购买频率
'订单金额': 'sum' # 购买金额
})
rfm.columns = ['Recency', 'Frequency', 'Monetary']
print("RFM数据前5行:")
print(rfm.head())
# RFM评分(四分位数)
rfm['R_Score'] = pd.qcut(rfm['Recency'], 4, labels=[4, 3, 2, 1]) # 最近购买得分越高
rfm['F_Score'] = pd.qcut(rfm['Frequency'].rank(method='first'), 4, labels=[1, 2, 3, 4])
rfm['M_Score'] = pd.qcut(rfm['Monetary'], 4, labels=[1, 2, 3, 4])
# 客户分层
rfm['RFM_Score'] = rfm['R_Score'].astype(str) + rfm['F_Score'].astype(str) + rfm['M_Score'].astype(str)
def segment_customer(score):
if score in ['444', '443', '434', '433']:
return '重要价值客户'
elif score in ['424', '423', '414', '413']:
return '重要发展客户'
elif score in ['344', '343', '334', '333']:
return '重要保持客户'
elif score in ['144', '143', '134', '133']:
return '重要挽留客户'
else:
return '一般客户'
rfm['客户分层'] = rfm['RFM_Score'].apply(segment_customer)
print("\n客户分层统计:")
print(rfm['客户分层'].value_counts())
# 可视化
plt.figure(figsize=(10, 6))
rfm['客户分层'].value_counts().plot(kind='bar', color='skyblue')
plt.title('客户分层分布')
plt.xlabel('客户类型')
plt.ylabel('客户数量')
plt.xticks(rotation=45)
plt.show()
6. 异常检测与数据质量检查
print("\n=== 异常检测 ===")
# 检测异常高额订单
high_value_threshold = completed_orders['订单金额'].quantile(0.99)
high_value_orders = completed_orders[completed_orders['订单金额'] > high_value_threshold]
print(f"异常高额订单(前1%)数量: {len(high_value_orders)}")
print("异常高额订单:")
print(high_value_orders[['订单ID', '订单日期', '产品类别', '订单金额']].head())
# 检测异常时间订单(如凌晨3点大量订单)
hourly_order_count = completed_orders.groupby('小时')['订单ID'].count()
abnormal_hours = hourly_order_count[hourly_order_count > hourly_order_count.quantile(0.95)]
print("\n异常高订单量时段:")
print(abnormal_hours)
# 数据完整性检查
print("\n数据质量检查:")
print("缺失值统计:")
print(orders.isnull().sum())
print("\n重复订单ID:", orders['订单ID'].duplicated().sum())
7. 生成分析报告
def generate_analysis_report(df):
"""生成完整的分析报告"""
completed = df[df['订单状态'] == '已完成']
report = {
'基础指标': {
'总销售额': completed['订单金额'].sum(),
'订单总数': len(completed),
'客户总数': completed['客户ID'].nunique(),
'平均订单金额': completed['订单金额'].mean(),
'客单价': completed['订单金额'].sum() / completed['客户ID'].nunique()
},
'类别TOP3': completed.groupby('产品类别')['订单金额'].sum().nlargest(3).to_dict(),
'城市TOP3': completed.groupby('城市')['订单金额'].sum().nlargest(3).to_dict(),
'月度趋势': completed.groupby('月份')['订单金额'].sum().to_dict(),
'客户分层': rfm['客户分层'].value_counts().to_dict()
}
return report
# 生成报告
analysis_report = generate_analysis_report(orders)
print("\n=== 分析报告摘要 ===")
for section, data in analysis_report.items():
print(f"\n{section}:")
for key, value in data.items():
if isinstance(value, (int, float)):
print(f" {key}: {value:,.2f}")
else:
print(f" {key}: {value}")
第六部分:进阶技能:自动化与效率提升
1. 自动化数据处理流程
import os
import glob
def auto_process_sales_data(input_folder, output_folder):
"""
自动处理文件夹中的所有销售数据文件
"""
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# 查找所有CSV文件
csv_files = glob.glob(os.path.join(input_folder, '*.csv'))
all_data = []
for file in csv_files:
print(f"正在处理: {file}")
try:
# 读取数据
df = pd.read_csv(file)
# 数据清洗
df = df.dropna()
df = df[df['订单金额'] > 0]
# 添加处理时间戳
df['处理时间'] = pd.Timestamp.now()
# 保存处理后的数据
filename = os.path.basename(file)
output_path = os.path.join(output_folder, f'processed_{filename}')
df.to_csv(output_path, index=False)
all_data.append(df)
print(f" 完成: {output_path}")
except Exception as e:
print(f" 错误: {e}")
if all_data:
combined = pd.concat(all_data, ignore_index=True)
combined_path = os.path.join(output_folder, 'combined_data.csv')
combined.to_csv(combined_path, index=False)
print(f"\n合并数据保存至: {combined_path}")
return combined
else:
return None
# 使用示例(需要实际文件夹)
# auto_process_sales_data('./raw_data', './processed_data')
2. 邮件自动化报告
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
def send_email_report(subject, body, attachment_path=None, to_emails=['your_email@example.com']):
"""
发送带附件的分析报告邮件
"""
# 邮件配置(需要替换为实际配置)
smtp_server = "smtp.gmail.com"
smtp_port = 587
sender_email = "your_email@gmail.com"
sender_password = "your_app_password"
try:
# 创建邮件
msg = MIMEMultipart()
msg['From'] = sender_email
msg['To'] = ', '.join(to_emails)
msg['Subject'] = subject
# 邮件正文
msg.attach(MIMEText(body, 'plain'))
# 添加附件
if attachment_path and os.path.exists(attachment_path):
with open(attachment_path, 'rb') as f:
attachment = MIMEApplication(f.read(), _subtype='csv')
attachment.add_header('Content-Disposition', 'attachment',
filename=os.path.basename(attachment_path))
msg.attach(attachment)
# 发送邮件
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(sender_email, sender_password)
server.send_message(msg)
server.quit()
print("邮件发送成功!")
return True
except Exception as e:
print(f"邮件发送失败: {e}")
return False
# 使用示例
# report_body = """
# 本周销售数据分析报告:
# - 总销售额: 1,234,567元
# - 订单总数: 2,345单
# - 环比增长: 12.5%
# """
# send_email_report("销售周报", report_body, "sales_report.csv")
3. 数据库集成
import sqlite3
def create_sales_database(df):
"""创建SQLite数据库并存储销售数据"""
conn = sqlite3.connect('sales.db')
# 存储原始订单数据
df.to_sql('orders', conn, if_exists='replace', index=False)
# 创建汇总表
summary = df.groupby('产品类别').agg({
'订单金额': ['sum', 'count'],
'客户ID': 'nunique'
}).round(2)
summary.columns = ['总销售额', '订单数', '客户数']
summary.to_sql('category_summary', conn, if_exists='replace')
conn.close()
print("数据库创建完成: sales.db")
def query_database(sql_query):
"""查询数据库"""
conn = sqlite3.connect('sales.db')
result = pd.read_sql(sql_query, conn)
conn.close()
return result
# 使用示例
# create_sales_database(orders)
# result = query_database("SELECT * FROM category_summary WHERE 总销售额 > 100000")
# print(result)
第七部分:职场应用与最佳实践
1. 代码规范与文档
"""
电商销售数据分析模块
功能:
- 数据清洗与预处理
- 销售指标计算
- 客户价值分析
- 可视化报告生成
作者:数据分析师
日期:2024-01-01
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class SalesAnalyzer:
"""销售数据分析器"""
def __init__(self, data: pd.DataFrame):
"""
初始化分析器
Args:
data: 包含销售数据的DataFrame
"""
self.data = data.copy()
self.completed_orders = self.data[self.data['订单状态'] == '已完成']
logger.info(f"初始化分析器,加载{len(self.data)}条数据")
def validate_data(self) -> Dict[str, bool]:
"""数据质量验证"""
validation_results = {
'has_missing': self.data.isnull().any().any(),
'has_negative_amount': (self.data['订单金额'] <= 0).any(),
'has_duplicates': self.data['订单ID'].duplicated().any(),
'data_freshness_days': (pd.Timestamp.now() - self.data['订单日期'].max()).days
}
logger.info(f"数据验证结果: {validation_results}")
return validation_results
def calculate_kpis(self) -> Dict[str, float]:
"""计算关键绩效指标"""
if len(self.completed_orders) == 0:
return {}
kpis = {
'total_revenue': self.completed_orders['订单金额'].sum(),
'order_count': len(self.completed_orders),
'customer_count': self.completed_orders['客户ID'].nunique(),
'avg_order_value': self.completed_orders['订单金额'].mean(),
'revenue_per_customer': self.completed_orders['订单金额'].sum() / self.completed_orders['客户ID'].nunique(),
'monthly_growth': self._calculate_monthly_growth()
}
logger.info(f"KPI计算完成: {kpis}")
return kpis
def _calculate_monthly_growth(self) -> float:
"""计算月度增长率"""
monthly = self.completed_orders.groupby('月份')['订单金额'].sum()
if len(monthly) >= 2:
return ((monthly.iloc[-1] - monthly.iloc[-2]) / monthly.iloc[-2]) * 100
return 0
def generate_report(self, output_path: str) -> None:
"""生成完整分析报告"""
logger.info("开始生成分析报告...")
kpis = self.calculate_kpis()
validation = self.validate_data()
# 创建报告DataFrame
report_data = {
'指标': list(kpis.keys()),
'值': list(kpis.values())
}
report_df = pd.DataFrame(report_data)
report_df.to_csv(output_path, index=False)
logger.info(f"报告已保存至: {output_path}")
return report_df
# 使用示例
# analyzer = SalesAnalyzer(orders)
# analyzer.validate_data()
# kpis = analyzer.calculate_kpis()
# analyzer.generate_report('sales_analysis_report.csv')
2. 性能优化技巧
# 1. 使用向量化操作替代循环
def slow_approach(data):
# 避免这样做
result = []
for i in range(len(data)):
if data[i] > 100:
result.append(data[i] * 1.1)
return result
def fast_approach(data):
# 应该这样做
return data[data > 100] * 1.1
# 2. 使用适当的数据类型
def optimize_memory(df):
"""优化DataFrame内存使用"""
df_optimized = df.copy()
# 减少数值类型的字节数
for col in df_optimized.select_dtypes(include=['int']).columns:
df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='integer')
for col in df_optimized.select_dtypes(include=['float']).columns:
df_optimized[col] = pd.to_numeric(df_optimized[col], downcast='float')
# 将字符串转换为category类型
for col in df_optimized.select_dtypes(include=['object']).columns:
if df_optimized[col].nunique() / len(df_optimized) < 0.5:
df_optimized[col] = df_optimized[col].astype('category')
return df_optimized
# 3. 使用chunksize处理大文件
def process_large_file(file_path, chunk_size=10000):
"""分块处理大文件"""
chunks = []
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# 处理每个chunk
processed_chunk = chunk[chunk['订单金额'] > 0]
chunks.append(processed_chunk)
return pd.concat(chunks, ignore_index=True)
# 4. 使用并行处理
from joblib import Parallel, delayed
def process_single_city(city_data):
"""处理单个城市数据"""
return {
'city': city_data.name,
'revenue': city_data['订单金额'].sum(),
'orders': len(city_data)
}
def parallel_city_processing(df, n_jobs=-1):
"""并行处理各城市数据"""
results = Parallel(n_jobs=n_jobs)(
delayed(process_single_city)(group)
for name, group in df.groupby('城市')
)
return pd.DataFrame(results)
3. 版本控制与协作
"""
Git使用规范:
1. 每个功能开发前创建新分支:git checkout -b feature/sales-analysis
2. 提交信息规范:
- feat: 新功能
- fix: 修复bug
- docs: 文档更新
- refactor: 代码重构
- test: 测试相关
3. 代码审查流程
4. 使用requirements.txt管理依赖
"""
# 生成requirements.txt
def generate_requirements():
"""生成依赖列表"""
import subprocess
subprocess.run(['pip', 'freeze', '>', 'requirements.txt'])
# 配置文件示例
config_template = """
# config.yaml
data:
input_path: "./data/raw"
output_path: "./data/processed"
analysis:
min_order_amount: 50
high_value_threshold: 1000
report:
email_recipients: ["team@company.com"]
schedule: "0 9 * * 1" # 每周一9点
"""
第八部分:应对职场挑战的实战策略
1. 处理脏数据
def robust_data_cleaning(df):
"""鲁棒的数据清洗流程"""
# 1. 备份原始数据
original_shape = df.shape
# 2. 处理缺失值策略
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
# 数值列用中位数填充
median_val = df[col].median()
df[col] = df[col].fillna(median_val)
categorical_cols = df.select_dtypes(include=['object']).columns
for col in categorical_cols:
# 分类列用众数填充
mode_val = df[col].mode()[0] if not df[col].mode().empty else 'Unknown'
df[col] = df[col].fillna(mode_val)
# 3. 异常值检测与处理
for col in numeric_cols:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# 将异常值设为边界值
df[col] = np.where(df[col] < lower_bound, lower_bound, df[col])
df[col] = np.where(df[col] > upper_bound, upper_bound, df[col])
# 4. 数据类型修正
df = df.convert_dtypes()
logger.info(f"数据清洗完成: {original_shape} -> {df.shape}")
return df
# 使用示例
# cleaned_data = robust_data_cleaning(raw_data)
2. 处理业务需求变更
class FlexibleAnalyzer:
"""灵活应对需求变更的分析器"""
def __init__(self, data):
self.data = data
def dynamic_groupby(self, group_cols: List[str], value_col: str, agg_func: str = 'sum'):
"""动态分组聚合"""
return self.data.groupby(group_cols)[value_col].agg(agg_func)
def add_derived_metrics(self, metrics: Dict[str, str]):
"""
动态添加衍生指标
metrics: {'新指标名': '计算表达式'}
"""
for name, expr in metrics.items():
self.data[name] = self.data.eval(expr)
return self.data
def filter_data(self, conditions: List[str]):
"""动态筛选"""
query_str = ' & '.join(conditions)
return self.data.query(query_str)
# 使用示例
# analyzer = FlexibleAnalyzer(orders)
# analyzer.add_derived_metrics({
# '利润率': '利润/订单金额',
# '客单价': '订单金额/客户ID'
# })
3. 沟通与汇报技巧
def create_presentation_summary(df, analysis_type='sales'):
"""生成适合汇报的摘要"""
summary = {}
if analysis_type == 'sales':
completed = df[df['订单状态'] == '已完成']
summary['标题'] = '销售业绩分析简报'
summary['关键发现'] = [
f"总销售额 {completed['订单金额'].sum():,.0f} 元",
f"同比增长 {((completed['订单金额'].sum() / (completed['订单金额'].sum() * 0.9) - 1) * 100):.1f}%",
f"TOP3产品类别: {', '.join(completed.groupby('产品类别')['订单金额'].sum().nlargest(3).index)}"
]
summary['行动建议'] = [
"重点关注高价值客户",
"优化低销量产品线",
"增加促销活动频次"
]
return summary
# 生成汇报文本
def format_for_managers(summary):
"""格式化为管理层汇报语言"""
print("\n" + "="*50)
print(f"【{summary['标题']}】")
print("="*50)
print("\n📊 关键发现:")
for item in summary['关键发现']:
print(f" • {item}")
print("\n💡 行动建议:")
for item in summary['行动建议']:
print(f" • {item}")
print("="*50)
# 使用示例
# summary = create_presentation_summary(orders)
# format_for_managers(summary)
第九部分:持续学习与职业发展
1. 学习路径规划
learning_path = {
"初级阶段": [
"Python基础语法",
"NumPy数组操作",
"Pandas数据处理",
"Matplotlib基础绘图",
"完成3个小型数据分析项目"
],
"中级阶段": [
"Pandas高级操作",
"Seaborn/Plotly可视化",
"SQL数据库查询",
"统计学基础",
"自动化脚本编写",
"完成5个中型项目"
],
"高级阶段": [
"机器学习基础",
"时间序列分析",
"大数据处理(Dask/Spark)",
"数据工程基础",
"业务洞察与汇报",
"完成2个端到端项目"
]
}
def print_learning_path():
"""打印学习路径"""
for level, skills in learning_path.items():
print(f"\n{level}:")
for i, skill in enumerate(skills, 1):
print(f" {i}. {skill}")
print_learning_path()
2. 项目作品集建议
portfolio_projects = [
{
"项目名称": "销售数据分析系统",
"技能点": ["Pandas", "可视化", "自动化"],
"难度": "中级",
"业务价值": "提升决策效率"
},
{
"项目名称": "客户流失预测",
"技能点": ["机器学习", "特征工程", "模型评估"],
"难度": "高级",
"业务价值": "降低客户流失率"
},
{
"项目名称": "实时数据监控面板",
"技能点": ["Plotly Dash", "数据库", "实时更新"],
"难度": "高级",
"业务价值": "实时业务监控"
}
]
def portfolio_checklist():
"""作品集检查清单"""
print("\n作品集准备清单:")
print("□ 项目代码托管在GitHub")
print("□ 每个项目有详细README")
print("□ 包含数据样本或生成脚本")
print("□ 有可视化结果展示")
print("□ 有业务价值说明")
print("□ 代码有注释和文档")
print("□ 项目可复现")
portfolio_checklist()
3. 面试准备
# 常见面试题代码示例
# 1. 如何处理大数据集内存不足?
def handle_large_dataset():
"""处理大数据集的策略"""
strategies = [
"使用chunksize分块读取",
"优化数据类型减少内存",
"只读取需要的列",
"使用Dask并行处理",
"数据库预处理"
]
return strategies
# 2. 如何保证数据质量?
def data_quality_checklist():
"""数据质量检查清单"""
checks = [
"完整性检查:是否有缺失值",
"准确性检查:数据是否合理",
"一致性检查:数据格式统一",
"时效性检查:数据是否过期",
"唯一性检查:是否有重复"
]
return checks
# 3. 如何向非技术人员解释分析结果?
def explain_to_non_technical():
"""解释技巧"""
tips = [
"避免使用技术术语",
"用业务语言描述",
"强调行动建议而非技术细节",
"使用图表辅助说明",
"准备不同深度的版本"
]
return tips
# 4. 代码优化案例
def optimize_code_example():
"""代码优化示例"""
print("\n代码优化示例:")
print("❌ 低效:")
print(" result = []")
print(" for i in range(len(df)):")
print(" if df.loc[i, '销售额'] > 1000:")
print(" result.append(df.loc[i, '销售额'] * 1.1)")
print("\n✅ 高效:")
print(" result = df.loc[df['销售额'] > 1000, '销售额'] * 1.1")
optimize_code_example()
第十部分:完整实战案例:从数据到决策
案例:季度业务复盘报告
def quarterly_business_review():
"""
季度业务复盘完整流程
"""
print("\n" + "="*60)
print("季度业务复盘分析流程")
print("="*60)
# 1. 数据准备
print("\n1. 数据准备阶段")
print(" - 收集本季度销售数据")
print(" - 收集上季度数据用于对比")
print(" - 收集市场数据")
# 2. 数据清洗
print("\n2. 数据清洗阶段")
print(" - 处理缺失值")
print(" - 识别异常值")
print(" - 统一数据格式")
# 3. 核心指标计算
print("\n3. 核心指标计算")
metrics = {
'销售额增长率': '((本季度-上季度)/上季度)*100',
'客户留存率': '重复购买客户数/总客户数',
'产品线贡献度': '各产品销售额/总销售额',
'区域表现': '各区域销售额占比'
}
for k, v in metrics.items():
print(f" - {k}: {v}")
# 4. 问题诊断
print("\n4. 问题诊断")
print(" - 识别下降的产品/区域")
print(" - 分析客户流失原因")
print(" - 检查运营效率")
# 5. 机会识别
print("\n5. 机会识别")
print(" - 高增长潜力产品")
print(" - 未充分开发的区域")
print(" - 客户升级机会")
# 6. 行动计划
print("\n6. 行动计划")
print(" - 短期促销策略")
print(" - 产品优化建议")
print(" - 资源重新配置")
# 7. 汇报准备
print("\n7. 汇报准备")
print(" - 准备可视化图表")
print(" - 撰写执行摘要")
print(" - 准备Q&A")
print("\n" + "="*60)
print("复盘完成,准备向管理层汇报")
print("="*60)
quarterly_business_review()
结论:持续成长的建议
Python数据分析是一个不断发展的领域,要保持竞争力需要:
- 持续学习:关注新工具和最佳实践
- 业务理解:深入理解所在行业的业务逻辑
- 沟通能力:将技术结果转化为业务价值
- 项目经验:通过实际项目积累经验
- 社区参与:参与开源项目和技术社区
记住,技术只是工具,真正的价值在于通过数据洞察驱动业务决策。保持好奇心,持续实践,你一定能在数据分析领域取得成功!
附录:常用资源
- 官方文档:pandas.pydata.org, numpy.org
- 学习平台:Kaggle, DataCamp, Coursera
- 社区:Stack Overflow, GitHub, 知乎
- 书籍:《利用Python进行数据分析》、《Python数据科学手册》
祝你在数据分析的道路上越走越远!
