在当今互联网应用中,高并发场景已成为常态。无论是电商秒杀、社交平台还是金融交易系统,MySQL作为最流行的关系型数据库之一,如何有效应对高并发挑战是每个开发者和DBA必须掌握的技能。本文将从索引优化、查询优化、架构设计等多个维度,提供一套完整的MySQL高并发处理实战方案。
一、高并发场景下的性能瓶颈分析
1.1 什么是高并发?
高并发通常指系统在短时间内同时处理大量请求。例如:
- 电商秒杀:10万用户同时抢购1000件商品
- 社交热点:明星发布动态后百万级点赞/评论
- 金融交易:每秒数千笔交易处理
1.2 MySQL在高并发下的常见瓶颈
- CPU瓶颈:复杂查询、大量排序/分组操作
- IO瓶颈:磁盘读写频繁,尤其是随机IO
- 锁竞争:行锁、表锁导致的等待
- 连接数限制:max_connections配置不足
- 内存不足:缓冲池(Buffer Pool)命中率低
二、索引优化:高并发的基石
2.1 索引设计原则
2.1.1 最左前缀原则
-- 创建复合索引
CREATE INDEX idx_user_order ON orders(user_id, order_date, status);
-- 有效使用索引的查询
SELECT * FROM orders WHERE user_id = 1001; -- ✅ 使用索引
SELECT * FROM orders WHERE user_id = 1001 AND order_date = '2023-10-01'; -- ✅ 使用索引
SELECT * FROM orders WHERE user_id = 1001 AND status = 'paid'; -- ✅ 使用索引(部分)
-- 无法使用索引的查询
SELECT * FROM orders WHERE order_date = '2023-10-01'; -- ❌ 无法使用索引
SELECT * FROM orders WHERE status = 'paid'; -- ❌ 无法使用索引
2.1.2 覆盖索引(Covering Index)
-- 创建覆盖索引
CREATE INDEX idx_cover ON orders(user_id, order_date, status, amount);
-- 查询只需要扫描索引,无需回表
SELECT user_id, order_date, status, amount
FROM orders
WHERE user_id = 1001 AND order_date >= '2023-10-01';
2.1.3 索引选择性优化
-- 查看列的选择性(唯一值/总行数)
SELECT
COUNT(DISTINCT status) / COUNT(*) AS selectivity,
COUNT(DISTINCT status) AS distinct_count
FROM orders;
-- 选择性低于0.1的列不适合单独建索引
-- 选择性高的列(如user_id)优先建索引
2.2 索引优化实战案例
案例1:电商订单查询优化
-- 原始查询(无索引,全表扫描)
SELECT * FROM orders
WHERE user_id = 1001
AND order_date BETWEEN '2023-01-01' AND '2023-12-31'
AND status IN ('paid', 'shipped')
ORDER BY order_date DESC
LIMIT 10;
-- 优化方案1:创建复合索引
CREATE INDEX idx_user_date_status ON orders(user_id, order_date DESC, status);
-- 优化方案2:如果需要分页,避免深度分页问题
SELECT * FROM orders
WHERE user_id = 1001
AND order_date BETWEEN '2023-01-01' AND '2023-12-31'
AND status IN ('paid', 'shipped')
AND order_date < '2023-06-01' -- 使用上一页的最后一条记录
ORDER BY order_date DESC
LIMIT 10;
案例2:多条件组合查询
-- 场景:用户搜索订单,支持多条件组合
-- 条件:用户ID、订单状态、时间范围、金额范围
-- 优化前:多个单列索引,MySQL可能只使用一个
CREATE INDEX idx_user ON orders(user_id);
CREATE INDEX idx_status ON orders(status);
CREATE INDEX idx_date ON orders(order_date);
-- 优化后:创建智能复合索引
-- 分析查询模式,创建多个复合索引覆盖常见查询
CREATE INDEX idx_user_status_date ON orders(user_id, status, order_date);
CREATE INDEX idx_user_date_amount ON orders(user_id, order_date, amount);
CREATE INDEX idx_status_date ON orders(status, order_date);
-- 使用EXPLAIN分析执行计划
EXPLAIN SELECT * FROM orders
WHERE user_id = 1001
AND status = 'paid'
AND order_date >= '2023-10-01';
2.3 索引维护与监控
-- 查看索引使用情况
SELECT
table_name,
index_name,
stat_value,
stat_description
FROM mysql.innodb_index_stats
WHERE database_name = 'your_database'
AND table_name = 'orders';
-- 查看未使用的索引(MySQL 8.0+)
SELECT
t.TABLE_SCHEMA,
t.TABLE_NAME,
t.INDEX_NAME,
s.ROWS_READ,
s.ROWS_INSERTED,
s.ROWS_UPDATED,
s.ROWS_DELETED
FROM information_schema.STATISTICS t
LEFT JOIN performance_schema.table_io_waits_summary_by_index_usage s
ON t.TABLE_SCHEMA = s.OBJECT_SCHEMA
AND t.TABLE_NAME = s.OBJECT_NAME
AND t.INDEX_NAME = s.INDEX_NAME
WHERE t.TABLE_SCHEMA = 'your_database'
AND s.ROWS_READ = 0
AND s.ROWS_INSERTED = 0
AND s.ROWS_UPDATED = 0
AND s.ROWS_DELETED = 0;
三、查询优化:提升执行效率
3.1 避免全表扫描
3.1.1 WHERE条件优化
-- ❌ 避免在索引列上使用函数
SELECT * FROM orders WHERE DATE(order_date) = '2023-10-01';
-- ✅ 优化为范围查询
SELECT * FROM orders WHERE order_date BETWEEN '2023-10-01' AND '2023-10-02';
-- ❌ 避免使用LIKE前缀通配符
SELECT * FROM users WHERE username LIKE '%john%';
-- ✅ 如果必须使用,考虑全文索引
ALTER TABLE users ADD FULLTEXT INDEX ft_username (username);
SELECT * FROM users WHERE MATCH(username) AGAINST('john');
3.1.2 JOIN优化
-- ❌ 避免笛卡尔积
SELECT * FROM orders o, users u WHERE o.user_id = u.id;
-- ✅ 明确JOIN类型和条件
SELECT o.*, u.username
FROM orders o
INNER JOIN users u ON o.user_id = u.id
WHERE o.status = 'paid';
-- ✅ 小表驱动大表
-- orders表100万行,users表10万行
-- 应该用users驱动orders
SELECT o.*, u.username
FROM users u
INNER JOIN orders o ON u.id = o.user_id
WHERE u.status = 'active';
3.2 分页优化
3.2.1 深度分页问题
-- ❌ 传统分页(越往后越慢)
SELECT * FROM orders ORDER BY order_date DESC LIMIT 1000000, 10;
-- ✅ 优化方案1:使用子查询
SELECT * FROM orders
WHERE order_date <= (
SELECT order_date FROM orders
ORDER BY order_date DESC
LIMIT 1000000, 1
)
ORDER BY order_date DESC
LIMIT 10;
-- ✅ 优化方案2:使用游标分页(推荐)
-- 第一页
SELECT * FROM orders
WHERE order_date >= '2023-01-01'
ORDER BY order_date DESC
LIMIT 10;
-- 第二页(记录上一页最后一条的order_date)
SELECT * FROM orders
WHERE order_date < '2023-01-01 12:00:00' -- 上一页最后一条的order_date
ORDER BY order_date DESC
LIMIT 10;
3.3 批量操作优化
-- ❌ 逐条插入(1000条记录)
INSERT INTO orders (user_id, amount, status) VALUES (1, 100, 'paid');
INSERT INTO orders (user_id, amount, status) VALUES (2, 200, 'paid');
-- ... 重复1000次
-- ✅ 批量插入
INSERT INTO orders (user_id, amount, status) VALUES
(1, 100, 'paid'),
(2, 200, 'paid'),
(3, 300, 'paid'),
-- ... 1000条记录
(1000, 1000, 'paid');
-- ✅ 使用LOAD DATA INFILE(最快)
LOAD DATA LOCAL INFILE '/tmp/orders.csv'
INTO TABLE orders
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(user_id, amount, status);
四、架构优化:从单机到分布式
4.1 读写分离架构
4.1.1 基于Proxy的读写分离
# 使用Python + PyMySQL实现读写分离
import pymysql
from pymysql import connections
class ReadWriteSplittingProxy:
def __init__(self):
# 主库配置(写操作)
self.master_config = {
'host': 'master.example.com',
'user': 'root',
'password': 'password',
'database': 'app_db'
}
# 从库配置(读操作)
self.slave_configs = [
{'host': 'slave1.example.com', 'user': 'root', 'password': 'password', 'database': 'app_db'},
{'host': 'slave2.example.com', 'user': 'root', 'password': 'password', 'database': 'app_db'}
]
self.current_slave_index = 0
def get_connection(self, is_write=False):
"""获取数据库连接"""
if is_write:
# 写操作连接主库
return pymysql.connect(**self.master_config)
else:
# 读操作轮询从库
slave_config = self.slave_configs[self.current_slave_index]
self.current_slave_index = (self.current_slave_index + 1) % len(self.slave_configs)
return pymysql.connect(**slave_config)
def execute_query(self, sql, params=None, is_write=False):
"""执行查询"""
conn = self.get_connection(is_write)
try:
with conn.cursor() as cursor:
cursor.execute(sql, params)
if is_write:
conn.commit()
return cursor.rowcount
else:
return cursor.fetchall()
finally:
conn.close()
# 使用示例
proxy = ReadWriteSplittingProxy()
# 写操作(主库)
proxy.execute_query(
"INSERT INTO orders (user_id, amount) VALUES (%s, %s)",
(1001, 150.00),
is_write=True
)
# 读操作(从库轮询)
results = proxy.execute_query(
"SELECT * FROM orders WHERE user_id = %s",
(1001,),
is_write=False
)
4.1.2 基于中间件的读写分离(ShardingSphere)
# ShardingSphere配置示例
spring:
shardingsphere:
datasource:
names: master,slave1,slave2
master:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://master.example.com:3306/app_db
username: root
password: password
slave1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://slave1.example.com:3306/app_db
username: root
password: password
slave2:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://slave2.example.com:3306/app_db
username: root
password: password
rules:
readwrite-splitting:
data-sources:
ds0:
type: Static
props:
write-data-source-name: master
read-data-source-names: slave1,slave2
load-balancer-name: round_robin
props:
sql-show: true
4.2 分库分表策略
4.2.1 水平分表(Sharding)
-- 按用户ID取模分表
-- orders_0, orders_1, orders_2, orders_3
-- 创建分表函数
CREATE FUNCTION get_shard_table(user_id INT) RETURNS VARCHAR(50)
DETERMINISTIC
BEGIN
DECLARE shard_num INT;
SET shard_num = user_id % 4;
RETURN CONCAT('orders_', shard_num);
END;
-- 查询时动态选择表
SET @table_name = get_shard_table(1001);
SET @sql = CONCAT('SELECT * FROM ', @table_name, ' WHERE user_id = 1001');
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
4.2.2 垂直分表
-- 原始大表
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
user_id INT,
amount DECIMAL(10,2),
status VARCHAR(20),
-- 其他20个字段...
created_at DATETIME,
updated_at DATETIME
);
-- 垂直拆分:订单基本信息 + 订单详情
CREATE TABLE orders_base (
id BIGINT PRIMARY KEY,
user_id INT,
amount DECIMAL(10,2),
status VARCHAR(20),
created_at DATETIME,
updated_at DATETIME
);
CREATE TABLE orders_detail (
order_id BIGINT PRIMARY KEY,
shipping_address TEXT,
payment_method VARCHAR(50),
-- 其他详情字段...
FOREIGN KEY (order_id) REFERENCES orders_base(id)
);
4.3 缓存层优化
4.3.1 Redis缓存策略
import redis
import json
from functools import wraps
class RedisCache:
def __init__(self):
self.redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
def cache_query(self, ttl=300):
"""查询缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存key
cache_key = f"query:{func.__name__}:{str(args)}:{str(kwargs)}"
# 尝试从缓存获取
cached = self.redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 缓存未命中,执行查询
result = func(*args, **kwargs)
# 写入缓存
self.redis_client.setex(
cache_key,
ttl,
json.dumps(result)
)
return result
return wrapper
return decorator
# 使用示例
cache = RedisCache()
@cache.cache_query(ttl=60)
def get_user_orders(user_id, limit=10):
"""查询用户订单(带缓存)"""
conn = pymysql.connect(**db_config)
try:
with conn.cursor() as cursor:
cursor.execute(
"SELECT * FROM orders WHERE user_id = %s ORDER BY created_at DESC LIMIT %s",
(user_id, limit)
)
return cursor.fetchall()
finally:
conn.close()
# 首次查询会执行SQL,后续查询直接从Redis返回
orders = get_user_orders(1001, 10)
4.3.2 缓存穿透/雪崩防护
class SafeRedisCache(RedisCache):
def get_with_fallback(self, key, fallback_func, ttl=300, lock_ttl=10):
"""带防护的缓存获取"""
# 1. 尝试获取缓存
cached = self.redis_client.get(key)
if cached:
return json.loads(cached)
# 2. 获取分布式锁,防止缓存击穿
lock_key = f"lock:{key}"
lock_acquired = self.redis_client.set(
lock_key,
"1",
ex=lock_ttl,
nx=True # 仅当key不存在时设置
)
if lock_acquired:
try:
# 3. 双重检查(防止并发)
cached = self.redis_client.get(key)
if cached:
return json.loads(cached)
# 4. 执行查询
result = fallback_func()
# 5. 写入缓存(空值也缓存,防止穿透)
if result is None:
# 缓存空值,设置较短过期时间
self.redis_client.setex(key, 60, json.dumps(None))
else:
self.redis_client.setex(key, ttl, json.dumps(result))
return result
finally:
# 6. 释放锁
self.redis_client.delete(lock_key)
else:
# 7. 等待并重试
import time
time.sleep(0.1)
return self.get_with_fallback(key, fallback_func, ttl, lock_ttl)
五、MySQL配置优化
5.1 关键参数配置
# my.cnf 配置示例(MySQL 8.0)
[mysqld]
# 基础配置
port = 3306
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
# 内存配置(根据服务器内存调整)
innodb_buffer_pool_size = 8G # 通常设置为总内存的50-70%
innodb_buffer_pool_instances = 8 # 缓冲池实例数,减少竞争
innodb_log_file_size = 2G # 日志文件大小
innodb_log_buffer_size = 16M # 日志缓冲区
# 连接配置
max_connections = 1000 # 最大连接数
max_connect_errors = 100000
thread_cache_size = 100 # 线程缓存
# 查询缓存(MySQL 8.0已移除,5.7及以下版本)
# query_cache_type = 0 # 建议关闭
# InnoDB配置
innodb_flush_log_at_trx_commit = 2 # 1:每次提交都刷盘(安全),2:每秒刷盘(性能)
innodb_flush_method = O_DIRECT # 直接IO,避免双缓冲
innodb_file_per_table = ON # 每个表独立文件
innodb_read_io_threads = 8 # 读线程
innodb_write_io_threads = 8 # 写线程
# 日志配置
slow_query_log = ON
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2 # 慢查询阈值(秒)
log_queries_not_using_indexes = ON
# 复制配置(主从)
server_id = 1 # 主库ID
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW # 行级复制
expire_logs_days = 7 # 日志保留天数
5.2 动态参数调整
-- 查看当前配置
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'max_connections';
-- 动态调整(无需重启)
SET GLOBAL innodb_buffer_pool_size = 8589934592; -- 8GB
SET GLOBAL max_connections = 1000;
-- 查看性能状态
SHOW GLOBAL STATUS LIKE 'Threads_connected';
SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_read_requests';
SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_reads';
-- 计算缓冲池命中率
SELECT
(1 - (Innodb_buffer_pool_reads / Innodb_buffer_pool_read_requests)) * 100 AS hit_rate
FROM (
SELECT
MAX(CASE WHEN VARIABLE_NAME = 'Innodb_buffer_pool_reads' THEN VARIABLE_VALUE END) AS Innodb_buffer_pool_reads,
MAX(CASE WHEN VARIABLE_NAME = 'Innodb_buffer_pool_read_requests' THEN VARIABLE_VALUE END) AS Innodb_buffer_pool_read_requests
FROM performance_schema.global_status
) AS stats;
六、监控与告警
6.1 性能监控指标
-- 1. 连接数监控
SELECT
COUNT(*) AS total_connections,
SUM(CASE WHEN COMMAND = 'Sleep' THEN 1 ELSE 0 END) AS idle_connections,
SUM(CASE WHEN COMMAND != 'Sleep' THEN 1 ELSE 0 END) AS active_connections
FROM information_schema.PROCESSLIST;
-- 2. 慢查询监控
SELECT
DIGEST_TEXT,
COUNT_STAR,
AVG_TIMER_WAIT/1000000000000 AS avg_time_sec,
SUM_ROWS_EXAMINED,
SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
WHERE SCHEMA_NAME = 'your_database'
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;
-- 3. 锁等待监控
SELECT
r.trx_id waiting_trx_id,
r.trx_mysql_thread_id waiting_thread,
r.trx_query waiting_query,
b.trx_id blocking_trx_id,
b.trx_mysql_thread_id blocking_thread,
b.trx_query blocking_query
FROM information_schema.INNODB_LOCK_WAITS w
INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.requesting_trx_id;
6.2 自动化监控脚本
#!/usr/bin/env python3
"""
MySQL性能监控脚本
"""
import pymysql
import time
import smtplib
from email.mime.text import MIMEText
class MySQLMonitor:
def __init__(self, db_config, alert_thresholds):
self.db_config = db_config
self.thresholds = alert_thresholds
def check_connections(self):
"""检查连接数"""
conn = pymysql.connect(**self.db_config)
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT
VARIABLE_VALUE as max_conn,
(SELECT COUNT(*) FROM information_schema.PROCESSLIST) as current_conn
FROM performance_schema.global_variables
WHERE VARIABLE_NAME = 'max_connections'
""")
result = cursor.fetchone()
max_conn, current_conn = result
# 计算使用率
usage_rate = (current_conn / max_conn) * 100
if usage_rate > self.thresholds['connection_usage']:
self.send_alert(
f"连接数告警: {current_conn}/{max_conn} ({usage_rate:.1f}%)",
f"当前连接数: {current_conn}\n最大连接数: {max_conn}\n使用率: {usage_rate:.1f}%"
)
return usage_rate
finally:
conn.close()
def check_slow_queries(self):
"""检查慢查询"""
conn = pymysql.connect(**self.db_config)
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT
COUNT(*) as slow_count,
AVG(AVG_TIMER_WAIT/1000000000000) as avg_time
FROM performance_schema.events_statements_summary_by_digest
WHERE SCHEMA_NAME = 'your_database'
AND AVG_TIMER_WAIT > %s
""", (self.thresholds['slow_query_time'] * 1000000000000,))
result = cursor.fetchone()
slow_count, avg_time = result
if slow_count > self.thresholds['slow_query_count']:
self.send_alert(
f"慢查询告警: {slow_count}条",
f"平均执行时间: {avg_time:.2f}秒\n阈值: {self.thresholds['slow_query_time']}秒"
)
return slow_count
finally:
conn.close()
def send_alert(self, subject, body):
"""发送告警邮件"""
# 配置邮件服务器
smtp_server = "smtp.example.com"
smtp_port = 587
sender = "monitor@example.com"
password = "password"
receiver = "dba@example.com"
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = sender
msg['To'] = receiver
try:
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(sender, password)
server.send_message(msg)
server.quit()
print(f"告警已发送: {subject}")
except Exception as e:
print(f"发送告警失败: {e}")
# 使用示例
if __name__ == "__main__":
db_config = {
'host': 'localhost',
'user': 'monitor',
'password': 'password',
'database': 'mysql'
}
thresholds = {
'connection_usage': 80, # 连接使用率超过80%告警
'slow_query_time': 2, # 慢查询阈值2秒
'slow_query_count': 10 # 慢查询数量超过10条告警
}
monitor = MySQLMonitor(db_config, thresholds)
# 定时监控(示例:每5分钟检查一次)
while True:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 开始监控...")
monitor.check_connections()
monitor.check_slow_queries()
time.sleep(300) # 5分钟
七、实战案例:电商秒杀系统
7.1 系统架构设计
用户请求 → Nginx负载均衡 → 应用服务器集群 → Redis缓存层 → MySQL主从集群
↓
消息队列(异步处理)
7.2 核心代码实现
7.2.1 秒杀下单接口
import redis
import pymysql
import json
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
class SeckillService:
def __init__(self):
# Redis连接
self.redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
# 数据库连接池
self.db_pool = pymysql.pool.SimpleConnectionPool(
1, 20, # 最小连接数,最大连接数
host='localhost',
user='root',
password='password',
database='seckill_db',
charset='utf8mb4'
)
# 线程池
self.thread_pool = ThreadPoolExecutor(max_workers=10)
def seckill(self, user_id, product_id, quantity):
"""秒杀下单"""
# 1. 参数校验
if quantity <= 0:
return {'success': False, 'message': '数量必须大于0'}
# 2. 检查用户是否已秒杀
user_key = f"seckill:user:{user_id}:product:{product_id}"
if self.redis_client.exists(user_key):
return {'success': False, 'message': '您已参与过该秒杀'}
# 3. 检查库存(使用Redis原子操作)
stock_key = f"seckill:stock:{product_id}"
stock = self.redis_client.decr(stock_key)
if stock < 0:
# 库存不足,恢复库存
self.redis_client.incr(stock_key)
return {'success': False, 'message': '库存不足'}
# 4. 异步创建订单(避免阻塞)
future = self.thread_pool.submit(
self._create_order_async,
user_id, product_id, quantity
)
# 5. 记录用户参与
self.redis_client.setex(user_key, 3600, '1')
return {
'success': True,
'message': '秒杀成功,订单处理中',
'order_id': f"SECKILL_{user_id}_{product_id}_{int(time.time())}"
}
def _create_order_async(self, user_id, product_id, quantity):
"""异步创建订单"""
conn = None
try:
conn = self.db_pool.get_connection()
conn.autocommit(False) # 开启事务
with conn.cursor() as cursor:
# 1. 扣减数据库库存(乐观锁)
cursor.execute("""
UPDATE products
SET stock = stock - %s,
version = version + 1
WHERE id = %s
AND stock >= %s
AND version = (
SELECT version FROM products WHERE id = %s
)
""", (quantity, product_id, quantity, product_id))
if cursor.rowcount == 0:
conn.rollback()
# 库存不足,恢复Redis库存
self.redis_client.incrby(
f"seckill:stock:{product_id}",
quantity
)
return False
# 2. 创建订单
order_id = f"ORDER_{user_id}_{product_id}_{int(time.time())}"
cursor.execute("""
INSERT INTO orders
(id, user_id, product_id, quantity, status, created_at)
VALUES (%s, %s, %s, %s, 'pending', NOW())
""", (order_id, user_id, product_id, quantity))
# 3. 记录订单详情
cursor.execute("""
INSERT INTO order_items
(order_id, product_id, quantity, price)
SELECT %s, id, %s, price
FROM products WHERE id = %s
""", (order_id, quantity, product_id))
conn.commit()
# 4. 发送消息到MQ(异步处理支付、物流等)
self._send_to_mq({
'order_id': order_id,
'user_id': user_id,
'product_id': product_id,
'quantity': quantity,
'timestamp': datetime.now().isoformat()
})
return True
except Exception as e:
if conn:
conn.rollback()
# 恢复Redis库存
self.redis_client.incrby(
f"seckill:stock:{product_id}",
quantity
)
print(f"订单创建失败: {e}")
return False
finally:
if conn:
conn.close()
def _send_to_mq(self, message):
"""发送消息到消息队列"""
# 这里可以使用RabbitMQ、Kafka等
# 简化示例:写入Redis列表
self.redis_client.rpush('order_queue', json.dumps(message))
def preload_cache(self, product_id, stock):
"""预热缓存"""
# 秒杀开始前预热库存到Redis
stock_key = f"seckill:stock:{product_id}"
self.redis_client.set(stock_key, stock)
# 设置过期时间(秒杀结束后清理)
self.redis_client.expire(stock_key, 3600) # 1小时
# 预热商品信息
product_key = f"seckill:product:{product_id}"
conn = self.db_pool.get_connection()
try:
with conn.cursor() as cursor:
cursor.execute(
"SELECT * FROM products WHERE id = %s",
(product_id,)
)
product = cursor.fetchone()
if product:
self.redis_client.setex(
product_key,
3600,
json.dumps(product)
)
finally:
conn.close()
7.2.2 库存预热脚本
#!/usr/bin/env python3
"""
秒杀库存预热脚本
"""
import pymysql
import redis
import time
def preload_seckill_stock():
"""预热秒杀商品库存到Redis"""
# 数据库连接
db_conn = pymysql.connect(
host='localhost',
user='root',
password='password',
database='seckill_db'
)
# Redis连接
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0
)
try:
with db_conn.cursor() as cursor:
# 查询秒杀商品
cursor.execute("""
SELECT id, seckill_stock, seckill_price
FROM products
WHERE seckill_start <= NOW()
AND seckill_end >= NOW()
AND seckill_stock > 0
""")
products = cursor.fetchall()
for product in products:
product_id, stock, price = product
# 预热库存
stock_key = f"seckill:stock:{product_id}"
redis_client.set(stock_key, stock)
redis_client.expire(stock_key, 3600) # 1小时
# 预热商品信息
product_key = f"seckill:product:{product_id}"
redis_client.setex(
product_key,
3600,
f'{{"id":{product_id},"price":{price}}}'
)
print(f"预热商品 {product_id}: 库存 {stock}, 价格 {price}")
print(f"预热完成,共 {len(products)} 个商品")
finally:
db_conn.close()
if __name__ == "__main__":
preload_seckill_stock()
八、总结与最佳实践
8.1 高并发优化 checklist
索引优化
- [ ] 确保所有查询都有合适的索引
- [ ] 定期检查并删除未使用的索引
- [ ] 使用覆盖索引减少回表
- [ ] 避免索引列上的函数操作
查询优化
- [ ] 避免SELECT *
- [ ] 使用EXPLAIN分析执行计划
- [ ] 优化JOIN操作(小表驱动大表)
- [ ] 避免深度分页,使用游标分页
架构优化
- [ ] 实施读写分离
- [ ] 考虑分库分表(按业务维度)
- [ ] 引入缓存层(Redis)
- [ ] 使用消息队列异步处理
配置优化
- [ ] 调整innodb_buffer_pool_size
- [ ] 优化连接池配置
- [ ] 设置合理的慢查询阈值
- [ ] 启用慢查询日志
监控告警
- [ ] 监控连接数使用率
- [ ] 监控慢查询数量
- [ ] 监控锁等待情况
- [ ] 设置自动告警机制
8.2 性能优化原则
- 80/20法则:80%的性能问题来自20%的慢查询
- 分层优化:从应用层→缓存层→数据库层逐层优化
- 数据驱动:基于监控数据做优化决策
- 渐进式优化:每次只做一个改动,验证效果
- 预防为主:在设计阶段就考虑高并发场景
8.3 推荐工具
- 监控工具:Percona Monitoring and Management (PMM)、Prometheus + Grafana
- 慢查询分析:pt-query-digest、MySQL Workbench
- 压力测试:sysbench、JMeter
- 架构设计:ShardingSphere、Vitess
九、常见问题与解决方案
Q1:如何应对突发流量?
A:采用限流+降级+熔断策略
# 使用令牌桶限流
import time
from collections import deque
class TokenBucket:
def __init__(self, capacity, refill_rate):
self.capacity = capacity # 桶容量
self.tokens = capacity # 当前令牌数
self.refill_rate = refill_rate # 每秒补充速率
self.last_refill = time.time()
def try_acquire(self):
now = time.time()
# 补充令牌
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
self.last_refill = now
if self.tokens >= 1:
self.tokens -= 1
return True
return False
# 使用示例
bucket = TokenBucket(capacity=100, refill_rate=10) # 每秒10个请求
def handle_request(user_id):
if bucket.try_acquire():
# 处理请求
return process_seckill(user_id)
else:
# 限流返回
return {'success': False, 'message': '请求过于频繁,请稍后重试'}
Q2:如何保证数据一致性?
A:采用最终一致性方案
-- 1. 使用消息表记录操作
CREATE TABLE operation_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
operation_type VARCHAR(50),
business_id VARCHAR(100),
operation_data JSON,
status ENUM('pending', 'processing', 'completed', 'failed'),
created_at DATETIME,
updated_at DATETIME,
INDEX idx_status (status),
INDEX idx_business (business_id)
);
-- 2. 定时任务补偿
DELIMITER $$
CREATE PROCEDURE compensate_operations()
BEGIN
DECLARE done INT DEFAULT FALSE;
DECLARE op_id BIGINT;
DECLARE op_type VARCHAR(50);
DECLARE business_id VARCHAR(100);
DECLARE op_data JSON;
DECLARE cur CURSOR FOR
SELECT id, operation_type, business_id, operation_data
FROM operation_log
WHERE status = 'failed'
AND created_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
ORDER BY created_at ASC
LIMIT 100;
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
OPEN cur;
read_loop: LOOP
FETCH cur INTO op_id, op_type, business_id, op_data;
IF done THEN
LEAVE read_loop;
END IF;
-- 重试逻辑
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
-- 记录失败
UPDATE operation_log
SET status = 'failed',
updated_at = NOW()
WHERE id = op_id;
END;
-- 执行补偿操作
IF op_type = 'update_stock' THEN
-- 补偿库存更新
UPDATE products
SET stock = stock + JSON_EXTRACT(op_data, '$.quantity')
WHERE id = JSON_EXTRACT(op_data, '$.product_id');
END IF;
-- 标记完成
UPDATE operation_log
SET status = 'completed',
updated_at = NOW()
WHERE id = op_id;
END;
END LOOP;
CLOSE cur;
END$$
DELIMITER ;
通过以上全面的优化策略和实战案例,您可以系统地提升MySQL在高并发场景下的性能表现。记住,性能优化是一个持续的过程,需要根据实际业务场景和监控数据不断调整优化策略。
