引言:理解高并发场景下的数据库挑战
在现代互联网应用中,高并发场景已经成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易高峰,数据库都面临着前所未有的压力。MySQL作为最流行的关系型数据库之一,如何在高并发环境下保持稳定和高性能,是每个开发者和DBA必须掌握的核心技能。
高并发对数据库的挑战主要体现在以下几个方面:
- 连接数激增:大量用户同时访问,导致数据库连接资源耗尽
- CPU负载过高:复杂查询和频繁的计算操作使CPU不堪重负
- I/O瓶颈:磁盘读写速度跟不上内存中的数据处理速度
- 锁竞争:并发事务导致锁等待和死锁,严重影响吞吐量
- 内存不足:缓冲池无法容纳热点数据,导致频繁的磁盘I/O
本文将从多个维度深入探讨MySQL高并发优化的策略,包括架构设计、配置调优、SQL优化、缓存策略等,并提供详细的配置示例和代码实现。
一、架构层面的优化策略
1.1 读写分离架构
读写分离是应对高并发查询的最有效策略之一。通过将读操作和写操作分离到不同的数据库实例,可以显著减轻主库的压力。
实现原理:
- 主库(Master)负责所有的写操作(INSERT、UPDATE、DELETE)
- 从库(Slave)负责所有的读操作(SELECT)
- 应用层通过中间件或ORM框架自动路由请求
MySQL原生主从复制配置示例:
-- 在主库上配置
-- 编辑 my.cnf 或 my.ini
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
-- 创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
-- 查看主库状态
SHOW MASTER STATUS;
-- 记录 File 和 Position 值
-- 在从库上配置
[mysqld]
server-id = 2
relay-log = mysql-relay-bin
read-only = 1
-- 配置从库连接主库
CHANGE MASTER TO
MASTER_HOST='主库IP',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=1234;
-- 启动从库复制
START SLAVE;
SHOW SLAVE STATUS\G
应用层读写分离实现(Python示例):
import pymysql
from pymysql.cursors import DictCursor
class ReadWriteSplittingDB:
def __init__(self):
# 主库配置(写操作)
self.master_config = {
'host': 'master_host',
'user': 'root',
'password': 'password',
'database': 'myapp',
'charset': 'utf8mb4'
}
# 从库配置(读操作)
self.slave_config = {
'host': 'slave_host',
'user': 'root',
'password': 'password',
'database': 'myapp',
'charset': 'utf8mb4'
}
def get_master_connection(self):
"""获取主库连接"""
return pymysql.connect(**self.master_config)
def get_slave_connection(self):
"""获取从库连接"""
return pymysql.connect(**self.slave_config)
def execute_write(self, sql, params=None):
"""执行写操作"""
conn = self.get_master_connection()
try:
with conn.cursor(DictCursor) as cursor:
cursor.execute(sql, params)
conn.commit()
return cursor.rowcount
finally:
conn.close()
def execute_read(self, sql, params=None):
"""执行读操作"""
conn = self.get_slave_connection()
try:
with conn.cursor(DictCursor) as cursor:
cursor.execute(sql, params)
return cursor.fetchall()
finally:
conn.close()
# 使用示例
db = ReadWriteSplittingDB()
# 写操作走主库
db.execute_write("INSERT INTO users (name, email) VALUES (%s, %s)", ("张三", "zhangsan@example.com"))
# 读操作走从库
results = db.execute_read("SELECT * FROM users WHERE id = %s", (1,))
1.2 分库分表策略
当单表数据量超过千万级别时,查询性能会急剧下降。分库分表是解决这一问题的根本方案。
水平分表(Sharding): 将大表按某种规则拆分成多个小表,例如按用户ID取模或按时间范围拆分。
-- 原始订单表
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
amount DECIMAL(10,2),
create_time DATETIME,
INDEX idx_user_id (user_id)
) ENGINE=InnoDB;
-- 水平分表:按user_id取模分4张表
CREATE TABLE orders_0 (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
amount DECIMAL(10,2),
create_time DATETIME,
INDEX idx_user_id (user_id)
) ENGINE=InnoDB;
CREATE TABLE orders_1 (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
amount DECIMAL(10,2),
create_time DATETIME,
INDEX idx_user_id (user_id)
) ENGINE=InnoDB;
-- ... orders_2, orders_3
-- 分表路由函数(应用层实现)
def get_table_name(user_id):
"""根据user_id计算表名"""
table_index = user_id % 4
return f"orders_{table_index}"
def insert_order(user_id, amount):
"""插入订单到分表"""
table_name = get_table_name(user_id)
sql = f"INSERT INTO {table_name} (order_id, user_id, amount, create_time) VALUES (%s, %s, %s, NOW())"
# 执行SQL...
垂直分表: 将大表中不常用的字段拆分到扩展表中。
-- 主表:核心字段
CREATE TABLE user_core (
user_id BIGINT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
status TINYINT,
create_time DATETIME
) ENGINE=InnoDB;
-- 扩展表:详细信息
CREATE TABLE user_profile (
user_id BIGINT PRIMARY KEY,
real_name VARCHAR(50),
avatar TEXT,
bio TEXT,
last_login DATETIME,
FOREIGN KEY (user_id) REFERENCES user_core(user_id)
) ENGINE=InnoDB;
1.3 数据库中间件
使用数据库中间件可以简化分库分表和读写分离的实现。
ShardingSphere-JDBC配置示例:
# application.yml
spring:
shardingsphere:
datasource:
names: ds0, ds1
ds0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3306/db0
username: root
password: password
ds1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3306/db1
username: root
password: password
rules:
sharding:
tables:
orders:
actual-data-nodes: ds$->{0..1}.orders_$->{0..3}
table-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: orders-table-inline
database-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: orders-db-inline
sharding-algorithms:
orders-table-inline:
type: CLASS_BASED
props:
strategy: standard
algorithmClassName: com.example.ModShardingAlgorithm
algorithmClassName: com.example.ModShardingAlgorithm
orders-db-inline:
type: CLASS_BASED
props:
strategy: standard
algorithmClassName: com.example.ModShardingAlgorithm
二、MySQL配置参数调优
2.1 连接数优化
高并发下首先要调整的最大连接数,避免连接耗尽。
# my.cnf 配置
[mysqld]
# 最大连接数,根据服务器内存调整,一般设置为500-1000
max_connections = 800
# 每个用户最大连接数
max_user_connections = 200
# 连接超时时间(秒)
wait_timeout = 600
interactive_timeout = 600
# 连接队列大小
back_log = 500
# 最大错误连接尝试次数
max_connect_errors = 100000
动态调整(无需重启):
-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
-- 查看最大连接数
SHOW VARIABLES LIKE 'max_connections';
-- 动态调整最大连接数(立即生效)
SET GLOBAL max_connections = 1000;
-- 查看连接拒绝统计
SHOW STATUS LIKE 'Aborted_connects';
2.2 缓冲池优化
InnoDB缓冲池是MySQL性能的关键,决定了数据在内存中的缓存能力。
# my.cnf 配置
[mysqld]
# 缓冲池大小,通常设置为物理内存的50%-70%
innodb_buffer_pool_size = 8G
# 缓冲池实例数,多核CPU建议设置为4-8
innodb_buffer_pool_instances = 8
# 缓冲池预热,重启时恢复缓冲池状态
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_at_shutdown = ON
# 页大小,对于大表建议设置为64K
innodb_page_size = 16K
# 旧数据淘汰策略
innodb_old_blocks_time = 1000
innodb_old_blocks_pct = 37
监控缓冲池命中率:
-- 缓冲池读写统计
SHOW STATUS LIKE 'Innodb_buffer_pool_read%';
-- 计算命中率(应>99%)
-- 命中率 = (1 - Innodb_buffer_pool_reads / Innodb_buffer_pool_read_requests) * 100
-- 查看缓冲池使用情况
SHOW ENGINE INNODB STATUS\G
-- 查看 BUFFER POOL AND MEMORY 部分
2.3 日志文件优化
日志文件大小和写入策略直接影响事务性能和数据安全性。
# my.cnf 配置
[mysqld]
# 重做日志文件大小,建议1-2G,太大恢复时间长,太小频繁切换
innodb_log_file_size = 2G
# 重做日志文件数量,至少2个
innodb_log_files_in_group = 3
# 日志缓冲区大小,建议8-64M
innodb_log_buffer_size = 64M
# 刷新策略
innodb_flush_log_at_trx_commit = 1 # 1: 每次提交都刷盘(最安全)
# 2: 每秒刷盘(性能好,可能丢失1秒数据)
# 0: 每checkpoint刷盘(性能最好,风险最大)
# 刷新方法
innodb_flush_method = O_DIRECT # 绕过OS缓存,直接写入磁盘
# 刷盘线程数
innodb_flush_neighbors = 1
innodb_flush_sync = OFF
2.4 事务和锁优化
# my.cnf 配置
[mysqld]
# 事务隔离级别,默认REPEATABLE-READ,高并发读多场景可考虑READ-COMMITTED
transaction-isolation = READ-COMMITTED
# 锁等待超时(秒)
innodb_lock_wait_timeout = 50
# 自增锁模式,高并发插入时建议设置为2(交错模式)
innodb_autoinc_lock_mode = 2
# 死锁检测,高并发下可关闭死锁检测,依靠超时机制
innodb_deadlock_detect = ON
# 间隙锁,高并发写入时建议关闭
innodb_locks_unsafe_for_binlog = OFF
三、SQL语句优化
3.1 索引优化策略
索引是提升查询性能最直接有效的方法。
索引设计原则:
- 选择性高的列适合建索引(如ID、邮箱)
- 避免在低选择性列上建索引(如性别、状态)
- 复合索引遵循最左前缀原则
- 避免过多索引(影响写性能)
-- 示例:用户表索引优化
CREATE TABLE users (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
status TINYINT DEFAULT 1,
create_time DATETIME,
region VARCHAR(50),
age INT,
INDEX idx_username (username),
INDEX idx_email (email),
INDEX idx_status_create_time (status, create_time), -- 复合索引
INDEX idx_region_age (region, age) -- 复合索引
) ENGINE=InnoDB;
-- 优化前:全表扫描
EXPLAIN SELECT * FROM users WHERE status = 1 AND create_time > '2024-01-01';
-- type: ALL, rows: 1000000
-- 优化后:使用索引
EXPLAIN SELECT * FROM users WHERE status = 1 AND create_time > '2024-01-01';
-- type: range, rows: 50000
索引使用技巧:
-- 1. 覆盖索引:查询列都在索引中,避免回表
EXPLAIN SELECT id, username FROM users WHERE username LIKE 'zhang%';
-- 使用 idx_username 索引,Extra: Using index
-- 2. 索引下推:MySQL 5.6+ 特性
EXPLAIN SELECT * FROM users WHERE username LIKE 'zhang%' AND status = 1;
-- 索引过滤后,再回表过滤status
-- 3. 索引合并:MySQL 5.0+ 特性
EXPLAIN SELECT * FROM users WHERE username = 'zhangsan' OR email = 'zhangsan@example.com';
-- 同时使用 idx_username 和 idx_email,然后合并结果
3.2 避免索引失效的常见场景
-- ❌ 错误示例:索引失效
-- 1. 在索引列上使用函数
SELECT * FROM users WHERE DATE(create_time) = '2024-01-01';
-- 优化:改为范围查询
SELECT * FROM users WHERE create_time >= '2024-01-01' AND create_time < '2024-01-02';
-- 2. 隐式类型转换
SELECT * FROM users WHERE phone = 13800138000; -- phone是VARCHAR类型
-- 优化:保持类型一致
SELECT * FROM users WHERE phone = '13800138000';
-- 3. 前导模糊查询
SELECT * FROM users WHERE username LIKE '%zhang';
-- 无法使用索引,优化:只使用后缀模糊查询或全文索引
SELECT * FROM users WHERE username LIKE 'zhang%';
-- 4. OR连接不同列(部分版本)
SELECT * FROM users WHERE username = 'zhangsan' OR age = 25;
-- 优化:使用UNION或IN
SELECT * FROM users WHERE username = 'zhangsan'
UNION
SELECT * FROM users WHERE age = 25;
-- 5. 负向查询
SELECT * FROM users WHERE status != 1;
SELECT * FROM users WHERE username NOT IN ('zhangsan', 'lisi');
-- 优化:改为正向查询
SELECT * FROM users WHERE status IN (0, 2, 3);
3.3 分页优化
高并发下的分页查询容易出现性能问题,特别是深度分页。
-- ❌ 低效分页:OFFSET越大越慢
SELECT * FROM orders WHERE user_id = 123 ORDER BY id LIMIT 1000000, 20;
-- ✅ 优化方案1:延迟关联(子查询只查主键)
SELECT o.*
FROM orders o
JOIN (
SELECT id
FROM orders
WHERE user_id = 123
ORDER BY id
LIMIT 1000000, 20
) t ON o.id = t.id;
-- ✅ 优化方案2:记录上次最大ID(适用于滚动加载)
SELECT * FROM orders
WHERE user_id = 123 AND id > 1000000
ORDER BY id
LIMIT 20;
-- ✅ 优化方案3:使用Elasticsearch等搜索引擎
-- 对于超大数据量分页,建议使用ES的search_after
3.4 批量操作优化
高并发下,频繁的单条插入会导致性能瓶颈。
-- ❌ 低效:单条插入循环
INSERT INTO orders (user_id, amount) VALUES (1, 100.00);
INSERT INTO orders (user_id, amount) VALUES (2, 200.00);
-- ... 1000次
-- ✅ 高效:批量插入
INSERT INTO orders (user_id, amount) VALUES
(1, 100.00),
(2, 200.00),
(3, 300.00),
-- ... 1000条
(1000, 100000.00);
-- ✅ 优化:使用LOAD DATA INFILE(万级数据)
LOAD DATA LOCAL INFILE '/tmp/orders.csv'
INTO TABLE orders
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(user_id, amount);
-- ✅ 优化:使用INSERT DELAYED(已废弃,建议用ON DUPLICATE KEY UPDATE)
INSERT INTO orders (user_id, amount) VALUES (1, 100.00)
ON DUPLICATE KEY UPDATE amount = VALUES(amount);
3.5 复杂查询优化
-- 示例:电商订单统计查询优化
-- ❌ 原始查询:多表JOIN,性能差
SELECT
o.order_id,
u.username,
p.product_name,
o.amount,
o.create_time
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.id
WHERE o.status = 1
AND o.create_time >= '2024-01-01'
AND u.region = '北京'
ORDER BY o.amount DESC
LIMIT 0, 20;
-- ✅ 优化方案:冗余字段 + 覆盖索引
-- 1. 在orders表冗余username和region字段
ALTER TABLE orders ADD COLUMN username VARCHAR(50);
ALTER TABLE orders ADD COLUMN region VARCHAR(50);
-- 2. 创建复合索引
CREATE INDEX idx_status_region_time_amount ON orders (status, region, create_time, amount DESC);
-- 3. 优化后的查询(避免JOIN)
SELECT
order_id,
username,
amount,
create_time
FROM orders
WHERE status = 1
AND region = '北京'
AND create_time >= '2024-01-01'
ORDER BY amount DESC
LIMIT 0, 20;
-- ✅ 优化方案2:物化视图(MySQL 8.0+)
CREATE MATERIALIZED VIEW mv_order_summary AS
SELECT
o.order_id,
u.username,
p.product_name,
o.amount,
o.create_time
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.id
WHERE o.status = 1;
-- 查询物化视图
SELECT * FROM mv_order_summary WHERE create_time >= '2024-01-01';
四、缓存策略
4.1 应用层缓存
使用Redis等内存数据库作为缓存层,减少数据库访问。
import redis
import json
from functools import wraps
# Redis连接配置
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
def cache_query(ttl=300):
"""查询结果缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存key
key = f"cache:{func.__name__}:{str(args)}:{str(kwargs)}"
# 尝试从缓存获取
cached = redis_client.get(key)
if cached:
return json.loads(cached)
# 执行原函数
result = func(*args, **kwargs)
# 写入缓存
redis_client.setex(key, ttl, json.dumps(result))
return result
return wrapper
return decorator
# 使用示例
@cache_query(ttl=60)
def get_user_info(user_id):
"""从数据库获取用户信息"""
db = ReadWriteSplittingDB()
return db.execute_read("SELECT * FROM users WHERE id = %s", (user_id,))
# 高并发下,90%的请求会命中缓存
4.2 缓存更新策略
class CacheManager:
def __init__(self, redis_client, db):
self.redis = redis_client
self.db = db
def get_user_with_cache(self, user_id):
"""获取用户信息(带缓存)"""
cache_key = f"user:{user_id}"
# 1. 先从缓存获取
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# 2. 缓存未命中,查询数据库
user = self.db.execute_read("SELECT * FROM users WHERE id = %s", (user_id,))
# 3. 写入缓存
if user:
self.redis.setex(cache_key, 300, json.dumps(user))
return user
def update_user(self, user_id, data):
"""更新用户信息(同步缓存)"""
# 1. 更新数据库
sql = "UPDATE users SET name = %s, email = %s WHERE id = %s"
self.db.execute_write(sql, (data['name'], data['email'], user_id))
# 2. 删除缓存(下次查询时自动更新)
cache_key = f"user:{user_id}"
self.redis.delete(cache_key)
# 3. 可选:立即更新缓存
# self.redis.setex(cache_key, 300, json.dumps(data))
def delete_user(self, user_id):
"""删除用户(同步缓存)"""
# 1. 删除数据库
self.db.execute_write("DELETE FROM users WHERE id = %s", (user_id,))
# 2. 删除缓存
cache_key = f"user:{user_id}"
self.redis.delete(cache_key)
4.3 缓存穿透、击穿、雪崩防护
class CacheProtection:
def __init__(self, redis_client):
self.redis = redis_client
def get_with_cache穿透防护(self, key, fallback_func, ttl=300):
"""防止缓存穿透:查询不存在的数据"""
value = self.redis.get(key)
if value:
return json.loads(value) if value != "null" else None
# 查询数据库
result = fallback_func()
# 缓存空值(防止缓存穿透)
if result is None:
self.redis.setex(key, 60, "null") # 短过期时间
return None
self.redis.setex(key, ttl, json.dumps(result))
return result
def get_with_cache击穿防护(self, key, fallback_func, ttl=300):
"""防止缓存击穿:热点key过期时的并发查询"""
# 使用分布式锁
lock_key = f"lock:{key}"
lock = self.redis.set(lock_key, "1", nx=True, ex=10)
if lock:
try:
# 获取数据
result = fallback_func()
self.redis.setex(key, ttl, json.dumps(result))
return result
finally:
self.redis.delete(lock_key)
else:
# 等待并重试
import time
time.sleep(0.1)
return self.get_with_cache击穿防护(key, fallback_func, ttl)
def get_with_cache雪崩防护(self, key, fallback_func, ttl=300):
"""防止缓存雪崩:随机过期时间"""
# 随机过期时间(基础ttl ± 60秒)
import random
actual_ttl = ttl + random.randint(-60, 60)
result = fallback_func()
if result:
self.redis.setex(key, actual_ttl, json.dumps(result))
return result
五、高并发下的事务优化
5.1 事务设计原则
-- ❌ 错误:长事务(持有锁时间过长)
BEGIN;
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
-- 等待用户输入...
UPDATE account SET balance = balance + 100 WHERE user_id = 2;
COMMIT;
-- ✅ 正确:短事务
BEGIN;
-- 先计算好所有值
SET @new_balance_1 = (SELECT balance FROM account WHERE user_id = 1) - 100;
SET @new_balance_2 = (SELECT balance FROM account WHERE user_id = 2) + 100;
-- 然后快速执行
UPDATE account SET balance = @new_balance_1 WHERE user_id = 1;
UPDATE account SET balance = @new_balance_2 WHERE user_id = 2;
COMMIT;
5.2 乐观锁 vs 悲观锁
-- 悲观锁:SELECT ... FOR UPDATE(适合写冲突多的场景)
BEGIN;
-- 锁定记录,其他事务无法修改
SELECT balance FROM account WHERE user_id = 1 FOR UPDATE;
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
COMMIT;
-- 乐观锁:使用版本号(适合读多写少的场景)
ALTER TABLE account ADD COLUMN version INT DEFAULT 0;
-- 更新时检查版本号
UPDATE account
SET balance = balance - 100, version = version + 1
WHERE user_id = 1 AND version = 0; -- 版本号匹配才更新
-- 检查影响行数
-- 如果影响行数为0,说明版本号不匹配,需要重试
5.3 死锁避免策略
-- 死锁示例:两个事务交叉锁定
-- 事务1: UPDATE account SET balance = 100 WHERE user_id = 1;
-- 事务2: UPDATE account SET balance = 200 WHERE user_id = 2;
-- 事务1: UPDATE account SET balance = 200 WHERE user_id = 2; -- 等待事务2
-- 事务2: UPDATE account SET balance = 100 WHERE user_id = 1; -- 等待事务1(死锁)
-- ✅ 避免策略:固定加锁顺序
-- 事务1和事务2都按user_id排序后加锁
BEGIN;
-- 先锁定user_id=1,再锁定user_id=2
UPDATE account SET balance = 100 WHERE user_id = 1;
UPDATE account SET balance = 200 WHERE user_id = 2;
COMMIT;
六、监控与诊断
6.1 慢查询日志
# my.cnf 配置
[mysqld]
# 开启慢查询日志
slow_query_log = ON
# 慢查询日志文件路径
slow_query_log_file = /var/log/mysql/slow.log
# 慢查询阈值(秒)
long_query_time = 1
# 记录未使用索引的查询
log_queries_not_using_indexes = ON
# 记录管理语句
log_slow_admin_statements = ON
# 日志输出格式
log_output = FILE
分析慢查询日志:
# 使用mysqldumpslow分析
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
# 使用pt-query-digest分析(更详细)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
# 实时监控慢查询
tail -f /var/log/mysql/slow.log | grep -v "Query_time: 0"
6.2 性能监控脚本
import pymysql
import time
import psutil
class MySQLMonitor:
def __init__(self, host, user, password):
self.conn = pymysql.connect(
host=host, user=user, password=password,
cursorclass=pymysql.cursors.DictCursor
)
def get_connection_stats(self):
"""获取连接统计"""
with self.conn.cursor() as cursor:
cursor.execute("SHOW STATUS LIKE 'Threads%'")
return {row['Variable_name']: row['Value'] for row in cursor.fetchall()}
def get_innodb_status(self):
"""获取InnoDB状态"""
with self.conn.cursor() as cursor:
cursor.execute("SHOW ENGINE INNODB STATUS")
return cursor.fetchone()['Status']
def get_slow_queries(self, seconds=1):
"""获取慢查询"""
with self.conn.cursor() as cursor:
cursor.execute("""
SELECT * FROM mysql.slow_log
WHERE start_time > NOW() - INTERVAL 60 SECOND
AND query_time > %s
ORDER BY query_time DESC
LIMIT 10
""", (seconds,))
return cursor.fetchall()
def get_table_stats(self, db_name):
"""获取表统计信息"""
with self.conn.cursor() as cursor:
cursor.execute("""
SELECT
table_name,
table_rows,
data_length,
index_length,
ROUND((data_length + index_length) / 1024 / 1024, 2) AS total_mb
FROM information_schema.tables
WHERE table_schema = %s
ORDER BY data_length DESC
""", (db_name,))
return cursor.fetchall()
def monitor_realtime(self, interval=5):
"""实时监控"""
while True:
# 系统资源
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
# MySQL状态
stats = self.get_connection_stats()
print(f"\n=== {time.strftime('%Y-%m-%d %H:%M:%S')} ===")
print(f"CPU: {cpu_percent}%")
print(f"Memory: {memory.percent}%")
print(f"MySQL Connections: {stats.get('Threads_connected', 0)}/{stats.get('Max_connections', 0)}")
print(f"Active Queries: {stats.get('Threads_running', 0)}")
time.sleep(interval)
# 使用示例
monitor = MySQLMonitor('localhost', 'root', 'password')
monitor.monitor_realtime()
6.3 性能模式(Performance Schema)
MySQL 5.6+ 提供了强大的性能监控功能。
-- 启用性能模式(默认开启)
SHOW VARIABLES LIKE 'performance_schema';
-- 查看最耗时的SQL
SELECT
DIGEST_TEXT,
COUNT_STAR,
AVG_TIMER_WAIT / 1000000000000 AS avg_time_sec,
SUM_ROWS_EXAMINED
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;
-- 查看表I/O统计
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
COUNT_READ,
COUNT_WRITE,
SUM_NUMBER_OF_BYTES_READ / 1024 / 1024 AS read_mb,
SUM_NUMBER_OF_BYTES_WRITE / 1024 / 1024 AS write_mb
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_NUMBER_OF_BYTES_READ + SUM_NUMBER_OF_BYTES_WRITE DESC
LIMIT 10;
-- 查看索引使用情况
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
INDEX_NAME,
COUNT_FETCH,
COUNT_INSERT,
COUNT_UPDATE,
COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
ORDER BY COUNT_FETCH DESC
LIMIT 10;
七、高并发场景实战案例
7.1 秒杀系统优化
场景:10000 QPS的秒杀活动,库存只有100件。
架构设计:
- 前端限流:按钮置灰、验证码
- 服务端限流:令牌桶算法
- 库存预扣:Redis预减库存
- 异步下单:消息队列削峰
import redis
import time
import json
from threading import Lock
class SeckillService:
def __init__(self, redis_client, db):
self.redis = redis_client
self.db = db
self.lock = Lock()
def seckill(self, user_id, item_id):
"""秒杀核心逻辑"""
stock_key = f"seckill:stock:{item_id}"
user_key = f"seckill:user:{item_id}:{user_id}"
# 1. 检查是否已购买
if self.redis.exists(user_key):
return {"success": False, "message": "您已参与过秒杀"}
# 2. 预减库存(原子操作)
stock = self.redis.decr(stock_key)
if stock < 0:
self.redis.incr(stock_key) # 恢复库存
return {"success": False, "message": "库存不足"}
# 3. 标记用户已购买
self.redis.setex(user_key, 3600, "1")
# 4. 发送消息到队列(异步创建订单)
message = {
"user_id": user_id,
"item_id": item_id,
"timestamp": time.time()
}
self.redis.lpush("seckill:order:queue", json.dumps(message))
return {"success": True, "message": "秒杀成功,订单处理中"}
def process_order_queue(self):
"""后台处理订单队列"""
while True:
# 从队列获取消息
message = self.redis.brpop("seckill:order:queue", timeout=5)
if not message:
continue
data = json.loads(message[1])
user_id = data['user_id']
item_id = data['item_id']
try:
# 扣减真实库存(数据库)
sql = "UPDATE items SET stock = stock - 1 WHERE item_id = %s AND stock > 0"
result = self.db.execute_write(sql, (item_id,))
if result > 0:
# 创建订单
order_id = self.create_order(user_id, item_id)
print(f"订单创建成功: {order_id}")
else:
# 库存不足,回滚Redis
self.redis.incr(f"seckill:stock:{item_id}")
except Exception as e:
print(f"订单处理失败: {e}")
# 异常回滚
self.redis.incr(f"seckill:stock:{item_id}")
def create_order(self, user_id, item_id):
"""创建订单"""
sql = """
INSERT INTO orders (order_id, user_id, item_id, status, create_time)
VALUES (UUID(), %s, %s, 'pending', NOW())
"""
self.db.execute_write(sql, (user_id, item_id))
return "ORDER_" + str(int(time.time()))
7.2 朋友圈动态查询优化
场景:用户查看朋友圈,按时间倒序,支持分页。
优化方案:
- 时间线分表(按用户ID分片)
- 热点数据缓存
- 延迟加载大字段
-- 朋友圈动态表(分表)
CREATE TABLE moments_0 (
id BIGINT PRIMARY KEY,
user_id BIGINT,
content TEXT,
images JSON,
create_time DATETIME,
INDEX idx_user_time (user_id, create_time DESC)
) ENGINE=InnoDB;
-- 用户关系表(缓存关注列表)
CREATE TABLE user_follows (
user_id BIGINT,
follow_id BIGINT,
create_time DATETIME,
PRIMARY KEY (user_id, follow_id),
INDEX idx_follow (follow_id)
) ENGINE=InnoDB;
-- 查询优化:只查ID,再延迟加载详情
SELECT m.id
FROM moments_0 m
WHERE m.user_id IN (
SELECT follow_id FROM user_follows WHERE user_id = 123
)
AND m.create_time >= '2024-01-01'
ORDER BY m.create_time DESC
LIMIT 0, 20;
-- 然后根据ID批量查询详情
SELECT * FROM moments_0 WHERE id IN (1,2,3,4,5...);
八、总结与最佳实践
8.1 高并发优化检查清单
架构层:
- [ ] 是否采用读写分离?
- [ ] 数据量是否超过1000万?考虑分库分表
- [ ] 是否使用数据库中间件简化管理?
配置层:
- [ ] max_connections 是否足够?
- [ ] innodb_buffer_pool_size 是否合理?
- [ ] 慢查询日志是否开启?
- [ ] 是否启用查询缓存(MySQL 8.0已移除)?
SQL层:
- [ ] 所有查询都有合适的索引?
- [ ] 避免SELECT *,只查询需要的字段
- [ ] 复杂查询是否可以拆分或冗余字段?
- [ ] 分页查询是否使用延迟关联?
缓存层:
- [ ] 热点数据是否缓存?
- [ ] 是否有缓存穿透、击穿、雪崩防护?
- [ ] 缓存更新策略是否合理?
监控层:
- [ ] 慢查询监控是否到位?
- [ ] 是否有实时性能监控?
- [ ] 是否定期分析慢查询日志?
8.2 性能优化黄金法则
- 二八定律:80%的性能问题由20%的SQL导致,优先优化这些SQL
- 空间换时间:适当冗余字段,使用缓存
- 异步化:非核心逻辑异步处理,减少响应时间
- 限流降级:高并发时保护数据库,避免雪崩
- 持续监控:性能优化是持续过程,需要实时监控和调整
8.3 常见误区
- ❌ 盲目增加硬件资源(应先优化SQL和架构)
- ❌ 过度索引(影响写性能)
- ❌ 忽略事务设计(导致锁竞争)
- ❌ 缓存与数据库不一致(需要合理的更新策略)
- ❌ 忽略监控(无法及时发现问题)
通过以上策略的综合运用,可以有效提升MySQL在高并发场景下的性能表现,支撑海量请求的挑战。记住,性能优化是一个系统工程,需要从架构、配置、SQL、缓存、监控等多个维度综合考虑,持续迭代优化。# MySQL高并发处理策略:如何优化数据库性能应对海量请求与挑战
引言:理解高并发场景下的数据库挑战
在现代互联网应用中,高并发场景已经成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易高峰,数据库都面临着前所未有的压力。MySQL作为最流行的关系型数据库之一,如何在高并发环境下保持稳定和高性能,是每个开发者和DBA必须掌握的核心技能。
高并发对数据库的挑战主要体现在以下几个方面:
- 连接数激增:大量用户同时访问,导致数据库连接资源耗尽
- CPU负载过高:复杂查询和频繁的计算操作使CPU不堪重负
- I/O瓶颈:磁盘读写速度跟不上内存中的数据处理速度
- 锁竞争:并发事务导致锁等待和死锁,严重影响吞吐量
- 内存不足:缓冲池无法容纳热点数据,导致频繁的磁盘I/O
本文将从多个维度深入探讨MySQL高并发优化的策略,包括架构设计、配置调优、SQL优化、缓存策略等,并提供详细的配置示例和代码实现。
一、架构层面的优化策略
1.1 读写分离架构
读写分离是应对高并发查询的最有效策略之一。通过将读操作和写操作分离到不同的数据库实例,可以显著减轻主库的压力。
实现原理:
- 主库(Master)负责所有的写操作(INSERT、UPDATE、DELETE)
- 从库(Slave)负责所有的读操作(SELECT)
- 应用层通过中间件或ORM框架自动路由请求
MySQL原生主从复制配置示例:
-- 在主库上配置
-- 编辑 my.cnf 或 my.ini
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
-- 创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
-- 查看主库状态
SHOW MASTER STATUS;
-- 记录 File 和 Position 值
-- 在从库上配置
[mysqld]
server-id = 2
relay-log = mysql-relay-bin
read-only = 1
-- 配置从库连接主库
CHANGE MASTER TO
MASTER_HOST='主库IP',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=1234;
-- 启动从库复制
START SLAVE;
SHOW SLAVE STATUS\G
应用层读写分离实现(Python示例):
import pymysql
from pymysql.cursors import DictCursor
class ReadWriteSplittingDB:
def __init__(self):
# 主库配置(写操作)
self.master_config = {
'host': 'master_host',
'user': 'root',
'password': 'password',
'database': 'myapp',
'charset': 'utf8mb4'
}
# 从库配置(读操作)
self.slave_config = {
'host': 'slave_host',
'user': 'root',
'password': 'password',
'database': 'myapp',
'charset': 'utf8mb4'
}
def get_master_connection(self):
"""获取主库连接"""
return pymysql.connect(**self.master_config)
def get_slave_connection(self):
"""获取从库连接"""
return pymysql.connect(**self.slave_config)
def execute_write(self, sql, params=None):
"""执行写操作"""
conn = self.get_master_connection()
try:
with conn.cursor(DictCursor) as cursor:
cursor.execute(sql, params)
conn.commit()
return cursor.rowcount
finally:
conn.close()
def execute_read(self, sql, params=None):
"""执行读操作"""
conn = self.get_slave_connection()
try:
with conn.cursor(DictCursor) as cursor:
cursor.execute(sql, params)
return cursor.fetchall()
finally:
conn.close()
# 使用示例
db = ReadWriteSplittingDB()
# 写操作走主库
db.execute_write("INSERT INTO users (name, email) VALUES (%s, %s)", ("张三", "zhangsan@example.com"))
# 读操作走从库
results = db.execute_read("SELECT * FROM users WHERE id = %s", (1,))
1.2 分库分表策略
当单表数据量超过千万级别时,查询性能会急剧下降。分库分表是解决这一问题的根本方案。
水平分表(Sharding): 将大表按某种规则拆分成多个小表,例如按用户ID取模或按时间范围拆分。
-- 原始订单表
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
amount DECIMAL(10,2),
create_time DATETIME,
INDEX idx_user_id (user_id)
) ENGINE=InnoDB;
-- 水平分表:按user_id取模分4张表
CREATE TABLE orders_0 (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
amount DECIMAL(10,2),
create_time DATETIME,
INDEX idx_user_id (user_id)
) ENGINE=InnoDB;
CREATE TABLE orders_1 (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
amount DECIMAL(10,2),
create_time DATETIME,
INDEX idx_user_id (user_id)
) ENGINE=InnoDB;
-- ... orders_2, orders_3
-- 分表路由函数(应用层实现)
def get_table_name(user_id):
"""根据user_id计算表名"""
table_index = user_id % 4
return f"orders_{table_index}"
def insert_order(user_id, amount):
"""插入订单到分表"""
table_name = get_table_name(user_id)
sql = f"INSERT INTO {table_name} (order_id, user_id, amount, create_time) VALUES (%s, %s, %s, NOW())"
# 执行SQL...
垂直分表: 将大表中不常用的字段拆分到扩展表中。
-- 主表:核心字段
CREATE TABLE user_core (
user_id BIGINT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
status TINYINT,
create_time DATETIME
) ENGINE=InnoDB;
-- 扩展表:详细信息
CREATE TABLE user_profile (
user_id BIGINT PRIMARY KEY,
real_name VARCHAR(50),
avatar TEXT,
bio TEXT,
last_login DATETIME,
FOREIGN KEY (user_id) REFERENCES user_core(user_id)
) ENGINE=InnoDB;
1.3 数据库中间件
使用数据库中间件可以简化分库分表和读写分离的实现。
ShardingSphere-JDBC配置示例:
# application.yml
spring:
shardingsphere:
datasource:
names: ds0, ds1
ds0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3306/db0
username: root
password: password
ds1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3306/db1
username: root
password: password
rules:
sharding:
tables:
orders:
actual-data-nodes: ds$->{0..1}.orders_$->{0..3}
table-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: orders-table-inline
database-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: orders-db-inline
sharding-algorithms:
orders-table-inline:
type: CLASS_BASED
props:
strategy: standard
algorithmClassName: com.example.ModShardingAlgorithm
orders-db-inline:
type: CLASS_BASED
props:
strategy: standard
algorithmClassName: com.example.ModShardingAlgorithm
二、MySQL配置参数调优
2.1 连接数优化
高并发下首先要调整的最大连接数,避免连接耗尽。
# my.cnf 配置
[mysqld]
# 最大连接数,根据服务器内存调整,一般设置为500-1000
max_connections = 800
# 每个用户最大连接数
max_user_connections = 200
# 连接超时时间(秒)
wait_timeout = 600
interactive_timeout = 600
# 连接队列大小
back_log = 500
# 最大错误连接尝试次数
max_connect_errors = 100000
动态调整(无需重启):
-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
-- 查看最大连接数
SHOW VARIABLES LIKE 'max_connections';
-- 动态调整最大连接数(立即生效)
SET GLOBAL max_connections = 1000;
-- 查看连接拒绝统计
SHOW STATUS LIKE 'Aborted_connects';
2.2 缓冲池优化
InnoDB缓冲池是MySQL性能的关键,决定了数据在内存中的缓存能力。
# my.cnf 配置
[mysqld]
# 缓冲池大小,通常设置为物理内存的50%-70%
innodb_buffer_pool_size = 8G
# 缓冲池实例数,多核CPU建议设置为4-8
innodb_buffer_pool_instances = 8
# 缓冲池预热,重启时恢复缓冲池状态
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_at_shutdown = ON
# 页大小,对于大表建议设置为64K
innodb_page_size = 16K
# 旧数据淘汰策略
innodb_old_blocks_time = 1000
innodb_old_blocks_pct = 37
监控缓冲池命中率:
-- 缓冲池读写统计
SHOW STATUS LIKE 'Innodb_buffer_pool_read%';
-- 计算命中率(应>99%)
-- 命中率 = (1 - Innodb_buffer_pool_reads / Innodb_buffer_pool_read_requests) * 100
-- 查看缓冲池使用情况
SHOW ENGINE INNODB STATUS\G
-- 查看 BUFFER POOL AND MEMORY 部分
2.3 日志文件优化
日志文件大小和写入策略直接影响事务性能和数据安全性。
# my.cnf 配置
[mysqld]
# 重做日志文件大小,建议1-2G,太大恢复时间长,太小频繁切换
innodb_log_file_size = 2G
# 重做日志文件数量,至少2个
innodb_log_files_in_group = 3
# 日志缓冲区大小,建议8-64M
innodb_log_buffer_size = 64M
# 刷新策略
innodb_flush_log_at_trx_commit = 1 # 1: 每次提交都刷盘(最安全)
# 2: 每秒刷盘(性能好,可能丢失1秒数据)
# 0: 每checkpoint刷盘(性能最好,风险最大)
# 刷新方法
innodb_flush_method = O_DIRECT # 绕过OS缓存,直接写入磁盘
# 刷盘线程数
innodb_flush_neighbors = 1
innodb_flush_sync = OFF
2.4 事务和锁优化
# my.cnf 配置
[mysqld]
# 事务隔离级别,默认REPEATABLE-READ,高并发读多场景可考虑READ-COMMITTED
transaction-isolation = READ-COMMITTED
# 锁等待超时(秒)
innodb_lock_wait_timeout = 50
# 自增锁模式,高并发插入时建议设置为2(交错模式)
innodb_autoinc_lock_mode = 2
# 死锁检测,高并发下可关闭死锁检测,依靠超时机制
innodb_deadlock_detect = ON
# 间隙锁,高并发写入时建议关闭
innodb_locks_unsafe_for_binlog = OFF
三、SQL语句优化
3.1 索引优化策略
索引是提升查询性能最直接有效的方法。
索引设计原则:
- 选择性高的列适合建索引(如ID、邮箱)
- 避免在低选择性列上建索引(如性别、状态)
- 复合索引遵循最左前缀原则
- 避免过多索引(影响写性能)
-- 示例:用户表索引优化
CREATE TABLE users (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
status TINYINT DEFAULT 1,
create_time DATETIME,
region VARCHAR(50),
age INT,
INDEX idx_username (username),
INDEX idx_email (email),
INDEX idx_status_create_time (status, create_time), -- 复合索引
INDEX idx_region_age (region, age) -- 复合索引
) ENGINE=InnoDB;
-- 优化前:全表扫描
EXPLAIN SELECT * FROM users WHERE status = 1 AND create_time > '2024-01-01';
-- type: ALL, rows: 1000000
-- 优化后:使用索引
EXPLAIN SELECT * FROM users WHERE status = 1 AND create_time > '2024-01-01';
-- type: range, rows: 50000
索引使用技巧:
-- 1. 覆盖索引:查询列都在索引中,避免回表
EXPLAIN SELECT id, username FROM users WHERE username LIKE 'zhang%';
-- 使用 idx_username 索引,Extra: Using index
-- 2. 索引下推:MySQL 5.6+ 特性
EXPLAIN SELECT * FROM users WHERE username LIKE 'zhang%' AND status = 1;
-- 索引过滤后,再回表过滤status
-- 3. 索引合并:MySQL 5.0+ 特性
EXPLAIN SELECT * FROM users WHERE username = 'zhangsan' OR email = 'zhangsan@example.com';
-- 同时使用 idx_username 和 idx_email,然后合并结果
3.2 避免索引失效的常见场景
-- ❌ 错误示例:索引失效
-- 1. 在索引列上使用函数
SELECT * FROM users WHERE DATE(create_time) = '2024-01-01';
-- 优化:改为范围查询
SELECT * FROM users WHERE create_time >= '2024-01-01' AND create_time < '2024-01-02';
-- 2. 隐式类型转换
SELECT * FROM users WHERE phone = 13800138000; -- phone是VARCHAR类型
-- 优化:保持类型一致
SELECT * FROM users WHERE phone = '13800138000';
-- 3. 前导模糊查询
SELECT * FROM users WHERE username LIKE '%zhang';
-- 无法使用索引,优化:只使用后缀模糊查询或全文索引
SELECT * FROM users WHERE username LIKE 'zhang%';
-- 4. OR连接不同列(部分版本)
SELECT * FROM users WHERE username = 'zhangsan' OR age = 25;
-- 优化:使用UNION或IN
SELECT * FROM users WHERE username = 'zhangsan'
UNION
SELECT * FROM users WHERE age = 25;
-- 5. 负向查询
SELECT * FROM users WHERE status != 1;
SELECT * FROM users WHERE username NOT IN ('zhangsan', 'lisi');
-- 优化:改为正向查询
SELECT * FROM users WHERE status IN (0, 2, 3);
3.3 分页优化
高并发下的分页查询容易出现性能问题,特别是深度分页。
-- ❌ 低效分页:OFFSET越大越慢
SELECT * FROM orders WHERE user_id = 123 ORDER BY id LIMIT 1000000, 20;
-- ✅ 优化方案1:延迟关联(子查询只查主键)
SELECT o.*
FROM orders o
JOIN (
SELECT id
FROM orders
WHERE user_id = 123
ORDER BY id
LIMIT 1000000, 20
) t ON o.id = t.id;
-- ✅ 优化方案2:记录上次最大ID(适用于滚动加载)
SELECT * FROM orders
WHERE user_id = 123 AND id > 1000000
ORDER BY id
LIMIT 20;
-- ✅ 优化方案3:使用Elasticsearch等搜索引擎
-- 对于超大数据量分页,建议使用ES的search_after
3.4 批量操作优化
高并发下,频繁的单条插入会导致性能瓶颈。
-- ❌ 低效:单条插入循环
INSERT INTO orders (user_id, amount) VALUES (1, 100.00);
INSERT INTO orders (user_id, amount) VALUES (2, 200.00);
-- ... 1000次
-- ✅ 高效:批量插入
INSERT INTO orders (user_id, amount) VALUES
(1, 100.00),
(2, 200.00),
(3, 300.00),
-- ... 1000条
(1000, 100000.00);
-- ✅ 优化:使用LOAD DATA INFILE(万级数据)
LOAD DATA LOCAL INFILE '/tmp/orders.csv'
INTO TABLE orders
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(user_id, amount);
-- ✅ 优化:使用INSERT DELAYED(已废弃,建议用ON DUPLICATE KEY UPDATE)
INSERT INTO orders (user_id, amount) VALUES (1, 100.00)
ON DUPLICATE KEY UPDATE amount = VALUES(amount);
3.5 复杂查询优化
-- 示例:电商订单统计查询优化
-- ❌ 原始查询:多表JOIN,性能差
SELECT
o.order_id,
u.username,
p.product_name,
o.amount,
o.create_time
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.id
WHERE o.status = 1
AND o.create_time >= '2024-01-01'
AND u.region = '北京'
ORDER BY o.amount DESC
LIMIT 0, 20;
-- ✅ 优化方案:冗余字段 + 覆盖索引
-- 1. 在orders表冗余username和region字段
ALTER TABLE orders ADD COLUMN username VARCHAR(50);
ALTER TABLE orders ADD COLUMN region VARCHAR(50);
-- 2. 创建复合索引
CREATE INDEX idx_status_region_time_amount ON orders (status, region, create_time, amount DESC);
-- 3. 优化后的查询(避免JOIN)
SELECT
order_id,
username,
amount,
create_time
FROM orders
WHERE status = 1
AND region = '北京'
AND create_time >= '2024-01-01'
ORDER BY amount DESC
LIMIT 0, 20;
-- ✅ 优化方案2:物化视图(MySQL 8.0+)
CREATE MATERIALIZED VIEW mv_order_summary AS
SELECT
o.order_id,
u.username,
p.product_name,
o.amount,
o.create_time
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.id
WHERE o.status = 1;
-- 查询物化视图
SELECT * FROM mv_order_summary WHERE create_time >= '2024-01-01';
四、缓存策略
4.1 应用层缓存
使用Redis等内存数据库作为缓存层,减少数据库访问。
import redis
import json
from functools import wraps
# Redis连接配置
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
def cache_query(ttl=300):
"""查询结果缓存装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存key
key = f"cache:{func.__name__}:{str(args)}:{str(kwargs)}"
# 尝试从缓存获取
cached = redis_client.get(key)
if cached:
return json.loads(cached)
# 执行原函数
result = func(*args, **kwargs)
# 写入缓存
redis_client.setex(key, ttl, json.dumps(result))
return result
return wrapper
return decorator
# 使用示例
@cache_query(ttl=60)
def get_user_info(user_id):
"""从数据库获取用户信息"""
db = ReadWriteSplittingDB()
return db.execute_read("SELECT * FROM users WHERE id = %s", (user_id,))
# 高并发下,90%的请求会命中缓存
4.2 缓存更新策略
class CacheManager:
def __init__(self, redis_client, db):
self.redis = redis_client
self.db = db
def get_user_with_cache(self, user_id):
"""获取用户信息(带缓存)"""
cache_key = f"user:{user_id}"
# 1. 先从缓存获取
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# 2. 缓存未命中,查询数据库
user = self.db.execute_read("SELECT * FROM users WHERE id = %s", (user_id,))
# 3. 写入缓存
if user:
self.redis.setex(cache_key, 300, json.dumps(user))
return user
def update_user(self, user_id, data):
"""更新用户信息(同步缓存)"""
# 1. 更新数据库
sql = "UPDATE users SET name = %s, email = %s WHERE id = %s"
self.db.execute_write(sql, (data['name'], data['email'], user_id))
# 2. 删除缓存(下次查询时自动更新)
cache_key = f"user:{user_id}"
self.redis.delete(cache_key)
# 3. 可选:立即更新缓存
# self.redis.setex(cache_key, 300, json.dumps(data))
def delete_user(self, user_id):
"""删除用户(同步缓存)"""
# 1. 删除数据库
self.db.execute_write("DELETE FROM users WHERE id = %s", (user_id,))
# 2. 删除缓存
cache_key = f"user:{user_id}"
self.redis.delete(cache_key)
4.3 缓存穿透、击穿、雪崩防护
class CacheProtection:
def __init__(self, redis_client):
self.redis = redis_client
def get_with_cache穿透防护(self, key, fallback_func, ttl=300):
"""防止缓存穿透:查询不存在的数据"""
value = self.redis.get(key)
if value:
return json.loads(value) if value != "null" else None
# 查询数据库
result = fallback_func()
# 缓存空值(防止缓存穿透)
if result is None:
self.redis.setex(key, 60, "null") # 短过期时间
return None
self.redis.setex(key, ttl, json.dumps(result))
return result
def get_with_cache击穿防护(self, key, fallback_func, ttl=300):
"""防止缓存击穿:热点key过期时的并发查询"""
# 使用分布式锁
lock_key = f"lock:{key}"
lock = self.redis.set(lock_key, "1", nx=True, ex=10)
if lock:
try:
# 获取数据
result = fallback_func()
self.redis.setex(key, ttl, json.dumps(result))
return result
finally:
self.redis.delete(lock_key)
else:
# 等待并重试
import time
time.sleep(0.1)
return self.get_with_cache击穿防护(key, fallback_func, ttl)
def get_with_cache雪崩防护(self, key, fallback_func, ttl=300):
"""防止缓存雪崩:随机过期时间"""
# 随机过期时间(基础ttl ± 60秒)
import random
actual_ttl = ttl + random.randint(-60, 60)
result = fallback_func()
if result:
self.redis.setex(key, actual_ttl, json.dumps(result))
return result
五、高并发下的事务优化
5.1 事务设计原则
-- ❌ 错误:长事务(持有锁时间过长)
BEGIN;
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
-- 等待用户输入...
UPDATE account SET balance = balance + 100 WHERE user_id = 2;
COMMIT;
-- ✅ 正确:短事务
BEGIN;
-- 先计算好所有值
SET @new_balance_1 = (SELECT balance FROM account WHERE user_id = 1) - 100;
SET @new_balance_2 = (SELECT balance FROM account WHERE user_id = 2) + 100;
-- 然后快速执行
UPDATE account SET balance = @new_balance_1 WHERE user_id = 1;
UPDATE account SET balance = @new_balance_2 WHERE user_id = 2;
COMMIT;
5.2 乐观锁 vs 悲观锁
-- 悲观锁:SELECT ... FOR UPDATE(适合写冲突多的场景)
BEGIN;
-- 锁定记录,其他事务无法修改
SELECT balance FROM account WHERE user_id = 1 FOR UPDATE;
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
COMMIT;
-- 乐观锁:使用版本号(适合读多写少的场景)
ALTER TABLE account ADD COLUMN version INT DEFAULT 0;
-- 更新时检查版本号
UPDATE account
SET balance = balance - 100, version = version + 1
WHERE user_id = 1 AND version = 0; -- 版本号匹配才更新
-- 检查影响行数
-- 如果影响行数为0,说明版本号不匹配,需要重试
5.3 死锁避免策略
-- 死锁示例:两个事务交叉锁定
-- 事务1: UPDATE account SET balance = 100 WHERE user_id = 1;
-- 事务2: UPDATE account SET balance = 200 WHERE user_id = 2;
-- 事务1: UPDATE account SET balance = 200 WHERE user_id = 2; -- 等待事务2
-- 事务2: UPDATE account SET balance = 100 WHERE user_id = 1; -- 等待事务1(死锁)
-- ✅ 避免策略:固定加锁顺序
-- 事务1和事务2都按user_id排序后加锁
BEGIN;
-- 先锁定user_id=1,再锁定user_id=2
UPDATE account SET balance = 100 WHERE user_id = 1;
UPDATE account SET balance = 200 WHERE user_id = 2;
COMMIT;
六、监控与诊断
6.1 慢查询日志
# my.cnf 配置
[mysqld]
# 开启慢查询日志
slow_query_log = ON
# 慢查询日志文件路径
slow_query_log_file = /var/log/mysql/slow.log
# 慢查询阈值(秒)
long_query_time = 1
# 记录未使用索引的查询
log_queries_not_using_indexes = ON
# 记录管理语句
log_slow_admin_statements = ON
# 日志输出格式
log_output = FILE
分析慢查询日志:
# 使用mysqldumpslow分析
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
# 使用pt-query-digest分析(更详细)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
# 实时监控慢查询
tail -f /var/log/mysql/slow.log | grep -v "Query_time: 0"
6.2 性能监控脚本
import pymysql
import time
import psutil
class MySQLMonitor:
def __init__(self, host, user, password):
self.conn = pymysql.connect(
host=host, user=user, password=password,
cursorclass=pymysql.cursors.DictCursor
)
def get_connection_stats(self):
"""获取连接统计"""
with self.conn.cursor() as cursor:
cursor.execute("SHOW STATUS LIKE 'Threads%'")
return {row['Variable_name']: row['Value'] for row in cursor.fetchall()}
def get_innodb_status(self):
"""获取InnoDB状态"""
with self.conn.cursor() as cursor:
cursor.execute("SHOW ENGINE INNODB STATUS")
return cursor.fetchone()['Status']
def get_slow_queries(self, seconds=1):
"""获取慢查询"""
with self.conn.cursor() as cursor:
cursor.execute("""
SELECT * FROM mysql.slow_log
WHERE start_time > NOW() - INTERVAL 60 SECOND
AND query_time > %s
ORDER BY query_time DESC
LIMIT 10
""", (seconds,))
return cursor.fetchall()
def get_table_stats(self, db_name):
"""获取表统计信息"""
with self.conn.cursor() as cursor:
cursor.execute("""
SELECT
table_name,
table_rows,
data_length,
index_length,
ROUND((data_length + index_length) / 1024 / 1024, 2) AS total_mb
FROM information_schema.tables
WHERE table_schema = %s
ORDER BY data_length DESC
""", (db_name,))
return cursor.fetchall()
def monitor_realtime(self, interval=5):
"""实时监控"""
while True:
# 系统资源
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
# MySQL状态
stats = self.get_connection_stats()
print(f"\n=== {time.strftime('%Y-%m-%d %H:%M:%S')} ===")
print(f"CPU: {cpu_percent}%")
print(f"Memory: {memory.percent}%")
print(f"MySQL Connections: {stats.get('Threads_connected', 0)}/{stats.get('Max_connections', 0)}")
print(f"Active Queries: {stats.get('Threads_running', 0)}")
time.sleep(interval)
# 使用示例
monitor = MySQLMonitor('localhost', 'root', 'password')
monitor.monitor_realtime()
6.3 性能模式(Performance Schema)
MySQL 5.6+ 提供了强大的性能监控功能。
-- 启用性能模式(默认开启)
SHOW VARIABLES LIKE 'performance_schema';
-- 查看最耗时的SQL
SELECT
DIGEST_TEXT,
COUNT_STAR,
AVG_TIMER_WAIT / 1000000000000 AS avg_time_sec,
SUM_ROWS_EXAMINED
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;
-- 查看表I/O统计
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
COUNT_READ,
COUNT_WRITE,
SUM_NUMBER_OF_BYTES_READ / 1024 / 1024 AS read_mb,
SUM_NUMBER_OF_BYTES_WRITE / 1024 / 1024 AS write_mb
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_NUMBER_OF_BYTES_READ + SUM_NUMBER_OF_BYTES_WRITE DESC
LIMIT 10;
-- 查看索引使用情况
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
INDEX_NAME,
COUNT_FETCH,
COUNT_INSERT,
COUNT_UPDATE,
COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
ORDER BY COUNT_FETCH DESC
LIMIT 10;
七、高并发场景实战案例
7.1 秒杀系统优化
场景:10000 QPS的秒杀活动,库存只有100件。
架构设计:
- 前端限流:按钮置灰、验证码
- 服务端限流:令牌桶算法
- 库存预扣:Redis预减库存
- 异步下单:消息队列削峰
import redis
import time
import json
from threading import Lock
class SeckillService:
def __init__(self, redis_client, db):
self.redis = redis_client
self.db = db
self.lock = Lock()
def seckill(self, user_id, item_id):
"""秒杀核心逻辑"""
stock_key = f"seckill:stock:{item_id}"
user_key = f"seckill:user:{item_id}:{user_id}"
# 1. 检查是否已购买
if self.redis.exists(user_key):
return {"success": False, "message": "您已参与过秒杀"}
# 2. 预减库存(原子操作)
stock = self.redis.decr(stock_key)
if stock < 0:
self.redis.incr(stock_key) # 恢复库存
return {"success": False, "message": "库存不足"}
# 3. 标记用户已购买
self.redis.setex(user_key, 3600, "1")
# 4. 发送消息到队列(异步创建订单)
message = {
"user_id": user_id,
"item_id": item_id,
"timestamp": time.time()
}
self.redis.lpush("seckill:order:queue", json.dumps(message))
return {"success": True, "message": "秒杀成功,订单处理中"}
def process_order_queue(self):
"""后台处理订单队列"""
while True:
# 从队列获取消息
message = self.redis.brpop("seckill:order:queue", timeout=5)
if not message:
continue
data = json.loads(message[1])
user_id = data['user_id']
item_id = data['item_id']
try:
# 扣减真实库存(数据库)
sql = "UPDATE items SET stock = stock - 1 WHERE item_id = %s AND stock > 0"
result = self.db.execute_write(sql, (item_id,))
if result > 0:
# 创建订单
order_id = self.create_order(user_id, item_id)
print(f"订单创建成功: {order_id}")
else:
# 库存不足,回滚Redis
self.redis.incr(f"seckill:stock:{item_id}")
except Exception as e:
print(f"订单处理失败: {e}")
# 异常回滚
self.redis.incr(f"seckill:stock:{item_id}")
def create_order(self, user_id, item_id):
"""创建订单"""
sql = """
INSERT INTO orders (order_id, user_id, item_id, status, create_time)
VALUES (UUID(), %s, %s, 'pending', NOW())
"""
self.db.execute_write(sql, (user_id, item_id))
return "ORDER_" + str(int(time.time()))
7.2 朋友圈动态查询优化
场景:用户查看朋友圈,按时间倒序,支持分页。
优化方案:
- 时间线分表(按用户ID分片)
- 热点数据缓存
- 延迟加载大字段
-- 朋友圈动态表(分表)
CREATE TABLE moments_0 (
id BIGINT PRIMARY KEY,
user_id BIGINT,
content TEXT,
images JSON,
create_time DATETIME,
INDEX idx_user_time (user_id, create_time DESC)
) ENGINE=InnoDB;
-- 用户关系表(缓存关注列表)
CREATE TABLE user_follows (
user_id BIGINT,
follow_id BIGINT,
create_time DATETIME,
PRIMARY KEY (user_id, follow_id),
INDEX idx_follow (follow_id)
) ENGINE=InnoDB;
-- 查询优化:只查ID,再延迟加载详情
SELECT m.id
FROM moments_0 m
WHERE m.user_id IN (
SELECT follow_id FROM user_follows WHERE user_id = 123
)
AND m.create_time >= '2024-01-01'
ORDER BY m.create_time DESC
LIMIT 0, 20;
-- 然后根据ID批量查询详情
SELECT * FROM moments_0 WHERE id IN (1,2,3,4,5...);
八、总结与最佳实践
8.1 高并发优化检查清单
架构层:
- [ ] 是否采用读写分离?
- [ ] 数据量是否超过1000万?考虑分库分表
- [ ] 是否使用数据库中间件简化管理?
配置层:
- [ ] max_connections 是否足够?
- [ ] innodb_buffer_pool_size 是否合理?
- [ ] 慢查询日志是否开启?
- [ ] 是否启用查询缓存(MySQL 8.0已移除)?
SQL层:
- [ ] 所有查询都有合适的索引?
- [ ] 避免SELECT *,只查询需要的字段
- [ ] 复杂查询是否可以拆分或冗余字段?
- [ ] 分页查询是否使用延迟关联?
缓存层:
- [ ] 热点数据是否缓存?
- [ ] 是否有缓存穿透、击穿、雪崩防护?
- [ ] 缓存更新策略是否合理?
监控层:
- [ ] 慢查询监控是否到位?
- [ ] 是否有实时性能监控?
- [ ] 是否定期分析慢查询日志?
8.2 性能优化黄金法则
- 二八定律:80%的性能问题由20%的SQL导致,优先优化这些SQL
- 空间换时间:适当冗余字段,使用缓存
- 异步化:非核心逻辑异步处理,减少响应时间
- 限流降级:高并发时保护数据库,避免雪崩
- 持续监控:性能优化是持续过程,需要实时监控和调整
8.3 常见误区
- ❌ 盲目增加硬件资源(应先优化SQL和架构)
- ❌ 过度索引(影响写性能)
- ❌ 忽略事务设计(导致锁竞争)
- ❌ 缓存与数据库不一致(需要合理的更新策略)
- ❌ 忽略监控(无法及时发现问题)
通过以上策略的综合运用,可以有效提升MySQL在高并发场景下的性能表现,支撑海量请求的挑战。记住,性能优化是一个系统工程,需要从架构、配置、SQL、缓存、监控等多个维度综合考虑,持续迭代优化。
