引言:理解高并发场景下的MySQL挑战
在当今互联网应用中,高并发场景已经成为常态。无论是电商秒杀、社交网络的热点事件,还是金融交易系统,MySQL数据库都面临着前所未有的挑战。当每秒查询量(QPS)达到数千甚至数万时,数据库的性能瓶颈往往成为系统稳定性的关键因素。
高并发对MySQL的主要挑战包括:
- 连接数耗尽:大量并发连接导致无法建立新连接
- CPU资源竞争:频繁的上下文切换和锁竞争
- I/O瓶颈:磁盘读写速度跟不上内存操作速度
- 锁等待:行锁、表锁导致的长时间等待
- 缓存失效:热点数据频繁更新导致缓存穿透
本文将从索引优化、查询优化、架构设计三个层面,全面解析MySQL应对流量洪峰的策略。
一、索引优化:性能提升的第一道防线
1.1 索引设计的核心原则
索引是MySQL性能优化的基石。正确的索引设计可以将查询性能提升几个数量级。
1.1.1 最左前缀原则
对于复合索引,MySQL会遵循最左前缀原则。例如,创建索引:
CREATE INDEX idx_user_name_age ON users(name, age);
这个索引可以被以下查询有效利用:
-- ✅ 使用索引
SELECT * FROM users WHERE name = '张三';
SELECT * FROM users WHERE name = '张三' AND age = 25;
-- ❌ 无法使用索引
SELECT * FROM users WHERE age = 25;
1.1.2 覆盖索引优化
覆盖索引是指查询所需的所有列都在索引中,避免回表操作。
示例:订单查询优化
-- 原始查询(需要回表)
SELECT order_id, user_id, amount FROM orders
WHERE user_id = 1001 AND status = 'paid';
-- 优化方案1:创建覆盖索引
CREATE INDEX idx_user_status_amount ON orders(user_id, status, amount);
-- 优化方案2:使用延迟关联(大表特别有效)
SELECT o.* FROM orders o
INNER JOIN (
SELECT order_id FROM orders
WHERE user_id = 1001 AND status = 'paid'
LIMIT 100
) AS tmp ON o.order_id = tmp.order_id;
1.2 索引失效的常见场景
1.2.1 隐式类型转换
-- 假设user_id是VARCHAR类型
-- ❌ 索引失效
SELECT * FROM users WHERE user_id = 12345;
-- ✅ 索引生效
SELECT * FROM users WHERE user_id = '12345';
1.2.2 函数操作导致索引失效
-- ❌ 索引失效(对索引列使用函数)
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';
-- ✅ 索引生效(改写为范围查询)
SELECT * FROM orders
WHERE create_time >= '2024-01-01 00:00:00'
AND create_time < '2024-01-02 00:00:00';
1.2.3 LIKE查询优化
-- ❌ 索引失效(前缀模糊匹配)
SELECT * FROM users WHERE name LIKE '%三';
-- ✅ 索引生效(后缀模糊匹配)
SELECT * FROM users WHERE name LIKE '张%';
-- ✅ 全文索引方案(适用于大量文本搜索)
ALTER TABLE users ADD FULLTEXT INDEX ft_name (name);
SELECT * FROM users WHERE MATCH(name) AGAINST('三' IN BOOLEAN MODE);
1.3 索引维护的最佳实践
1.3.1 索引选择性计算
-- 计算列的选择性(越接近1越好)
SELECT
COUNT(DISTINCT status) / COUNT(*) AS selectivity
FROM orders;
-- 查看表中索引的使用情况
SELECT
object_schema,
object_name,
index_name,
count_star,
count_read,
count_write
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE object_schema = 'your_database'
ORDER BY count_star DESC;
1.3.2 删除无用索引
-- 查找从未使用过的索引
SELECT
t.TABLE_SCHEMA,
t.TABLE_NAME,
t.INDEX_NAME
FROM information_schema.STATISTICS t
LEFT JOIN performance_schema.table_io_waits_summary_by_index_usage u
ON t.TABLE_SCHEMA = u.OBJECT_SCHEMA
AND t.TABLE_NAME = u.OBJECT_NAME
AND t.INDEX_NAME = u.INDEX_NAME
WHERE u.INDEX_NAME IS NULL
AND t.TABLE_SCHEMA = 'your_database';
二、查询优化:让SQL飞起来
2.1 执行计划分析
2.1.1 EXPLAIN详解
-- 基础使用
EXPLAIN SELECT * FROM orders WHERE user_id = 1001;
-- 详细格式
EXPLAIN FORMAT=JSON SELECT * FROM orders WHERE user_id = 1001;
执行计划关键字段解读:
- type:访问类型(ALL > index > range > ref > eq_ref > const > system)
- key:实际使用的索引
- rows:预估扫描行数
- Extra:额外信息(Using index, Using where, Using filesort等)
2.1.2 慢查询日志分析
-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1; -- 超过1秒的查询
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
-- 分析慢查询日志
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
2.2 高效查询模式
2.2.1 分页优化
-- ❌ 传统分页(深度分页性能差)
SELECT * FROM orders WHERE user_id = 1001
ORDER BY create_time DESC
LIMIT 1000000, 20;
-- ✅ 优化方案1:延迟关联
SELECT o.* FROM orders o
INNER JOIN (
SELECT order_id FROM orders
WHERE user_id = 1001
ORDER BY create_time DESC
LIMIT 1000000, 20
) AS tmp ON o.order_id = tmp.order_id;
-- ✅ 优化方案2:位置记录法(适用于实时性要求不高的场景)
SELECT * FROM orders
WHERE user_id = 1001
AND create_time < '2024-01-01 00:00:00' -- 上一页最后一条的时间
ORDER BY create_time DESC
LIMIT 20;
2.2.2 IN vs EXISTS
-- ❌ 数据量大时性能差
SELECT * FROM users WHERE id IN (
SELECT user_id FROM orders WHERE amount > 1000
);
-- ✅ 使用EXISTS(半连接优化)
SELECT * FROM users u WHERE EXISTS (
SELECT 1 FROM orders o WHERE o.user_id = u.id AND o.amount > 1000
);
-- ✅ 更优:使用JOIN
SELECT DISTINCT u.* FROM users u
INNER JOIN orders o ON u.id = o.user_id
WHERE o.amount > 1000;
2.2.3 UNION vs UNION ALL
-- ❌ UNION会去重,性能较差
SELECT * FROM orders WHERE status = 'paid'
UNION
SELECT * FROM orders WHERE status = 'completed';
-- ✅ UNION ALL不去重,性能更好(如果业务允许重复)
SELECT * FROM orders WHERE status = 'paid'
UNION ALL
SELECT * FROM orders WHERE status = 'completed';
2.3 大事务优化
2.3.1 大事务拆分
-- ❌ 大事务(可能导致锁等待超时)
START TRANSACTION;
UPDATE users SET balance = balance - 100 WHERE id = 1;
UPDATE users SET balance = balance + 100 WHERE id = 2;
UPDATE orders SET status = 'completed' WHERE order_id = 1001;
-- ... 可能还有更多操作
COMMIT;
-- ✅ 拆分为小事务
START TRANSACTION;
UPDATE users SET balance = balance - 100 WHERE id = 1;
COMMIT;
START TRANSACTION;
UPDATE users SET balance = balance + 100 WHERE id = 2;
COMMIT;
START TRANSACTION;
UPDATE orders SET status = 'completed' WHERE order_id = 1001;
COMMIT;
2.3.2 批量操作优化
-- ❌ 循环单条插入(性能差)
INSERT INTO logs (user_id, action, create_time) VALUES (1, 'login', NOW());
INSERT INTO logs (user_id, action, create_time) VALUES (2, 'login', NOW());
-- ... 循环1000次
-- ✅ 批量插入
INSERT INTO logs (user_id, action, create_time) VALUES
(1, 'login', NOW()),
(2, 'login', NOW()),
(3, 'login', NOW()),
-- ... 一次插入1000条
(1000, 'login', NOW());
-- ✅ 使用LOAD DATA INFILE(超大数据量)
LOAD DATA LOCAL INFILE '/tmp/logs.csv'
INTO TABLE logs
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(user_id, action, create_time);
2.4 并发控制策略
2.4.1 悲观锁 vs 乐观锁
悲观锁(适用于写冲突频繁场景)
-- 显式加锁
SELECT * FROM inventory WHERE product_id = 1001 FOR UPDATE;
-- 执行业务逻辑
UPDATE inventory SET stock = stock - 1 WHERE product_id = 1001;
乐观锁(适用于读多写少场景)
-- 添加版本号字段
ALTER TABLE inventory ADD COLUMN version INT DEFAULT 0;
-- 更新时检查版本
UPDATE inventory
SET stock = stock - 1, version = version + 1
WHERE product_id = 1001 AND version = #{oldVersion};
-- 影响行数为0时重试
2.4.2 死锁预防
-- 统一加锁顺序(避免循环等待)
-- 场景:转账操作
-- ❌ 错误:不同事务加锁顺序不一致
-- 事务1: UPDATE users SET balance = balance - 100 WHERE id = 1; UPDATE users SET balance = balance + 100 WHERE id = 2;
-- 事务2: UPDATE users SET balance = balance - 100 WHERE id = 2; UPDATE users SET balance = balance + 100 WHERE id = 1;
-- ✅ 正确:按ID顺序加锁
-- 事务1: UPDATE users SET balance = balance - 100 WHERE id = 1; UPDATE users SET balance = balance + 100 WHERE id = 2;
-- 事务2: UPDATE users SET balance = balance - 100 WHERE id = 1; UPDATE users SET balance = balance + 100 WHERE id = 2;
三、架构设计:从单机到分布式
3.1 读写分离架构
3.1.1 主从复制原理
MySQL主从复制基于binlog,有三种日志格式:
- STATEMENT:记录SQL语句(可能导致主从不一致)
- ROW:记录行变更(推荐,更安全)
- MIXED:混合模式
3.1.2 主从配置示例
# 主库配置 (my.cnf)
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7
# 从库配置
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
read_only = 1
3.1.3 主从搭建步骤
-- 主库:创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
-- 主库:锁定表并记录binlog位置
FLUSH TABLES WITH READ LOCK;
SHOW MASTER STATUS; -- 记录File和Position
-- 从库:配置主库信息
CHANGE MASTER TO
MASTER_HOST='master_ip',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=1234;
-- 从库:启动复制
START SLAVE;
SHOW SLAVE STATUS\G -- 检查Slave_IO_Running和Slave_SQL_Running
3.1.4 应用层读写分离
# Python示例:使用SQLAlchemy实现读写分离
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class RoutingSession:
def __init__(self):
self.master_engine = create_engine('mysql://master/db')
self.slave_engine = create_engine('mysql://slave/db')
def get_session(self, write=False):
if write:
return Session(self.master_engine)
return Session(self.slave_engine)
# 使用示例
session = routing_session.get_session(write=True)
session.execute("INSERT INTO users (name) VALUES ('张三')")
session = routing_session.get_session(write=False)
result = session.execute("SELECT * FROM users WHERE id = 1")
3.2 分库分表架构
3.2.1 垂直拆分
按业务模块拆分
-- 原始单库
-- 用户库:users, user_profiles, user_settings
-- 订单库:orders, order_items, order_logs
-- 商品库:products, product_categories, product_inventory
-- 拆分后
-- user_db.users
-- user_db.user_profiles
-- order_db.orders
-- order_db.order_items
-- product_db.products
-- product_db.product_categories
3.2.2 水平拆分
按用户ID取模分片
-- 分片规则:user_id % 4
-- 分片0: user_id % 4 = 0
-- 分片1: user_id % 4 = 1
-- 分片2: user_id % 4 = 2
-- 分片3: user_id % 4 = 3
-- 分片表结构
CREATE TABLE users_0 (
id BIGINT PRIMARY KEY,
name VARCHAR(50),
email VARCHAR(100),
-- 其他字段
INDEX idx_email (email)
) ENGINE=InnoDB;
CREATE TABLE users_1 LIKE users_0;
CREATE TABLE users_2 LIKE users_0;
CREATE TABLE users_3 LIKE users_0;
3.2.3 分库分表中间件
ShardingSphere-JDBC配置示例
# application.yml
spring:
shardingsphere:
datasource:
names: ds0, ds1, ds2, ds3
ds0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3306/db0
username: root
password: password
# ds1, ds2, ds3 类似配置
rules:
sharding:
tables:
users:
actual-data-nodes: ds${0..3}.users_${0..3}
table-strategy:
standard:
sharding-column: id
sharding-algorithm-name: user-mod
database-strategy:
standard:
sharding-column: id
sharding-algorithm-name: db-mod
sharding-algorithms:
user-mod:
type: MOD
props:
sharding-count: 4
db-mod:
type: MOD
props:
sharding-count: 4
3.3 缓存策略
3.3.1 多级缓存架构
用户请求 → CDN → Nginx缓存 → 应用缓存(Redis) → MySQL
3.3.2 Redis缓存示例
import redis
import json
class CacheService:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379)
def get_user(self, user_id):
cache_key = f"user:{user_id}"
# 1. 先查缓存
cached = self.redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 2. 缓存未命中,查数据库
user = db.query("SELECT * FROM users WHERE id = %s", user_id)
# 3. 写入缓存(设置过期时间)
if user:
self.redis_client.setex(
cache_key,
3600, # 1小时过期
json.dumps(user)
)
return user
def update_user(self, user_id, data):
# 更新数据库
db.execute("UPDATE users SET name = %s WHERE id = %s",
data['name'], user_id)
# 删除缓存(Cache Aside模式)
self.redis_client.delete(f"user:{user_id}")
3.3.3 缓存穿透、击穿、雪崩防护
缓存穿透防护(查询不存在的数据)
def get_user(self, user_id):
cache_key = f"user:{user_id}"
cached = self.redis_client.get(cache_key)
if cached:
return None if cached == "null" else json.loads(cached)
user = db.query("SELECT * FROM users WHERE id = %s", user_id)
# 即使数据不存在,也缓存空值
if user is None:
self.redis_client.setex(cache_key, 60, "null")
return None
self.redis_client.setex(cache_key, 3600, json.dumps(user))
return user
缓存击穿防护(热点key过期)
def get_hot_data(self, key):
cache_key = f"hot:{key}"
# 使用SETNX实现分布式锁
lock_key = f"lock:{key}"
lock = self.redis_client.set(lock_key, "1", nx=True, ex=10)
if lock:
try:
# 只有获取锁的线程才查询数据库
data = db.query("SELECT * FROM hot_table WHERE key = %s", key)
self.redis_client.setex(cache_key, 3600, json.dumps(data))
return data
finally:
self.redis_client.delete(lock_key)
else:
# 未获取锁,等待后重试
time.sleep(0.1)
return self.get_hot_data(key)
缓存雪崩防护(大量key同时过期)
def set_with_jitter(self, key, value, base_expire=3600):
# 添加随机过期时间(±300秒)
jitter = random.randint(-300, 300)
expire = base_expire + jitter
self.redis_client.setex(key, expire, value)
3.4 高可用架构
3.4.1 MHA高可用方案
MHA(Master High Availability)是MySQL高可用的经典方案。
架构图:
Master (写) → Slave1 (读) → Slave2 (读)
↓
MHA Manager (监控)
MHA配置示例
# manager配置文件 /etc/mha/app1.cnf
[server default]
user=mha
password=password
manager_workdir=/var/log/mha/app1
manager_log=/var/log/mha/app1/manager.log
master_binlog_dir=/var/lib/mysql
ping_interval=3
repl_password=password
repl_user=repl
[server1]
hostname=master1
candidate_master=1
[server2]
hostname=slave1
candidate_master=1
[server3]
hostname=slave2
no_master=1
3.4.2 InnoDB Cluster
MySQL 8.0+ 推荐的高可用方案,基于Group Replication。
# 创建InnoDB Cluster
# 1. 配置Group Replication
SET GLOBAL group_replication_bootstrap_group=ON;
SET GLOBAL group_replication_local_address="192.168.1.101:33061";
SET GLOBAL group_replication_group_seeds="192.168.1.101:33061,192.168.1.102:33061,192.168.1.103:33061";
# 2. 创建Cluster
mysqlsh --uri root@localhost:3306 --password=password
dba.createCluster('mycluster');
3.5 分布式事务处理
3.5.1 TCC模式(Try-Confirm-Cancel)
# 转账场景:A向B转账
class TransferService:
def transfer(self, from_id, to_id, amount):
# Try阶段:冻结资金
try:
self.try_reduce_balance(from_id, amount)
self.try_increase_balance(to_id, amount)
# Confirm阶段
self.confirm_reduce_balance(from_id, amount)
self.confirm_increase_balance(to_id, amount)
except Exception as e:
# Cancel阶段
self.cancel_reduce_balance(from_id, amount)
self.cancel_increase_balance(to_id, amount)
raise e
def try_reduce_balance(self, user_id, amount):
# 冻结资金,不实际扣除
db.execute("""
UPDATE users
SET frozen_balance = frozen_balance + %s
WHERE id = %s AND balance >= %s
""", amount, user_id, amount)
def confirm_reduce_balance(self, user_id, amount):
# 确认扣除
db.execute("""
UPDATE users
SET balance = balance - %s,
frozen_balance = frozen_balance - %s
WHERE id = %s
""", amount, amount, user_id)
def cancel_reduce_balance(self, user_id, amount):
# 解冻资金
db.execute("""
UPDATE users
SET frozen_balance = frozen_balance - %s
WHERE id = %s
""", amount, user_id)
3.5.2 Saga模式
适用于长事务场景,将大事务拆分为一系列小事务。
# 订单创建Saga
class OrderSaga:
def create_order(self, user_id, product_id, quantity):
try:
# 步骤1:创建订单记录
order_id = self.create_order_record(user_id, product_id, quantity)
# 步骤2:扣减库存
self.reduce_stock(product_id, quantity)
# 步骤3:扣减用户余额
self.reduce_balance(user_id, product_id, quantity)
# 步骤4:更新订单状态为成功
self.update_order_status(order_id, 'SUCCESS')
except Exception as e:
# 补偿事务
self.compensate(order_id)
def compensate(self, order_id):
# 根据订单状态执行补偿
order = self.get_order(order_id)
if order.status == 'CREATED':
# 补偿:恢复库存
self.restore_stock(order.product_id, order.quantity)
# 补偿:恢复余额
self.restore_balance(order.user_id, order.amount)
# 标记订单为已取消
self.update_order_status(order_id, 'CANCELLED')
四、监控与调优:持续优化的保障
4.1 性能监控指标
4.1.1 关键指标
-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
-- 查看活跃连接数
SHOW STATUS LIKE 'Threads_running';
-- 查看QPS
SHOW GLOBAL STATUS LIKE 'Queries';
SHOW GLOBAL STATUS LIKE 'Queries' -- 1秒后再次查询,计算差值
-- 查看TPS
SHOW GLOBAL STATUS LIKE 'Com_commit';
SHOW GLOBAL STATUS LIKE 'Com_rollback';
4.1.2 InnoDB引擎状态
SHOW ENGINE INNODB STATUS\G
重点关注:
- TRANSACTIONS:锁等待和死锁信息
- SEMAPHORES:信号量等待
- FILE I/O:I/O性能
- BUFFER POOL:缓存命中率
4.2 慢查询监控系统
4.2.1 使用Percona Toolkit
# 安装
sudo apt-get install percona-toolkit
# 分析慢查询日志
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
# 在线分析
pt-query-digest --processlist h=localhost,u=root,p=password --interval 10
4.2.2 自定义监控脚本
#!/usr/bin/env python3
import mysql.connector
import time
import smtplib
def monitor_slow_queries():
conn = mysql.connector.connect(
host='localhost',
user='monitor',
password='password',
database='performance_schema'
)
cursor = conn.cursor()
# 查询最近5分钟的慢查询
cursor.execute("""
SELECT
DIGEST_TEXT,
COUNT_STAR,
AVG_TIMER_WAIT/1000000000000 as avg_time_sec
FROM events_statements_summary_by_digest
WHERE LAST_SEEN > NOW() - INTERVAL 5 MINUTE
AND AVG_TIMER_WAIT > 1000000000000 -- 超过1秒
ORDER BY COUNT_STAR DESC
LIMIT 10
""")
slow_queries = cursor.fetchall()
if slow_queries:
# 发送告警
send_alert(slow_queries)
conn.close()
def send_alert(queries):
# 实现告警发送逻辑
subject = "MySQL慢查询告警"
body = "\n".join([f"{q[0]}: {q[1]}次, 平均{q[2]:.2f}秒" for q in queries])
# 发送邮件或钉钉/企业微信
print(f"ALERT: {body}")
if __name__ == '__main__':
while True:
monitor_slow_queries()
time.sleep(60) # 每分钟检查一次
4.3 性能调优工具
4.3.1 sys schema(MySQL 5.7+)
-- 查看最耗时的查询
SELECT * FROM sys.statement_analysis ORDER BY avg_latency DESC LIMIT 10;
-- 查看未使用的索引
SELECT * FROM sys.schema_unused_indexes;
-- 查看冗余索引
SELECT * FROM sys.schema_redundant_indexes;
-- 查看表的I/O情况
SELECT * FROM sys.schema_table_statistics ORDER BY total_latency DESC;
4.3.2 MySQLTuner
# 下载并运行
wget https://raw.githubusercontent.com/major/MySQLTuner-perl/master/mysqltuner.pl
perl mysqltuner.pl --user root --password password
五、实战案例:秒杀系统设计
5.1 秒杀场景特点
- 瞬时流量:QPS可能达到平时的100倍以上
- 库存扣减:需要保证不超卖
- 热点数据:少数商品被大量用户抢购
- 防作弊:防止黄牛刷单
5.2 秒杀架构设计
5.2.1 整体架构
客户端 → CDN → Nginx → 秒杀服务 → Redis → MySQL
↓
消息队列 → 异步处理
5.2.2 核心代码实现
Redis预扣库存
class SeckillService:
def __init__(self):
self.redis = redis.Redis(host='localhost', port=6379, db=0)
self.db = create_db_connection()
def seckill(self, user_id, product_id):
# 1. 限流(令牌桶)
if not self.rate_limit(user_id):
return {"code": 429, "msg": "请求过于频繁"}
# 2. 校验用户资格(防刷)
if not self.check_user资格(user_id, product_id):
return {"code": 403, "msg": "不符合购买条件"}
# 3. 预扣库存(Redis原子操作)
stock_key = f"seckill:stock:{product_id}"
user_key = f"seckill:user:{product_id}"
# 检查是否已购买
if self.redis.sismember(user_key, user_id):
return {"code": 400, "msg": "您已参与过秒杀"}
# 原子扣减库存
stock = self.redis.decr(stock_key)
if stock < 0:
self.redis.incr(stock_key) # 回滚
return {"code": 500, "msg": "库存不足"}
# 4. 记录已购买用户
self.redis.sadd(user_key, user_id)
# 5. 发送消息到队列(异步创建订单)
self.send_to_queue({
"user_id": user_id,
"product_id": product_id,
"stock": stock,
"timestamp": time.time()
})
return {"code": 200, "msg": "秒杀成功", "order_id": None}
def rate_limit(self, user_id):
# 令牌桶算法
key = f"rate_limit:{user_id}"
capacity = 10 # 桶容量
refill_rate = 1 # 每秒 refill 1个令牌
current = self.redis.get(key)
if current is None:
self.redis.setex(key, 60, capacity - 1)
return True
if int(current) > 0:
self.redis.decr(key)
return True
return False
def check_user资格(self, user_id, product_id):
# 检查黑名单
if self.redis.sismember("blacklist:users", user_id):
return False
# 检查购买频率(1小时内最多1次)
freq_key = f"user_freq:{user_id}:{product_id}"
if self.redis.exists(freq_key):
return False
self.redis.setex(freq_key, 3600, "1")
return True
def send_to_queue(self, message):
# 发送到RabbitMQ/Kafka
pass
异步订单处理
def process_seckill_order(message):
"""
消费消息队列,创建订单
"""
user_id = message['user_id']
product_id = message['product_id']
try:
# 1. 开启事务
with db.transaction():
# 2. 检查库存(防止Redis和DB不一致)
stock = db.query(
"SELECT stock FROM products WHERE id = %s FOR UPDATE",
product_id
)
if stock <= 0:
raise Exception("库存不足")
# 3. 扣减数据库库存
result = db.execute(
"UPDATE products SET stock = stock - 1 WHERE id = %s AND stock > 0",
product_id
)
if result.rowcount == 0:
raise Exception("数据库库存不足")
# 4. 创建订单
order_id = db.execute("""
INSERT INTO orders (user_id, product_id, status, create_time)
VALUES (%s, %s, 'SUCCESS', NOW())
""", user_id, product_id)
# 5. 记录订单号(用于后续查询)
self.redis.setex(f"seckill:order:{user_id}:{product_id}", 3600, order_id)
except Exception as e:
# 异常处理:恢复Redis库存
self.redis.incr(f"seckill:stock:{product_id}")
self.redis.srem(f"seckill:user:{product_id}", user_id)
logger.error(f"订单处理失败: {e}")
5.2.3 数据库表设计
-- 商品表(库存单独维护)
CREATE TABLE products (
id BIGINT PRIMARY KEY,
name VARCHAR(200),
total_stock INT,
seckill_stock INT, -- 秒杀库存
seckill_start DATETIME,
seckill_end DATETIME,
INDEX idx_seckill_time (seckill_start, seckill_end)
) ENGINE=InnoDB;
-- 订单表(分表)
CREATE TABLE orders_0 (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
product_id BIGINT,
amount DECIMAL(10,2),
status ENUM('PENDING', 'SUCCESS', 'FAILED'),
create_time DATETIME,
INDEX idx_user (user_id),
INDEX idx_product (product_id)
) ENGINE=InnoDB;
-- 秒杀记录表(只记录成功订单)
CREATE TABLE seckill_records (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT,
product_id BIGINT,
order_id BIGINT,
create_time DATETIME,
INDEX idx_user_product (user_id, product_id)
) ENGINE=InnoDB;
5.3 压测与优化
5.3.1 使用sysbench进行压测
# 准备测试数据
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
--mysql-db=test --tables=10 --table-size=100000 \
/usr/share/sysbench/oltp_read_write.lua prepare
# 执行压测
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
--mysql-db=test --tables=10 --table-size=100000 \
--threads=100 --time=60 --report-interval=10 \
/usr/share/sysbench/oltp_read_write.lua run
# 清理数据
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
--mysql-db=test \
/usr/share/sysbench/oltp_read_write.lua cleanup
5.3.2 性能优化 checklist
- [ ] 索引是否覆盖查询?
- [ ] 是否存在全表扫描?
- [ ] 连接池配置是否合理?
- [ ] 慢查询是否已优化?
- [ ] 缓存命中率是否>95%?
- [ ] 主从延迟是否秒?
- [ ] 是否开启查询缓存(MySQL 8.0已移除)?
- [ ] InnoDB缓冲池大小是否合理(建议70-80%内存)?
- [ ] 日志文件大小是否合适(避免频繁刷盘)?
- [ ] 是否启用压缩表?
六、总结与最佳实践
6.1 性能优化金字塔
架构设计(分库分表、缓存)
↓
查询优化(执行计划、大事务)
↓
索引优化(覆盖索引、最左前缀)
↓
配置优化(缓冲池、连接池)
6.2 高并发黄金法则
- 能不查库就不查库:缓存为王
- 能异步就不同步:消息队列解耦
- 能限制就不放行:限流熔断
- 能拆分就不合并:分而治之
- 能简单就不复杂:避免过度设计
6.3 持续优化建议
- 建立监控体系:实时掌握数据库健康状况
- 定期慢查询分析:每周review一次
- 压测常态化:每次大促前必须压测
- 容量规划:提前3个月预估增长
- 故障演练:定期演练主从切换、宕机恢复
6.4 推荐工具与资源
- 监控:Prometheus + Grafana + mysqld_exporter
- 慢查询分析:Percona Toolkit, pt-query-digest
- 性能调优:MySQLTuner, tuning-primer
- 备份:Percona XtraBackup
- 高可用:Orchestrator, MHA
- 分库分表:ShardingSphere, Vitess
通过以上策略的综合运用,MySQL完全可以应对万级QPS的高并发场景。关键在于预防优于治疗,在系统设计之初就考虑好扩展性和性能,而不是等到出现问题再紧急优化。
