Introduction: Understanding the High-Concurrency Challenge
High-concurrency workloads are now the norm for Internet applications. During e-commerce sales events, viral social-media moments, or trading peaks, a system may face hundreds of thousands or even millions of requests per second. MySQL, one of the most widely used relational databases, is prone to connection exhaustion, heavy lock contention, and disk I/O bottlenecks under such load; in severe cases the database slows to a crawl or crashes outright.
Hardware upgrades alone rarely solve the problem at its root. What is needed is systematic optimization across architecture design, SQL, caching strategy, and database configuration. This article walks through practical strategies for handling high concurrency in MySQL and helps you build a stable, reliable database tier.
1. MySQL Bottleneck Analysis Under High Concurrency
1.1 Connection-Layer Bottlenecks
MySQL's connection capacity is bounded by the max_connections parameter, which defaults to 151. Under high concurrency, a flood of connection attempts can be rejected (ERROR 1040: Too many connections). Each connection also consumes memory, so an excessive connection count leads to memory pressure.
-- Current connection count
SHOW STATUS LIKE 'Threads_connected';
-- Configured maximum
SHOW VARIABLES LIKE 'max_connections';
-- Connections rejected because the limit was hit
SHOW STATUS LIKE 'Connection_errors_max_connections';
1.2 Lock Contention Bottlenecks
InnoDB uses row-level locking, but under heavy concurrent writes, contention on hot rows is still fierce. Typical hot spots include:
- Frequently updated counters (likes, view counts)
- Inventory deduction
- Order status transitions
Lock wait timeouts roll transactions back and badly hurt throughput. The query below shows how to see who is blocking whom in real time.
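On MySQL 5.7+ with the sys schema available, live lock waits can be inspected directly; a minimal sketch (the exact column list can vary slightly by version):
SELECT waiting_pid, waiting_query,
       blocking_pid, blocking_query,
       wait_age, locked_table
FROM sys.innodb_lock_waits;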
1.3 Disk I/O Bottlenecks
Massive random reads and writes put the disks under great pressure. Page reads that miss the buffer pool, together with frequent redo log flushes, drive I/O wait upward.
1.4 CPU Bottlenecks
Complex queries, heavy sorting, and temporary table usage all burn CPU. Under high concurrency, CPU can become the limiting factor; the digest query below is a quick way to find the most expensive statement patterns.
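One way to locate CPU-heavy statements is the performance_schema digest table (available since MySQL 5.6; timer values are in picoseconds, hence the division):
SELECT DIGEST_TEXT,
       COUNT_STAR,
       SUM_TIMER_WAIT / 1e12 AS total_exec_sec,
       SUM_ROWS_EXAMINED,
       SUM_SORT_ROWS,
       SUM_CREATED_TMP_DISK_TABLES
FROM performance_schema.events_statements_summary_by_digest
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;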
2. Connection-Layer Optimization
2.1 Connection Pool Tuning
The application layer must use a connection pool; creating and destroying connections per request is far too expensive. Suggested settings for the mainstream pools:
HikariCP configuration example (Java Spring Boot):
spring:
  datasource:
    hikari:
      # Pool name, useful for monitoring
      pool-name: HighConcurrencyDBPool
      # Minimum idle connections
      minimum-idle: 10
      # Maximum pool size; size it to the workload, and keep the sum across
      # all application instances below ~80% of max_connections
      maximum-pool-size: 100
      # Connection acquisition timeout (ms)
      connection-timeout: 30000
      # Maximum connection lifetime (ms)
      max-lifetime: 1800000
      # Idle connection timeout (ms)
      idle-timeout: 300000
      # Validation query (usually unnecessary with JDBC4-compliant drivers)
      connection-test-query: SELECT 1
Druid connection pool configuration example:
spring:
  datasource:
    druid:
      # Initial pool size
      initial-size: 10
      # Minimum idle connections
      min-idle: 10
      # Maximum active connections
      max-active: 100
      # Max wait for acquiring a connection (ms)
      max-wait: 60000
      # Interval between idle-connection eviction runs (ms)
      time-between-eviction-runs-millis: 60000
      # Minimum idle time before a connection may be evicted (ms)
      min-evictable-idle-time-millis: 300000
      # Validation query
      validation-query: SELECT 1
      # Validate on borrow (off: costs a round trip per acquisition)
      test-on-borrow: false
      # Validate on return
      test-on-return: false
      # Validate idle connections older than timeBetweenEvictionRunsMillis
      test-while-idle: true
2.2 MySQL Server Connection Parameter Tuning
-- Raise the connection limit (dynamic: takes effect immediately, but is
-- lost on restart unless also persisted in my.cnf)
SET GLOBAL max_connections = 500;
-- Inspect connection-related status
SHOW STATUS LIKE 'Threads%';
-- Threads_connected: current connections
-- Threads_running: connections actively executing a statement
-- Threads_cached: threads held in the thread cache
-- Connection errors
SHOW STATUS LIKE 'Connection_errors%';
Key configuration parameters:
# my.cnf or my.ini
[mysqld]
# Maximum connections; size to available memory - each connection can
# consume several MB once its session buffers are in use
max_connections = 500
# Thread cache, reduces thread creation overhead
thread_cache_size = 50
# Length of the queue for pending connection requests
back_log = 200
# Idle connection timeouts (seconds)
wait_timeout = 600
interactive_timeout = 600
2.3 Read/Write Splitting
Use primary-replica replication to split traffic: route reads to the replicas and take pressure off the primary.
// Spring Boot multi-datasource configuration example
@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    public DataSource routingDataSource() {
        DynamicDataSource routingDataSource = new DynamicDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        return routingDataSource;
    }
}
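The DynamicDataSource used above is not a Spring class. A minimal sketch, assuming Spring's AbstractRoutingDataSource with a thread-local routing key (the key names are illustrative):
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class DynamicDataSource extends AbstractRoutingDataSource {

    // Routing key ("master"/"slave") for the current thread
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    public static void useMaster() { CONTEXT.set("master"); }
    public static void useSlave()  { CONTEXT.set("slave"); }
    public static void clear()     { CONTEXT.remove(); }

    @Override
    protected Object determineCurrentLookupKey() {
        // null falls back to the default target (the master)
        return CONTEXT.get();
    }
}
In practice an AOP aspect typically calls useSlave() before read-only service methods and clear() afterwards.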
3. SQL Optimization
3.1 Index Optimization
Indexes are the key to query performance, but badly chosen indexes become performance killers.
Index design principles:
- Follow the leftmost-prefix rule for composite indexes
- Avoid too many indexes (each one slows writes)
- Index high-cardinality columns
- Use covering indexes to avoid lookups back to the clustered index
Case study: optimizing order-table queries
-- Original table structure
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
order_no VARCHAR(64) NOT NULL,
status TINYINT NOT NULL,
amount DECIMAL(10,2) NOT NULL,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
INDEX idx_user_id (user_id),
INDEX idx_status_create_time (status, create_time)
);
-- Before: SELECT * can use idx_status_create_time for the range scan, but every
-- matching row still requires a lookup back to the clustered index
SELECT * FROM orders WHERE status = 1 AND create_time > '2024-01-01';
-- After: select only the columns you need and add a selective predicate;
-- with user_id supplied, the optimizer can use idx_user_id instead
EXPLAIN SELECT order_no, amount
FROM orders
WHERE status = 1
AND create_time > '2024-01-01'
AND user_id = 12345; -- the user_id predicate makes the query far more selective
Index health checks:
-- List table indexes
SHOW INDEX FROM orders;
-- Slow query log settings
SHOW VARIABLES LIKE 'slow_query%';
-- Enable the slow query log (adds overhead; enable with care in production)
SET GLOBAL slow_query_log = ON;
SET GLOBAL long_query_time = 1; -- log queries slower than 1 second
3.2 Common Patterns That Defeat Indexes
-- 1. Do not wrap indexed columns in functions
-- Bad
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';
-- Good
SELECT * FROM orders WHERE create_time >= '2024-01-01' AND create_time < '2024-01-02';
-- 2. Avoid LIKE with a leading wildcard
-- Bad
SELECT * FROM orders WHERE order_no LIKE '%12345';
-- Good (if a leading wildcard is unavoidable, consider a full-text index)
SELECT * FROM orders WHERE order_no LIKE '12345%';
-- 3. Avoid OR conditions that defeat index use
-- Bad
SELECT * FROM orders WHERE user_id = 123 OR amount > 100;
-- Better (note: the second branch only benefits if amount is indexed too)
SELECT * FROM orders WHERE user_id = 123
UNION ALL
SELECT * FROM orders WHERE amount > 100 AND user_id != 123;
-- 4. Avoid implicit type conversion
-- Bad (order_no is VARCHAR; passing a number disables the index)
SELECT * FROM orders WHERE order_no = 12345;
-- Good
SELECT * FROM orders WHERE order_no = '12345';
3.3 Pagination Optimization
Deep pagination is brutally slow under high concurrency:
-- Inefficient: scans and discards a million rows
SELECT * FROM orders WHERE status = 1 ORDER BY id LIMIT 1000000, 20;
-- Option 1: deferred join (the subquery stays inside the index)
SELECT o.* FROM orders o
INNER JOIN (
    SELECT id FROM orders
    WHERE status = 1
    ORDER BY id
    LIMIT 1000000, 20
) t ON o.id = t.id;
-- Option 2: cap pagination depth in the business layer
-- e.g. allow only the first 100 pages and offer an export beyond that
-- Option 3: keyset (seek) pagination
-- pass the last id of the previous page and seek past it
SELECT * FROM orders WHERE status = 1 AND id > 1000000 ORDER BY id LIMIT 20;
3.4 Transaction Optimization
Long transactions hold locks and throttle concurrency:
-- Bad: one big transaction
BEGIN;
UPDATE orders SET status = 2 WHERE user_id = 123;
UPDATE order_items SET status = 2 WHERE order_id IN (SELECT id FROM orders WHERE user_id = 123);
UPDATE inventory SET stock = stock - 1 WHERE product_id IN (SELECT product_id FROM order_items WHERE order_id IN (SELECT id FROM orders WHERE user_id = 123));
COMMIT; -- locks are held for the whole duration
-- Better: split into small transactions (accepting eventual consistency between them)
-- Transaction 1
BEGIN;
UPDATE orders SET status = 2 WHERE user_id = 123;
COMMIT;
-- Transaction 2
BEGIN;
UPDATE order_items SET status = 2 WHERE order_id IN (SELECT id FROM orders WHERE user_id = 123);
COMMIT;
-- Transaction 3
BEGIN;
UPDATE inventory SET stock = stock - 1 WHERE product_id IN (SELECT product_id FROM order_items WHERE order_id IN (SELECT id FROM orders WHERE user_id = 123));
COMMIT;
4. Caching Strategy
4.1 Multi-Level Cache Architecture
User request → CDN cache → application cache (Redis) → database cache (buffer pool) → disk
Redis caching example:
@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String ORDER_CACHE_PREFIX = "order:";
    private static final long CACHE_TTL = 3600; // 1 hour

    // Fetch an order (cache first, then database)
    public Order getOrder(Long orderId) {
        String cacheKey = ORDER_CACHE_PREFIX + orderId;
        // 1. Check the cache
        Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
        if (order != null) {
            return order;
        }
        // 2. Cache miss: query the database
        order = orderMapper.selectById(orderId);
        if (order != null) {
            // 3. Populate the cache
            redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
        }
        return order;
    }

    // Update an order (invalidate the cache)
    public void updateOrder(Order order) {
        // 1. Update the database first
        orderMapper.updateById(order);
        // 2. Then delete the cache entry (Cache-Aside pattern)
        String cacheKey = ORDER_CACHE_PREFIX + order.getId();
        redisTemplate.delete(cacheKey);
    }
}
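To make the architecture genuinely multi-level, a small in-process cache can sit in front of Redis. A minimal sketch using Caffeine, meant to live inside the OrderService above (the size and TTL figures are illustrative assumptions):
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;

// Fields/methods below would sit in OrderService alongside the code above
private final Cache<String, Order> localCache = Caffeine.newBuilder()
        .maximumSize(10_000)                      // bound heap usage
        .expireAfterWrite(Duration.ofSeconds(30)) // short TTL tolerates brief staleness
        .build();

public Order getOrderTwoLevel(Long orderId) {
    String cacheKey = ORDER_CACHE_PREFIX + orderId;
    // L1: in-process, no network hop
    Order order = localCache.getIfPresent(cacheKey);
    if (order != null) {
        return order;
    }
    // L2: Redis, shared across application instances
    order = (Order) redisTemplate.opsForValue().get(cacheKey);
    if (order == null) {
        // L3: database
        order = orderMapper.selectById(orderId);
        if (order != null) {
            redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
        }
    }
    if (order != null) {
        localCache.put(cacheKey, order);
    }
    return order;
}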
Cache penetration protection:
// Guard against cache penetration: repeated lookups for keys that do not exist
public Order getOrderWithPenetrationProtection(Long orderId) {
    String cacheKey = ORDER_CACHE_PREFIX + orderId;
    String nullKey = cacheKey + ":null";
    // Check the null-value marker first
    if (Boolean.TRUE.equals(redisTemplate.hasKey(nullKey))) {
        return null;
    }
    // Check the cache
    Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
    if (order != null) {
        return order;
    }
    // Query the database
    order = orderMapper.selectById(orderId);
    if (order != null) {
        redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
    } else {
        // Cache the miss with a short TTL to stop the stampede
        redisTemplate.opsForValue().set(nullKey, "", 60, TimeUnit.SECONDS);
    }
    return order;
}
Cache avalanche protection:
// Mitigate cache avalanche: add TTL jitter so keys do not all expire at once
public void setCacheWithRandomTTL(String key, Object value) {
    long ttl = CACHE_TTL + ThreadLocalRandom.current().nextInt(300); // up to 5 minutes of jitter
    redisTemplate.opsForValue().set(key, value, ttl, TimeUnit.SECONDS);
}
// Guard against cache breakdown (a hot key expiring): rebuild under a distributed lock
public Order getOrderWithBreakdownProtection(Long orderId) {
    String cacheKey = ORDER_CACHE_PREFIX + orderId;
    String lockKey = "lock:" + orderId;
    // 1. Check the cache
    Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
    if (order != null) {
        return order;
    }
    // 2. Acquire the distributed lock
    Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
    if (Boolean.TRUE.equals(lockAcquired)) {
        try {
            // Double-check after winning the lock
            order = (Order) redisTemplate.opsForValue().get(cacheKey);
            if (order != null) {
                return order;
            }
            // Query the database
            order = orderMapper.selectById(orderId);
            if (order != null) {
                redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
            }
        } finally {
            // NOTE: production code should verify lock ownership (e.g., a Lua
            // compare-and-delete) instead of deleting unconditionally
            redisTemplate.delete(lockKey);
        }
    } else {
        // Back off briefly, then retry
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // A bounded retry loop would be safer than open-ended recursion here
        return getOrderWithBreakdownProtection(orderId);
    }
    return order;
}
4.2 Cache/Database Consistency
Eventual-consistency options:
// Option 1: delayed double delete (covers reads that raced with the update)
// NOTE: use a shared scheduler; creating one per call would leak threads
private static final ScheduledExecutorService DELAY_DELETER =
        Executors.newSingleThreadScheduledExecutor();

public void updateOrderWithDelayDelete(Order order) {
    // 1. Delete the cache entry
    String cacheKey = ORDER_CACHE_PREFIX + order.getId();
    redisTemplate.delete(cacheKey);
    // 2. Update the database
    orderMapper.updateById(order);
    // 3. Delete again after a delay (covers replica lag and in-flight stale reads)
    DELAY_DELETER.schedule(() -> redisTemplate.delete(cacheKey), 500, TimeUnit.MILLISECONDS);
}
// Option 2: subscribe to the binlog (with a tool such as Canal)
// Canal tails the binlog and invalidates the cache automatically; a sketch follows.
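A minimal consumer sketch using the open-source Canal Java client; the host, port, destination name, and the shop.orders subscription filter are illustrative assumptions:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;

public class CacheInvalidator {
    public static void main(String[] args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        connector.subscribe("shop\\.orders"); // schema.table filter
        while (true) {
            Message message = connector.getWithoutAck(100); // batch of up to 100 entries
            long batchId = message.getId();
            if (batchId != -1 && !message.getEntries().isEmpty()) {
                for (CanalEntry.Entry entry : message.getEntries()) {
                    if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                        continue;
                    }
                    CanalEntry.RowChange rowChange =
                            CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                        // Extract the primary key from rowData and delete "order:<id>" in Redis
                    }
                }
            }
            connector.ack(batchId); // confirm the batch once processed
        }
    }
}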
5. Database Architecture Optimization
5.1 Sharding (Splitting Databases and Tables)
Once a single table reaches the ten-million-row scale, consider sharding.
Horizontal table-sharding example:
-- Shard by user ID modulo
-- orders_0, orders_1, ..., orders_7
-- Shard routing logic (Java)
public class TableRouter {

    private static final int TABLE_COUNT = 8;

    public static String getTableName(Long userId) {
        int index = (int) (userId % TABLE_COUNT);
        return "orders_" + index;
    }

    public static String getTableName(String orderNo) {
        // Route by hash of the order number; mask the sign bit rather than
        // using Math.abs, which overflows for Integer.MIN_VALUE
        int hash = orderNo.hashCode();
        int index = (hash & Integer.MAX_VALUE) % TABLE_COUNT;
        return "orders_" + index;
    }
}
// MyBatis dynamic-table-name interceptor
@Intercepts({
    @Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})
})
public class DynamicTableNameInterceptor implements Interceptor {

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
        BoundSql boundSql = statementHandler.getBoundSql();
        String sql = boundSql.getSql();
        // Rewrite the table name
        if (sql.contains("orders")) {
            // Pull the routing key from the statement parameters
            Object parameterObject = boundSql.getParameterObject();
            if (parameterObject instanceof Map) {
                Map<?, ?> map = (Map<?, ?>) parameterObject;
                Long userId = (Long) map.get("userId");
                if (userId != null) {
                    String tableName = TableRouter.getTableName(userId);
                    // \b keeps identifiers such as orders_0 from being corrupted
                    sql = sql.replaceAll("\\borders\\b", tableName);
                    // BoundSql exposes no setter for sql, hence the reflection
                    Field field = boundSql.getClass().getDeclaredField("sql");
                    field.setAccessible(true);
                    field.set(boundSql, sql);
                }
            }
        }
        return invocation.proceed();
    }
}
Sharding middleware: ShardingSphere configuration
# sharding.yaml
dataSources:
  ds_0: !!com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/db_0
    username: root
    password: password
  ds_1: !!com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/db_1
    username: root
    password: password
shardingRule:
  tables:
    orders:
      actualDataNodes: ds_${0..1}.orders_${0..7}
      tableStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: orders_${user_id % 8}
      databaseStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: ds_${user_id % 2}
  bindingTables:
    - orders
  defaultDatabaseStrategy:
    inline:
      shardingColumn: user_id
      algorithmExpression: ds_${user_id % 2}
  defaultTableStrategy:
    none:
5.2 Partitioned Tables
When a table is large but full sharding is overkill, native partitioning can help:
-- Range partitioning by month
CREATE TABLE orders (
id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
order_no VARCHAR(64) NOT NULL,
status TINYINT NOT NULL,
amount DECIMAL(10,2) NOT NULL,
create_time DATETIME NOT NULL,
PRIMARY KEY (id, create_time)
) PARTITION BY RANGE (YEAR(create_time) * 100 + MONTH(create_time)) (
PARTITION p202401 VALUES LESS THAN (202402),
PARTITION p202402 VALUES LESS THAN (202403),
PARTITION p202403 VALUES LESS THAN (202404),
PARTITION p202404 VALUES LESS THAN (202405),
PARTITION p202405 VALUES LESS THAN (202406),
PARTITION p202406 VALUES LESS THAN (202407),
PARTITION p202407 VALUES LESS THAN (202408),
PARTITION p202408 VALUES LESS THAN (202409),
PARTITION p202409 VALUES LESS THAN (202410),
PARTITION p202410 VALUES LESS THAN (202411),
PARTITION p202411 VALUES LESS THAN (202412),
PARTITION p202412 VALUES LESS THAN (202501),
PARTITION pmax VALUES LESS THAN MAXVALUE
);
-- Partition pruning happens automatically at query time
EXPLAIN SELECT * FROM orders WHERE create_time >= '2024-03-01' AND create_time < '2024-04-01';
-- only partition p202403 is scanned
-- Dropping an old partition is a fast way to purge data
ALTER TABLE orders DROP PARTITION p202401;
5.3 Replication and Read/Write Splitting
Primary/replica configuration:
# Primary configuration (my.cnf)
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7   # MySQL 8.0 replaces this with binlog_expire_logs_seconds
sync_binlog = 1
# Replica configuration
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
read_only = 1
-- On the primary: create the replication user
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
-- On the replica: point it at the primary
CHANGE MASTER TO
MASTER_HOST='master_ip',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=4;
START SLAVE;
SHOW SLAVE STATUS\G
-- (MySQL 8.0.22+ prefers START REPLICA / SHOW REPLICA STATUS)
GTID-based replication (recommended):
# Primary
[mysqld]
gtid_mode = ON
enforce_gtid_consistency = ON
# Replica
[mysqld]
gtid_mode = ON
enforce_gtid_consistency = ON
log_slave_updates = ON
read_only = 1
-- On the replica:
CHANGE MASTER TO
MASTER_HOST='master_ip',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_AUTO_POSITION = 1;
START SLAVE;
5.4 Group Replication (MGR)
MySQL Group Replication provides built-in high availability (and, combined with MySQL Router, read/write splitting):
-- Install the plugin
INSTALL PLUGIN group_replication SONAME 'group_replication.so';
# Configuration (my.cnf)
[mysqld]
# Base settings
server_id = 1
gtid_mode = ON
enforce_gtid_consistency = ON
binlog_checksum = NONE
# MGR settings
plugin_load_add = group_replication.so
group_replication_group_name = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
group_replication_start_on_boot = OFF
group_replication_local_address = "192.168.1.101:33061"
group_replication_group_seeds = "192.168.1.101:33061,192.168.1.102:33061,192.168.1.103:33061"
group_replication_bootstrap_group = OFF
-- Bootstrap the group on the FIRST node only:
SET GLOBAL group_replication_bootstrap_group = ON;
START GROUP_REPLICATION;
SET GLOBAL group_replication_bootstrap_group = OFF;
-- Additional nodes just run START GROUP_REPLICATION;
6. MySQL Parameter Tuning
6.1 Core InnoDB Parameters
[mysqld]
# Buffer pool size (typically 50-70% of physical RAM on a dedicated server)
innodb_buffer_pool_size = 8G
# Buffer pool instances (worth setting once the pool exceeds 1GB)
innodb_buffer_pool_instances = 8
# Redo log size (1-2GB is a common starting point; MySQL 8.0.30+ uses innodb_redo_log_capacity)
innodb_log_file_size = 2G
innodb_log_files_in_group = 3
# Flush policy
innodb_flush_log_at_trx_commit = 1 # 1 = full durability; 2 = faster but may lose ~1s of commits on a crash
innodb_flush_method = O_DIRECT # bypass the OS page cache
# I/O threads
innodb_read_io_threads = 8
innodb_write_io_threads = 8
# Concurrency limit
innodb_thread_concurrency = 0 # 0 = unlimited
# Dirty-page flushing threshold (%)
innodb_max_dirty_pages_pct = 75
# Deadlock detection
innodb_deadlock_detect = ON
innodb_lock_wait_timeout = 50
# Adaptive hash index
innodb_adaptive_hash_index = ON
6.2 Query Cache and Thread Cache
# Query cache (removed in MySQL 8.0; configurable in 5.7 and earlier)
query_cache_type = 0 # keep it off: its global mutex is a bottleneck under high concurrency
query_cache_size = 0
# Thread cache
thread_cache_size = 50
# Connection timeouts (seconds)
wait_timeout = 600
interactive_timeout = 600
# Pending-connection queue length
back_log = 200
6.3 Temporary Table and Sort Buffers
# In-memory temporary table limits
tmp_table_size = 64M
max_heap_table_size = 64M
# Sort buffer
sort_buffer_size = 2M # allocated per thread; keep it modest
# Read buffers
read_buffer_size = 1M
read_rnd_buffer_size = 2M
# Join buffer
join_buffer_size = 2M
7. Monitoring and Alerting
7.1 Key Metrics
-- 1. QPS/TPS
SHOW GLOBAL STATUS LIKE 'Queries';
SHOW GLOBAL STATUS LIKE 'Com_commit';
-- 2. Slow query count
SHOW GLOBAL STATUS LIKE 'Slow_queries';
-- 3. Connections
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Threads_running';
-- 4. InnoDB buffer pool hit rate
SHOW STATUS LIKE 'Innodb_buffer_pool_read%';
-- hit rate = (1 - Innodb_buffer_pool_reads / Innodb_buffer_pool_read_requests) * 100%
-- 5. Lock waits
SHOW STATUS LIKE 'Innodb_row_lock_waits';
SHOW STATUS LIKE 'Innodb_row_lock_time_avg';
-- 6. Disk I/O
SHOW STATUS LIKE 'Innodb_data_pending_reads';
SHOW STATUS LIKE 'Innodb_data_pending_writes';
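The hit-rate formula above can be computed in a single query (a sketch, assuming MySQL 5.7+ where performance_schema.global_status is available):
SELECT ROUND(
         (1 - r.VARIABLE_VALUE / NULLIF(q.VARIABLE_VALUE, 0)) * 100, 2
       ) AS buffer_pool_hit_pct
FROM performance_schema.global_status r
JOIN performance_schema.global_status q
  ON r.VARIABLE_NAME = 'Innodb_buffer_pool_reads'
 AND q.VARIABLE_NAME = 'Innodb_buffer_pool_read_requests';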
7.2 Example Monitoring Script
#!/bin/bash
# MySQL monitoring script
# Use a function rather than a command string in a variable: quotes embedded
# in a variable are not re-parsed by the shell
mysql_exec() {
    mysql -u root -p'password' -N -e "$1"
}
# QPS: delta of the Queries counter over one second
QUERIES=$(mysql_exec "SHOW GLOBAL STATUS LIKE 'Queries';" | awk '{print $2}')
sleep 1
QUERIES2=$(mysql_exec "SHOW GLOBAL STATUS LIKE 'Queries';" | awk '{print $2}')
QPS=$((QUERIES2 - QUERIES))
# TPS: commits + rollbacks per second
COMMIT=$(mysql_exec "SHOW GLOBAL STATUS LIKE 'Com_commit';" | awk '{print $2}')
ROLLBACK=$(mysql_exec "SHOW GLOBAL STATUS LIKE 'Com_rollback';" | awk '{print $2}')
sleep 1
COMMIT2=$(mysql_exec "SHOW GLOBAL STATUS LIKE 'Com_commit';" | awk '{print $2}')
ROLLBACK2=$(mysql_exec "SHOW GLOBAL STATUS LIKE 'Com_rollback';" | awk '{print $2}')
TPS=$((COMMIT2 - COMMIT + ROLLBACK2 - ROLLBACK))
# Current connections
THREADS_CONNECTED=$(mysql_exec "SHOW STATUS LIKE 'Threads_connected';" | awk '{print $2}')
# Cumulative slow queries
SLOW_QUERIES=$(mysql_exec "SHOW GLOBAL STATUS LIKE 'Slow_queries';" | awk '{print $2}')
# Emit one metrics line
echo "$(date '+%Y-%m-%d %H:%M:%S') QPS:$QPS TPS:$TPS Threads:$THREADS_CONNECTED Slow:$SLOW_QUERIES"
# Alert thresholds
if [ "$QPS" -gt 5000 ]; then
    echo "WARNING: QPS is too high: $QPS"
fi
if [ "$THREADS_CONNECTED" -gt 400 ]; then
    echo "WARNING: Too many connections: $THREADS_CONNECTED"
fi
7.3 Slow Query Analysis
-- Enable the slow query log
SET GLOBAL slow_query_log = ON;
SET GLOBAL long_query_time = 1; -- log queries slower than 1 second
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL min_examined_row_limit = 1000; -- only log queries examining at least 1000 rows
# Analyze with mysqldumpslow (shell)
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
# Analyze with pt-query-digest (shell)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
8. Case Study: Surviving a Flash Sale
8.1 Flash-Sale Characteristics
- Instantaneous spike: huge numbers of users buying at the same moment
- Inventory deduction: the core operation; overselling is unacceptable
- Strong consistency requirements on the final state
8.2 Flash-Sale Architecture
// Flash-sale service implementation
@Service
public class SeckillService {

    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private ProductMapper productMapper;
    @Autowired
    private RabbitTemplate mqTemplate; // added: the MQ sender used below
    private static final Logger log = LoggerFactory.getLogger(SeckillService.class); // added: logger used below

    private static final String STOCK_KEY = "seckill:stock:";
    private static final String ORDER_KEY = "seckill:order:";
    private static final String USER_ORDER_KEY = "seckill:user_order:";

    /**
     * Flash-sale flow:
     * 1. Pre-checks: stock pre-warmed into Redis, per-user purchase limit
     * 2. In-memory marker: fail fast
     * 3. Distributed lock: bound the concurrency
     * 4. Asynchronous order creation: raise throughput
     */
    public SeckillResult seckill(Long productId, Long userId) {
        String stockKey = STOCK_KEY + productId;
        String orderKey = ORDER_KEY + productId;
        String userOrderKey = USER_ORDER_KEY + userId + ":" + productId;
        // 1. Pre-check: has this user already ordered?
        if (Boolean.TRUE.equals(redisTemplate.hasKey(userOrderKey))) {
            return SeckillResult.fail("You have already joined this flash sale");
        }
        // 2. Fast path: any stock left?
        String stock = redisTemplate.opsForValue().get(stockKey);
        if (stock == null || Integer.parseInt(stock) <= 0) {
            return SeckillResult.fail("Out of stock");
        }
        // 3. Distributed lock: prevent overselling
        String lockKey = "lock:" + productId;
        Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 5, TimeUnit.SECONDS);
        if (!Boolean.TRUE.equals(lockAcquired)) {
            return SeckillResult.fail("System busy, please retry");
        }
        try {
            // 4. Double-check the stock under the lock
            stock = redisTemplate.opsForValue().get(stockKey);
            if (stock == null || Integer.parseInt(stock) <= 0) {
                return SeckillResult.fail("Out of stock");
            }
            // 5. Decrement the stock (atomic in Redis)
            Long decrement = redisTemplate.opsForValue().decrement(stockKey);
            if (decrement == null || decrement < 0) {
                // Roll the decrement back
                redisTemplate.opsForValue().increment(stockKey);
                return SeckillResult.fail("Out of stock");
            }
            // 6. Mark the user as having ordered
            redisTemplate.opsForValue().set(userOrderKey, "1", 1, TimeUnit.HOURS);
            // 7. Publish a message; the order row is created asynchronously
            SeckillMessage message = new SeckillMessage();
            message.setProductId(productId);
            message.setUserId(userId);
            message.setOrderNo(generateOrderNo());
            // Send to the queue (RabbitMQ here; RocketMQ works similarly)
            mqTemplate.convertAndSend("seckill.order", message);
            return SeckillResult.success(message.getOrderNo());
        } finally {
            // NOTE: production code should verify lock ownership before deleting
            redisTemplate.delete(lockKey);
        }
    }

    /**
     * MQ consumer: persist the order
     */
    @RabbitListener(queues = "seckill.order")
    public void handleSeckillOrder(SeckillMessage message) {
        try {
            // 1. Check database stock
            Product product = productMapper.selectById(message.getProductId());
            if (product == null || product.getStock() <= 0) {
                // Out of stock: roll back the Redis decrement and the user marker
                redisTemplate.opsForValue().increment(STOCK_KEY + message.getProductId());
                redisTemplate.delete(USER_ORDER_KEY + message.getUserId() + ":" + message.getProductId());
                return;
            }
            // 2. Decrement database stock (conditional update; mapper sketch below)
            int updated = productMapper.decreaseStock(message.getProductId());
            if (updated == 0) {
                // Update failed: roll back the Redis state
                redisTemplate.opsForValue().increment(STOCK_KEY + message.getProductId());
                redisTemplate.delete(USER_ORDER_KEY + message.getUserId() + ":" + message.getProductId());
                return;
            }
            // 3. Create the order row
            Order order = new Order();
            order.setOrderNo(message.getOrderNo());
            order.setUserId(message.getUserId());
            order.setProductId(message.getProductId());
            order.setAmount(product.getPrice());
            order.setStatus(1);
            order.setCreateTime(new Date());
            orderMapper.insert(order);
            // 4. Done; nothing further on success
        } catch (Exception e) {
            // Log and compensate on failure
            log.error("Failed to process flash-sale order", e);
            // Route to a dead-letter queue for manual compensation
        }
    }

    private String generateOrderNo() {
        return "SK" + System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(1000);
    }
}
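Minimal sketches of the two helper types used above; the field set is an assumption inferred from how they are used, not a prescribed contract:
public class SeckillMessage {
    private Long productId;
    private Long userId;
    private String orderNo;

    public Long getProductId() { return productId; }
    public void setProductId(Long productId) { this.productId = productId; }
    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getOrderNo() { return orderNo; }
    public void setOrderNo(String orderNo) { this.orderNo = orderNo; }
}

public class SeckillResult {
    private final boolean success;
    private final String message; // order number on success, reason on failure

    private SeckillResult(boolean success, String message) {
        this.success = success;
        this.message = message;
    }
    public static SeckillResult success(String orderNo) { return new SeckillResult(true, orderNo); }
    public static SeckillResult fail(String reason) { return new SeckillResult(false, reason); }
    public boolean isSuccess() { return success; }
    public String getMessage() { return message; }
}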
8.3 Table Design
-- Flash-sale product table
CREATE TABLE seckill_product (
id BIGINT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
price DECIMAL(10,2) NOT NULL,
stock INT NOT NULL,
start_time DATETIME NOT NULL,
end_time DATETIME NOT NULL,
version INT NOT NULL DEFAULT 0, -- optimistic-lock version number
INDEX idx_time (start_time, end_time)
);
-- Flash-sale order table (one of N application-level shards)
CREATE TABLE seckill_order_0 (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_no VARCHAR(64) NOT NULL UNIQUE,
user_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
amount DECIMAL(10,2) NOT NULL,
status TINYINT NOT NULL,
create_time DATETIME NOT NULL,
INDEX idx_user_product (user_id, product_id)
);
-- Note: adding PARTITION BY HASH(user_id) here would be rejected by MySQL,
-- because every unique key (the PK and order_no) must include the partitioning
-- column; the table is already sharded at the application level anyway.
-- Optimistic-lock update: the version must match the value read earlier
UPDATE seckill_product
SET stock = stock - 1, version = version + 1
WHERE id = 1 AND stock > 0 AND version = 1; -- 1 = the version previously read
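For reference, a sketch of the decreaseStock mapper that the MQ consumer calls, assuming MyBatis annotations; the stock > 0 guard alone already prevents overselling even without the version column:
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

@Mapper
public interface ProductMapper {

    // Returns the number of rows changed: 0 means the stock was already gone
    @Update("UPDATE seckill_product SET stock = stock - 1 " +
            "WHERE id = #{productId} AND stock > 0")
    int decreaseStock(@Param("productId") Long productId);

    Product selectById(Long productId); // resolved via XML or another annotation
}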
9. Load Testing and Performance Tuning
9.1 Load Testing with sysbench
# Install sysbench
yum install sysbench
# Prepare test data
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
--mysql-db=test --table-size=1000000 \
/usr/share/sysbench/oltp_read_write.lua prepare
# Run the test (100 threads, 60 seconds)
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
--mysql-db=test --table-size=1000000 --threads=100 --time=60 \
--report-interval=10 \
/usr/share/sysbench/oltp_read_write.lua run
# Clean up
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
--mysql-db=test \
/usr/share/sysbench/oltp_read_write.lua cleanup
9.2 Custom Load-Test Script
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
import pymysql
import random
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError as FuturesTimeoutError

# Database configuration
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'password',
    'database': 'test',
    'charset': 'utf8mb4'
}

# Shared counters
stats = {
    'total': 0,
    'success': 0,
    'failed': 0,
    'slow': 0,
    'lock_wait': 0,
    'start_time': time.time()
}
lock = threading.Lock()


def get_db_connection():
    return pymysql.connect(**DB_CONFIG)


def execute_query(thread_id):
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()
        # Pick a random operation type
        query_type = random.choice(['read', 'write', 'update'])
        if query_type == 'read':
            # Read path
            user_id = random.randint(1, 10000)
            cursor.execute("SELECT * FROM orders WHERE user_id = %s LIMIT 10", (user_id,))
            cursor.fetchall()
        elif query_type == 'write':
            # Write path
            order_no = f"TEST{int(time.time()*1000)}{random.randint(1000,9999)}"
            user_id = random.randint(1, 10000)
            amount = round(random.uniform(10, 1000), 2)
            cursor.execute(
                "INSERT INTO orders (order_no, user_id, amount, status, create_time) VALUES (%s, %s, %s, 1, NOW())",
                (order_no, user_id, amount)
            )
            conn.commit()
        else:
            # Update path
            user_id = random.randint(1, 10000)
            cursor.execute(
                "UPDATE orders SET amount = amount + 1 WHERE user_id = %s AND status = 1",
                (user_id,)
            )
            conn.commit()
        with lock:
            stats['success'] += 1
            stats['total'] += 1
        return True
    except pymysql.err.OperationalError as e:
        with lock:
            stats['total'] += 1   # count failures in the total too
            stats['failed'] += 1
            if e.args[0] == 1205:  # 1205 = lock wait timeout exceeded
                stats['lock_wait'] += 1
        return False
    except Exception:
        with lock:
            stats['total'] += 1
            stats['failed'] += 1
        return False
    finally:
        if conn:
            conn.close()


def monitor():
    """Periodic progress reporter."""
    while True:
        time.sleep(10)
        elapsed = time.time() - stats['start_time']
        qps = stats['total'] / elapsed if elapsed > 0 else 0
        print(f"\n[{time.strftime('%H:%M:%S')}] "
              f"Total: {stats['total']} | "
              f"QPS: {qps:.2f} | "
              f"Success: {stats['success']} | "
              f"Failed: {stats['failed']} | "
              f"LockWait: {stats['lock_wait']}")


def main():
    # Start the monitor thread
    monitor_thread = threading.Thread(target=monitor, daemon=True)
    monitor_thread.start()
    # Test parameters
    threads = 100                  # concurrency
    duration = 60                  # duration (seconds)
    total_requests = threads * 10  # 10 requests per thread
    print(f"Starting load test: {threads} threads, {duration} seconds")
    start = time.time()
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = []
        for i in range(total_requests):
            future = executor.submit(execute_query, i)
            futures.append(future)
            # Stop submitting once the time budget is spent
            if time.time() - start > duration:
                break
        # Wait for completion, with a grace period
        try:
            for future in as_completed(futures, timeout=duration + 10):
                try:
                    future.result(timeout=0.1)
                except Exception:
                    pass
        except FuturesTimeoutError:
            pass  # grace period exceeded; report whatever finished
    elapsed = time.time() - start
    print(f"\n\nResults:")
    print(f"Elapsed: {elapsed:.2f}s")
    print(f"Total requests: {stats['total']}")
    print(f"Succeeded: {stats['success']}")
    print(f"Failed: {stats['failed']}")
    print(f"QPS: {stats['total'] / elapsed:.2f}")
    success_rate = stats['success'] / stats['total'] * 100 if stats['total'] else 0.0
    print(f"Success rate: {success_rate:.2f}%")
    print(f"Lock waits: {stats['lock_wait']}")


if __name__ == '__main__':
    main()
9.3 Tuning Workflow
- Baseline test: record current performance metrics
- Identify the bottleneck: locate the problem through monitoring
- Optimize deliberately: indexes, parameters, architecture
- Regression test: verify the improvement
- Keep monitoring: make the feedback loop permanent
10. Troubleshooting and Emergency Response
10.1 Common Failure Scenarios
Scenario 1: connection exhaustion
# Snapshot the process list first
mysql -u root -p -e "SHOW PROCESSLIST;" > /tmp/processlist.txt
# Kill connections idle for more than 10 minutes (-N drops the column header
# so the generated KILL statements pipe cleanly into mysql)
mysql -N -u root -p -e "SELECT CONCAT('KILL ',id,';') FROM information_schema.processlist WHERE command = 'Sleep' AND time > 600;" | mysql -u root -p
# Raise the limit temporarily
mysql -u root -p -e "SET GLOBAL max_connections = 1000;"
Scenario 2: slow queries driving CPU up
# See what is running right now
mysql -u root -p -e "SHOW FULL PROCESSLIST;" | grep -v Sleep
# Kill statements that have been running for more than 60 seconds
mysql -N -u root -p -e "SELECT CONCAT('KILL ',id,';') FROM information_schema.processlist WHERE time > 60;" | mysql -u root -p
# Temporarily disable the slow log to avoid filling the disk
mysql -u root -p -e "SET GLOBAL slow_query_log = OFF;"
Scenario 3: disk space exhaustion
# Check disk usage
df -h
# Check log files
ls -lh /var/log/mysql/
# Purge old binlogs
mysql -u root -p -e "PURGE BINARY LOGS BEFORE '2024-01-01 00:00:00';"
# Resizing the InnoDB redo log files can also free space,
# but that requires a restart - proceed with caution
10.2 Diagnostic Tools
-- InnoDB engine status (the single most useful dump; watch the
-- BUFFER POOL AND MEMORY and ROW OPERATIONS sections)
SHOW ENGINE INNODB STATUS\G
-- Lock information (MySQL 5.7; in 8.0 these moved to
-- performance_schema.data_locks and performance_schema.data_lock_waits)
SELECT * FROM information_schema.INNODB_LOCKS;
SELECT * FROM information_schema.INNODB_LOCK_WAITS;
-- Running transactions
SELECT * FROM information_schema.INNODB_TRX;
10.3 Contingency Plans
Plan 1: degradation
- Switch off non-core features
- Route reads to the replicas
- Throttle writes (see the sketch below)
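A minimal write-throttling sketch using Guava's RateLimiter; the class name and the 500-permits-per-second figure are illustrative and should be set from measured capacity:
import com.google.common.util.concurrent.RateLimiter;

public class ThrottledOrderWriter {

    // Token-bucket limiter: at most ~500 writes/second reach the database
    private static final RateLimiter WRITE_LIMITER = RateLimiter.create(500.0);

    private final OrderMapper orderMapper;

    public ThrottledOrderWriter(OrderMapper orderMapper) {
        this.orderMapper = orderMapper;
    }

    public void createOrder(Order order) {
        // tryAcquire() fails fast instead of queueing the caller
        if (!WRITE_LIMITER.tryAcquire()) {
            throw new IllegalStateException("System busy, please retry later");
        }
        orderMapper.insert(order);
    }
}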
Plan 2: circuit breaking
// Using Hystrix (Resilience4j offers equivalents)
@HystrixCommand(
    fallbackMethod = "getOrderFallback",
    commandProperties = {
        @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000"),
        @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "20"),
        @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50"),
        @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "5000")
    }
)
public Order getOrder(Long id) {
    return orderMapper.selectById(id);
}

public Order getOrderFallback(Long id) {
    // Serve from cache or return a default
    return cachedOrderService.get(id);
}
11. Summary and Best Practices
11.1 Golden Rules for High Concurrency
- Cache first: aim to serve ~90% of reads from the cache layer
- Go asynchronous: defer writes where possible to cut response time
- Throttle and degrade: protect the database from being crushed
- Divide and conquer: spread load through sharding
- Monitor from day one: build the alerting before you need it
11.2 Configuration Checklist
Application layer:
- [ ] Connection pooling in place
- [ ] Multi-level caching implemented
- [ ] Writes handled asynchronously
- [ ] Rate limiting and circuit breaking
Database layer:
- [ ] Indexes optimized
- [ ] Read/write splitting
- [ ] Parameters tuned
- [ ] Slow-query monitoring
Architecture layer:
- [ ] Replication
- [ ] Sharding
- [ ] Cache cluster
- [ ] Message queue
11.3 Continuous Improvement
- Review slow queries regularly: analyze the slow log weekly
- Baseline your metrics: establish a performance baseline so anomalies stand out
- Plan capacity: scale ahead of business growth
- Rehearse failures: run drills regularly to validate the contingency plans
- Write it down: document every incident and every optimization
Applied together, these strategies can absorb traffic spikes at the million-request scale and keep MySQL stable under high concurrency. There is no silver bullet: pick the combination that fits your workload, and keep measuring and tuning.
