引言:理解高并发场景下的MySQL挑战
在当今互联网应用中,高并发场景已经成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易高峰,都会给数据库带来巨大的压力。MySQL作为最流行的关系型数据库,在高并发环境下面临着流量洪峰和锁竞争两大核心挑战。
流量洪峰指的是在短时间内大量请求涌入,导致数据库连接池耗尽、CPU和内存资源紧张,甚至出现服务不可用的情况。锁竞争则是多个事务同时访问相同资源时产生的等待和阻塞,严重时会导致死锁和性能急剧下降。这些问题如果处理不当,不仅会影响用户体验,还可能造成数据不一致或系统崩溃。
本文将从索引优化、查询优化、事务与锁优化、架构升级等多个维度,全面解析MySQL应对高并发的策略。我们将通过具体的案例和代码示例,帮助读者深入理解并掌握这些优化技巧。
一、索引优化:高并发性能的基石
1.1 索引的基本原理与重要性
索引是MySQL中提高查询性能最有效的手段之一。在高并发场景下,合理的索引设计可以将查询性能提升几个数量级,同时减少锁竞争的范围和时间。
索引的工作原理类似于书籍的目录。当我们需要查找特定内容时,通过目录可以快速定位到目标页面,而不需要逐页翻阅。在MySQL中,索引使用B+树数据结构存储,支持高效的范围查询和等值查询。
1.2 索引优化的具体策略
1.2.1 覆盖索引(Covering Index)
覆盖索引是指索引包含了查询所需的所有字段,避免了回表操作,从而大幅提升查询性能。
场景示例:假设我们有一个用户订单表,经常需要查询用户的订单状态。
-- 创建订单表
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
order_no VARCHAR(64) NOT NULL,
status TINYINT NOT NULL,
amount DECIMAL(10,2),
create_time DATETIME,
INDEX idx_user_status (user_id, status)
);
-- 优化前的查询(需要回表)
SELECT order_no, amount FROM orders WHERE user_id = 1001 AND status = 1;
-- 优化后的覆盖索引
ALTER TABLE orders ADD INDEX idx_user_status_amount (user_id, status, order_no, amount);
-- 现在查询可以直接从索引中获取所有数据
SELECT order_no, amount FROM orders WHERE user_id = 1001 AND status = 1;
通过EXPLAIN分析可以看到,使用覆盖索引后,Extra字段显示"Using index",表示避免了回表操作。
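可以用下面的方式快速验证覆盖索引是否生效(输出值仅为示意,以实际执行计划为准):
-- 验证覆盖索引是否生效
EXPLAIN SELECT order_no, amount FROM orders WHERE user_id = 1001 AND status = 1;
-- 重点关注输出中的两列(示意值):
--   key:   idx_user_status_amount   -- 实际命中的索引
--   Extra: Using index              -- 未回表,数据直接来自索引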
1.2.2 最左前缀原则与索引下推
MySQL的复合索引遵循最左前缀原则。假设我们创建了索引(a, b, c),那么以下查询都能有效利用索引:
- WHERE a = ?
- WHERE a = ? AND b = ?
- WHERE a = ? AND b = ? AND c = ?
但以下查询无法充分利用索引:
- WHERE b = ?
- WHERE b = ? AND c = ?
- WHERE c = ?
索引下推(ICP) 是MySQL 5.6引入的优化,可以在索引遍历过程中就对不符合条件的记录进行过滤。
-- 创建复合索引
ALTER TABLE orders ADD INDEX idx_user_time_status (user_id, create_time, status);
-- 索引下推优化的查询
SELECT * FROM orders
WHERE user_id = 1001
AND create_time >= '2024-01-01'
AND status = 1;
-- 通过EXPLAIN可以看到,Extra字段显示"Using index condition"
-- 表示MySQL在索引层面就过滤了不符合条件的记录
1.2.3 索引选择性与区分度
索引的选择性是指不重复的索引值与表记录总数的比值。选择性越高,索引的效率越高。通常选择性大于0.3的列才适合建立索引。
-- 计算列的选择性
SELECT
COUNT(DISTINCT status) / COUNT(*) AS selectivity
FROM orders;
-- 对于选择性低的列(如性别、状态),可以考虑:
-- 1. 不建立索引
-- 2. 结合其他高选择性列建立复合索引
-- 3. 使用前缀索引(针对字符串列,如 order_no,取前若干个字符建索引)
ALTER TABLE orders ADD INDEX idx_order_no_prefix (order_no(8));
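前缀长度可以结合选择性来确定:逐步增加前缀长度,直到其选择性接近整列的选择性即可。下面给出一个参考写法(具体长度以实际数据分布为准):
-- 比较整列与不同前缀长度的选择性,选出足够接近整列选择性的最短前缀
SELECT
COUNT(DISTINCT order_no) / COUNT(*) AS full_selectivity,
COUNT(DISTINCT LEFT(order_no, 4)) / COUNT(*) AS prefix4_selectivity,
COUNT(DISTINCT LEFT(order_no, 8)) / COUNT(*) AS prefix8_selectivity
FROM orders;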
1.3 索引维护与监控
1.3.1 索引使用情况监控
-- 查看索引使用情况
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
INDEX_NAME,
COUNT_READ,
COUNT_WRITE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE OBJECT_SCHEMA = 'your_database'
ORDER BY COUNT_READ DESC;
-- 查找未使用的索引(可能造成写性能下降)
SELECT DISTINCT
t.TABLE_SCHEMA,
t.TABLE_NAME,
t.INDEX_NAME
FROM information_schema.STATISTICS t
LEFT JOIN performance_schema.table_io_waits_summary_by_index_usage u
ON t.TABLE_SCHEMA = u.OBJECT_SCHEMA
AND t.TABLE_NAME = u.OBJECT_NAME
AND t.INDEX_NAME = u.INDEX_NAME
WHERE (u.COUNT_READ IS NULL OR u.COUNT_READ = 0)
AND t.INDEX_NAME != 'PRIMARY';
1.3.2 索引碎片整理
-- 查看表碎片情况
SELECT
TABLE_NAME,
ROUND(DATA_LENGTH / 1024 / 1024, 2) AS data_size_mb,
ROUND(DATA_FREE / 1024 / 1024, 2) AS free_size_mb
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'your_database';
-- 优化表(整理碎片)
OPTIMIZE TABLE orders;
-- 或者使用ALTER TABLE重建索引
ALTER TABLE orders ENGINE = InnoDB;
二、查询优化:让SQL飞起来
2.1 避免全表扫描
全表扫描是高并发场景下的性能杀手。我们需要通过合理的索引和查询条件避免全表扫描。
2.1.1 避免SELECT *
-- 反例:查询所有字段,造成不必要的网络传输和内存消耗
SELECT * FROM orders WHERE user_id = 1001;
-- 正例:只查询需要的字段
SELECT id, order_no, status, amount FROM orders WHERE user_id = 1001;
-- 更好的做法:使用覆盖索引
ALTER TABLE orders ADD INDEX idx_user_cover (user_id, id, order_no, status, amount);
SELECT id, order_no, status, amount FROM orders WHERE user_id = 1001;
2.1.2 避免在WHERE子句中对字段进行函数操作
-- 反例:无法使用索引
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';
-- 正例:可以使用索引
SELECT * FROM orders
WHERE create_time >= '2024-01-01 00:00:00'
AND create_time < '2024-01-02 00:00:00';
2.1.3 避免使用LIKE以%开头
-- 反例:无法使用索引
SELECT * FROM orders WHERE order_no LIKE '%12345';
-- 正例:可以使用索引
SELECT * FROM orders WHERE order_no LIKE 'ABC%';
-- 如果必须使用后缀匹配,可以考虑:
-- 1. 倒序存储 + 索引
ALTER TABLE orders ADD COLUMN order_no_reverse VARCHAR(64);
UPDATE orders SET order_no_reverse = REVERSE(order_no);
ALTER TABLE orders ADD INDEX idx_order_no_reverse (order_no_reverse);
-- 查询时
SELECT * FROM orders WHERE order_no_reverse LIKE REVERSE('%12345');
2.2 分页优化
在高并发场景下,深度分页是一个常见问题。LIMIT 1000000, 10 这样的查询会扫描大量无用数据。
2.2.1 延迟关联优化
-- 原始低效查询
SELECT * FROM orders
WHERE user_id = 1001
ORDER BY create_time DESC
LIMIT 1000000, 10;
-- 优化方案:延迟关联(子查询只扫描二级索引定位主键,再回表取整行数据)
SELECT o.* FROM orders o
INNER JOIN (
SELECT id FROM orders
WHERE user_id = 1001
ORDER BY create_time DESC
LIMIT 1000000, 10
) t ON o.id = t.id;
2.2.2 位置记录法
-- 第一次查询
SELECT * FROM orders
WHERE user_id = 1001
ORDER BY create_time DESC
LIMIT 10;
-- 第二次查询(记录上次最后一条的位置)
SELECT * FROM orders
WHERE user_id = 1001
AND create_time < '2024-01-01 12:00:00' -- 上次最后一条的create_time
ORDER BY create_time DESC
LIMIT 10;
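需要注意,create_time 可能存在相同值,单独用它作为游标可能漏掉同一时刻的记录。更稳妥的做法是用 (create_time, id) 作为联合游标(示例中的时间和id为上一页最后一条记录的值):
-- 以 (create_time, id) 作为联合游标,避免同一时间点的记录被跳过
SELECT * FROM orders
WHERE user_id = 1001
AND (create_time < '2024-01-01 12:00:00'
     OR (create_time = '2024-01-01 12:00:00' AND id < 98765))
ORDER BY create_time DESC, id DESC
LIMIT 10;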
2.3 JOIN优化
2.3.1 小表驱动大表
-- 假设user表1000条记录,orders表100万条记录
-- 推荐写法:以小表(user)作为驱动表,并确保被驱动表的关联列 orders.user_id 上有索引
SELECT u.*, o.order_no
FROM user u
LEFT JOIN orders o ON u.id = o.user_id;
-- 不推荐写法:以大表(orders)作为驱动表,需要对小表做上百万次探查,性能较差
SELECT u.*, o.order_no
FROM orders o
LEFT JOIN user u ON o.user_id = u.id;
-- 说明:对于INNER JOIN,优化器通常会自动选择驱动表;必要时可用STRAIGHT_JOIN显式指定连接顺序
2.3.2 避免笛卡尔积
-- 反例:缺少关联条件,产生笛卡尔积
SELECT * FROM orders, user;
-- 正例:明确关联条件
SELECT * FROM orders o, user u WHERE o.user_id = u.id;
-- 更好的写法:使用INNER JOIN
SELECT * FROM orders o INNER JOIN user u ON o.user_id = u.id;
2.4 批量操作优化
在高并发场景下,频繁的单条插入/更新会造成严重的性能问题。
2.4.1 批量插入
-- 反例:逐条插入(1000次数据库交互)
INSERT INTO orders (user_id, order_no, status) VALUES (1, 'A001', 1);
INSERT INTO orders (user_id, order_no, status) VALUES (2, 'A002', 1);
-- ... 重复1000次
-- 正例:批量插入(1次数据库交互)
INSERT INTO orders (user_id, order_no, status) VALUES
(1, 'A001', 1),
(2, 'A002', 1),
(3, 'A003', 1),
-- ... 一次插入多条
(1000, 'A1000', 1);
-- Java代码示例(使用PreparedStatement批量处理)
String sql = "INSERT INTO orders (user_id, order_no, status) VALUES (?, ?, ?)";
PreparedStatement ps = conn.prepareStatement(sql);
for (Order order : orderList) {
ps.setInt(1, order.getUserId());
ps.setString(2, order.getOrderNo());
ps.setInt(3, order.getStatus());
ps.addBatch();
}
ps.executeBatch();
2.4.2 批量更新
-- 使用CASE WHEN进行批量更新
UPDATE orders
SET status = CASE id
WHEN 1 THEN 2
WHEN 2 THEN 3
WHEN 3 THEN 4
ELSE status
END,
amount = CASE id
WHEN 1 THEN 100.00
WHEN 2 THEN 200.00
WHEN 3 THEN 300.00
ELSE amount
END
WHERE id IN (1, 2, 3);
-- 或者使用INSERT ON DUPLICATE KEY UPDATE
INSERT INTO orders (id, user_id, order_no, status, amount) VALUES
(1, 1001, 'A001', 2, 100.00),
(2, 1002, 'A002', 2, 200.00),
(3, 1003, 'A003', 2, 300.00)
ON DUPLICATE KEY UPDATE
status = VALUES(status),
amount = VALUES(amount);
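补充一点:从MySQL 8.0.20开始,ON DUPLICATE KEY UPDATE中的VALUES()写法已被标记为废弃,可以改用行别名写法,示意如下(new_rows为自定义别名):
-- MySQL 8.0.20+ 的行别名写法
INSERT INTO orders (id, user_id, order_no, status, amount) VALUES
(1, 1001, 'A001', 2, 100.00),
(2, 1002, 'A002', 2, 200.00)
AS new_rows
ON DUPLICATE KEY UPDATE
status = new_rows.status,
amount = new_rows.amount;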
三、事务与锁优化:解决锁竞争的核心
3.1 理解MySQL锁机制
3.1.1 锁的类型
MySQL中的锁主要分为以下几类:
- 共享锁(S锁):允许事务读取一行数据
- 排他锁(X锁):允许事务删除或更新一行数据
- 意向锁(IS/IX锁):表级锁,表示事务打算在表中的某些行上加S锁或X锁
- 记录锁(Record Lock):锁定索引记录
- 间隙锁(Gap Lock):锁定索引记录之间的间隙
- 临键锁(Next-Key Lock):记录锁 + 间隙锁,用于解决幻读
3.1.2 锁的兼容性矩阵
| 锁类型 | S锁 | X锁 | IS锁 | IX锁 |
|---|---|---|---|---|
| S锁 | 兼容 | 冲突 | 兼容 | 冲突 |
| X锁 | 冲突 | 冲突 | 冲突 | 冲突 |
| IS锁 | 兼容 | 冲突 | 兼容 | 兼容 |
| IX锁 | 冲突 | 冲突 | 兼容 | 兼容 |
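排查锁问题时,可以直接查看当前持有和等待中的锁。MySQL 8.0 提供了 performance_schema.data_locks(5.7 对应 information_schema.innodb_locks),示例如下:
-- MySQL 8.0:查看当前的锁(表锁/记录锁/间隙锁等)
SELECT
ENGINE_TRANSACTION_ID,
OBJECT_NAME,
INDEX_NAME,
LOCK_TYPE,   -- TABLE / RECORD
LOCK_MODE,   -- 如 IX、X、X,GAP、S,REC_NOT_GAP 等
LOCK_STATUS, -- GRANTED / WAITING
LOCK_DATA
FROM performance_schema.data_locks;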
3.2 减少锁竞争的策略
3.2.1 缩短事务长度
事务持有锁的时间越长,锁竞争的概率就越大。
// 反例:事务过长,持有锁时间太久
@Transactional
public void processOrder(Long orderId) {
// 1. 查询订单(普通SELECT是快照读,不加锁)
Order order = orderMapper.selectById(orderId);
// 2. 远程调用库存服务(耗时操作,事务迟迟不提交)
inventoryService.checkStock(order.getProductId());
// 3. 远程调用支付服务(耗时操作,事务迟迟不提交)
paymentService.processPayment(order);
// 4. 更新订单状态(加X锁,并一直持有到事务提交)
order.setStatus(2);
orderMapper.update(order);
}
// 正例:将耗时操作移到事务外
public void processOrder(Long orderId) {
// 1. 先查询(事务外的普通读,不加锁)
Order order = orderMapper.selectById(orderId);
// 2. 耗时的外部调用(在事务外)
inventoryService.checkStock(order.getProductId());
paymentService.processPayment(order);
// 3. 在短事务中更新数据库
updateOrderStatus(orderId, 2);
}
// 注意:若 updateOrderStatus 与 processOrder 在同一个类中,this 直接调用不会经过Spring代理,
// @Transactional 不会生效;应将其放到单独的Service Bean中,或通过代理对象调用
@Transactional
public void updateOrderStatus(Long orderId, Integer status) {
orderMapper.updateStatus(orderId, status);
}
3.2.2 减少锁的粒度
-- 使用乐观锁(减少悲观锁的使用)
ALTER TABLE orders ADD COLUMN version INT DEFAULT 0;
-- 更新时检查版本号
UPDATE orders
SET status = 2, version = version + 1
WHERE id = 1001 AND version = 0;
-- 如果更新失败(版本号不匹配),说明数据已被其他事务修改
-- 应用层需要重试或返回错误
-- Java代码实现乐观锁
public boolean updateOrderWithOptimisticLock(Order order) {
int retryCount = 0;
while (retryCount < MAX_RETRY) {
int affectedRows = orderMapper.updateWithVersion(order);
if (affectedRows > 0) {
return true;
}
// 版本冲突,重试
retryCount++;
order = orderMapper.selectById(order.getId());
}
return false;
}
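上面调用的 updateWithVersion 是一个示意方法,一种可能的 MyBatis 实现如下(表结构沿用前文的 orders,版本号不匹配时影响行数为0):
// updateWithVersion 的示意实现
@Update("UPDATE orders SET status = #{status}, version = version + 1 " +
        "WHERE id = #{id} AND version = #{version}")
int updateWithVersion(Order order);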
3.2.3 使用合适的事务隔离级别
-- 查看当前隔离级别
SELECT @@transaction_isolation;
-- 设置事务隔离级别
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- 在应用层设置(Spring Boot)
spring:
datasource:
hikari:
transaction-isolation: TRANSACTION_READ_COMMITTED
隔离级别选择建议(按方法粒度设置隔离级别的代码示例见列表之后):
- READ UNCOMMITTED:不推荐,可能读到脏数据
- READ COMMITTED:大多数场景推荐,避免脏读,锁竞争最小
- REPEATABLE READ:MySQL默认级别,保证可重复读,但会产生间隙锁
- SERIALIZABLE:最高隔离级别,锁竞争最严重,只在特殊场景使用
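除了在连接池层面统一配置,Spring 中也可以按方法粒度指定隔离级别,下面是一个简单示意(方法与Mapper沿用前文示例):
// 按方法粒度指定隔离级别与事务超时时间(单位秒)
@Transactional(isolation = Isolation.READ_COMMITTED, timeout = 3)
public void updateOrderStatus(Long orderId, Integer status) {
orderMapper.updateStatus(orderId, status);
}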
3.3 死锁检测与处理
3.3.1 死锁场景分析
-- 场景1:交叉更新导致死锁
-- 事务A
BEGIN;
UPDATE orders SET status = 2 WHERE id = 1;
-- 稍后
UPDATE orders SET status = 2 WHERE id = 2;
COMMIT;
-- 事务B(同时执行)
BEGIN;
UPDATE orders SET status = 2 WHERE id = 2;
-- 稍后
UPDATE orders SET status = 2 WHERE id = 1;
COMMIT;
-- 场景2:间隙锁导致的死锁
-- 事务A
BEGIN;
SELECT * FROM orders WHERE id BETWEEN 10 AND 20 FOR UPDATE;
-- 插入id=15的记录(两个事务的间隙锁互相兼容,但插入所需的插入意向锁会被对方的间隙锁阻塞)
INSERT INTO orders (id, user_id, order_no) VALUES (15, 1001, 'A015');
COMMIT;
-- 事务B(同时执行)
BEGIN;
SELECT * FROM orders WHERE id BETWEEN 10 AND 20 FOR UPDATE;
-- 插入id=15的记录(被事务A的间隙锁阻塞;此时A、B互相等待,InnoDB检测到死锁并回滚其中一个事务)
INSERT INTO orders (id, user_id, order_no) VALUES (15, 1002, 'A016');
COMMIT;
3.3.2 死锁监控与诊断
-- 查看最近的死锁信息
SHOW ENGINE INNODB STATUS\G
-- 在输出中查找"LATEST DETECTED DEADLOCK"部分
-- 开启死锁监控(MySQL 5.6+)
SET GLOBAL innodb_print_all_deadlocks = ON;
-- 查看死锁相关的系统变量
SHOW VARIABLES LIKE 'innodb_deadlock_detect%';
SHOW VARIABLES LIKE 'innodb_lock_wait_timeout%';
3.3.3 应用层死锁处理策略
public class DeadlockRetryHandler {
private static final int MAX_RETRY = 3;
private static final long RETRY_DELAY_MS = 100;
public <T> T executeWithRetry(Supplier<T> operation) {
int retryCount = 0;
while (retryCount < MAX_RETRY) {
try {
return operation.get();
} catch (DeadlockLoserDataAccessException e) { // Spring将MySQL死锁(错误码1213)包装为该异常
retryCount++;
if (retryCount >= MAX_RETRY) {
throw e;
}
// 随机延迟,避免多个事务同时重试
try {
Thread.sleep(RETRY_DELAY_MS + (long)(Math.random() * 100));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
}
}
throw new RuntimeException("Max retry exceeded");
}
}
// 使用示例
DeadlockRetryHandler handler = new DeadlockRetryHandler();
handler.executeWithRetry(() -> {
// 你的业务逻辑
return orderService.processOrder(orderId);
});
3.4 锁等待超时优化
-- 查看当前锁等待情况(适用于MySQL 5.7;MySQL 8.0中请改用 sys.innodb_lock_waits 或 performance_schema.data_lock_waits)
SELECT
r.trx_id waiting_trx_id,
r.trx_mysql_thread_id waiting_thread,
r.trx_query waiting_query,
b.trx_id blocking_trx_id,
b.trx_mysql_thread_id blocking_thread,
b.trx_query blocking_query
FROM information_schema.innodb_lock_waits w
INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;
-- 查看锁等待超时设置
SHOW VARIABLES LIKE 'innodb_lock_wait_timeout';
-- 调整全局锁等待超时时间(单位秒,默认50,根据业务需求调整)
SET GLOBAL innodb_lock_wait_timeout = 10;
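如果只是个别业务容易出现长时间锁等待,也可以只在会话级别调小超时时间,避免影响全局:
-- 仅调整当前会话的锁等待超时(单位秒)
SET SESSION innodb_lock_wait_timeout = 5;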
四、架构升级:从单机到分布式
4.1 读写分离
4.1.1 主从复制原理
MySQL主从复制基于binlog(二进制日志)实现,基本流程如下(最小搭建示例见列表之后):
- 主库将变更写入binlog
- 从库I/O线程读取主库binlog并写入中继日志(relay log)
- 从库SQL线程执行中继日志中的SQL
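下面给出一个搭建主从复制的最小示例(基于binlog位点,主机名、账号、binlog文件名均为示意;生产环境建议结合GTID、半同步复制等方案):
-- 主库:创建复制账号并授权
CREATE USER 'repl'@'%' IDENTIFIED BY 'repl_password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
-- 主库:查看当前binlog文件与位点
SHOW MASTER STATUS;
-- 从库:指向主库并启动复制(MySQL 8.0.23+ 也可使用 CHANGE REPLICATION SOURCE TO / START REPLICA)
CHANGE MASTER TO
MASTER_HOST = 'master',
MASTER_USER = 'repl',
MASTER_PASSWORD = 'repl_password',
MASTER_LOG_FILE = 'binlog.000001',
MASTER_LOG_POS = 156;
START SLAVE;
-- 从库:确认 Slave_IO_Running / Slave_SQL_Running 均为 Yes
SHOW SLAVE STATUS\G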
4.1.2 读写分离实现
应用层实现(ShardingSphere-JDBC)
# application.yml
spring:
shardingsphere:
datasource:
names: master, slave0, slave1
master:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://master:3306/db
username: root
password: password
slave0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://slave0:3306/db
username: root
password: password
slave1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://slave1:3306/db
username: root
password: password
rules:
readwrite-splitting:
data-sources:
ds0:
type: Static
props:
write-data-source-name: master
read-data-source-names: slave0,slave1
load-balancer-name: round_robin
load-balancers:
round_robin:
type: ROUND_ROBIN
MyCat实现(中间件方式)
<!-- schema.xml -->
<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100">
<table name="orders" dataNode="dn1,dn2,dn3" rule="mod-long" />
</schema>
<dataNode name="dn1" dataHost="localhost1" database="db1" />
<dataNode name="dn2" dataHost="localhost1" database="db2" />
<dataNode name="dn3" dataHost="localhost1" database="db3" />
<dataHost name="localhost1" maxCon="1000" minCon="10" balance="1" writeType="0" dbType="mysql" dbDriver="native">
<heartbeat>select user()</heartbeat>
<writeHost host="hostM1" url="master:3306" user="root" password="password">
<readHost host="hostS1" url="slave0:3306" user="root" password="password" />
<readHost host="hostS2" url="slave1:3306" user="root" password="password" />
</writeHost>
</dataHost>
4.1.3 主从延迟问题处理
-- 查看从库延迟(MySQL 8.0.22+ 也可使用 SHOW REPLICA STATUS)
SHOW SLAVE STATUS\G
-- 关注 Seconds_Behind_Master 字段
-- 在应用层处理延迟(写后读问题)
public Order getOrderAfterWrite(Long orderId) {
// 写入后立即读取,可能读到旧数据
// 方案1:强制读主库
return orderMapper.selectFromMaster(orderId);
// 方案2:延迟双删
// 1. 删除缓存
// 2. 更新数据库
// 3. 延迟一段时间后再次删除缓存
// 方案3:使用版本号或时间戳
// 读取时带上版本号,如果版本号不匹配,等待或重试
}
4.2 分库分表
4.2.1 垂直拆分
-- 原始大表
CREATE TABLE user (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
password VARCHAR(100),
email VARCHAR(100),
phone VARCHAR(20),
-- 以下为详细信息,查询频率低
address VARCHAR(200),
id_card VARCHAR(20),
bank_card VARCHAR(30),
-- 以下为统计信息,更新频繁
login_count INT,
last_login_time DATETIME,
total_amount DECIMAL(10,2)
);
-- 垂直拆分后
-- 用户基本信息表(高频访问)
CREATE TABLE user_base (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
password VARCHAR(100),
email VARCHAR(100),
phone VARCHAR(20)
);
-- 用户详细信息表(低频访问)
CREATE TABLE user_detail (
user_id BIGINT PRIMARY KEY,
address VARCHAR(200),
id_card VARCHAR(20),
bank_card VARCHAR(30),
FOREIGN KEY (user_id) REFERENCES user_base(id)
);
-- 用户统计信息表(高频更新)
CREATE TABLE user_stats (
user_id BIGINT PRIMARY KEY,
login_count INT,
last_login_time DATETIME,
total_amount DECIMAL(10,2),
FOREIGN KEY (user_id) REFERENCES user_base(id)
);
4.2.2 水平拆分
-- 按用户ID取模分表(订单表)
-- orders_0, orders_1, orders_2, orders_3
-- 创建分表函数
DELIMITER $$
CREATE FUNCTION get_order_table_name(user_id BIGINT)
RETURNS VARCHAR(50)
DETERMINISTIC
BEGIN
DECLARE table_suffix INT;
SET table_suffix = user_id % 4;
RETURN CONCAT('orders_', table_suffix);
END$$
DELIMITER ;
-- 应用层分表路由(Java实现)
public class OrderTableRouter {
private static final int TABLE_COUNT = 4;
public String getTableName(Long userId) {
int suffix = (int) (userId % TABLE_COUNT);
return "orders_" + suffix;
}
public String getTableName(Long userId, Long orderId) {
// 优先使用userId,如果userId为空则使用orderId
if (userId != null) {
return getTableName(userId);
}
int suffix = (int) (orderId % TABLE_COUNT);
return "orders_" + suffix;
}
}
// MyBatis Mapper示例
@Mapper
public interface OrderMapper {
@Select("SELECT * FROM ${tableName} WHERE id = #{id}")
Order selectById(@Param("tableName") String tableName, @Param("id") Long id);
@Insert("INSERT INTO ${tableName} (user_id, order_no, status) VALUES (#{order.userId}, #{order.orderNo}, #{order.status})")
void insert(@Param("tableName") String tableName, @Param("order") Order order);
}
// Service层使用
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
private OrderTableRouter router = new OrderTableRouter();
public Order getOrder(Long id, Long userId) {
String tableName = router.getTableName(userId);
return orderMapper.selectById(tableName, id);
}
public void createOrder(Order order) {
String tableName = router.getTableName(order.getUserId());
orderMapper.insert(tableName, order);
}
}
4.2.3 使用ShardingSphere实现分库分表
# ShardingSphere配置
spring:
shardingsphere:
datasource:
names: ds0, ds1
ds0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://server0:3306/db0
username: root
password: password
ds1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://server1:3306/db1
username: root
password: password
rules:
sharding:
tables:
orders:
actual-data-nodes: ds${0..1}.orders_${0..3}
table-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: orders-table-inline
database-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: orders-db-inline
sharding-algorithms:
orders-table-inline:
type: CLASS_BASED
props:
strategy: standard
algorithmClassName: com.example.ModShardingAlgorithm
sharding-column: user_id
sharding-count: 4
orders-db-inline:
type: CLASS_BASED
props:
strategy: standard
algorithmClassName: com.example.ModShardingAlgorithm
sharding-column: user_id
sharding-count: 2
4.3 缓存策略
4.3.1 多级缓存架构
// 本地缓存 + 分布式缓存 + 数据库
public class OrderService {
// 本地缓存(Caffeine)
private final Cache<Long, Order> localCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build();
// Redis分布式缓存
@Autowired
private RedisTemplate<String, Order> redisTemplate;
@Autowired
private OrderMapper orderMapper;
// 用于延迟双删的调度线程池
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
public Order getOrder(Long id) {
// 1. 查询本地缓存
Order order = localCache.getIfPresent(id);
if (order != null) {
return order;
}
// 2. 查询Redis
String key = "order:" + id;
order = redisTemplate.opsForValue().get(key);
if (order != null) {
// 回填本地缓存
localCache.put(id, order);
return order;
}
// 3. 查询数据库
order = orderMapper.selectById(id);
if (order != null) {
// 回填缓存
redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
localCache.put(id, order);
}
return order;
}
public void updateOrder(Order order) {
// 更新数据库
orderMapper.update(order);
// 删除缓存(Cache Aside Pattern)
String key = "order:" + order.getId();
redisTemplate.delete(key);
localCache.invalidate(order.getId());
// 延迟双删(处理主从延迟)
scheduledExecutorService.schedule(() -> {
redisTemplate.delete(key);
}, 500, TimeUnit.MILLISECONDS);
}
}
4.3.2 缓存穿透、击穿、雪崩防护
// 缓存穿透防护(查询不存在的数据);此处假设注入的是 RedisTemplate<String, String>,缓存中存放JSON字符串
public Order getOrderWithPenetrationProtection(Long id) {
String key = "order:" + id;
String cacheKey = "cache:" + key;
// 查询缓存
String cached = redisTemplate.opsForValue().get(cacheKey);
if ("NULL".equals(cached)) {
return null; // 明确标记为不存在
}
if (cached != null) {
return JSON.parseObject(cached, Order.class);
}
// 查询数据库
Order order = orderMapper.selectById(id);
if (order == null) {
// 缓存空值,防止穿透
redisTemplate.opsForValue().set(cacheKey, "NULL", 5, TimeUnit.MINUTES);
return null;
}
// 正常缓存
redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(order), 30, TimeUnit.MINUTES);
return order;
}
// 缓存击穿防护(热点key过期):在缓存未命中时调用,用分布式锁保证同一时刻只有一个线程回源数据库
public Order getOrderWithBreakdownProtection(Long id) {
String key = "order:" + id;
// 使用分布式锁防止缓存击穿
String lockKey = "lock:" + key;
String lockValue = UUID.randomUUID().toString();
try {
// 尝试获取锁(10秒过期,防止死锁)
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
// 获取锁成功,查询数据库并回填缓存(更严谨的做法是先再查一次缓存,未命中再回源)
Order order = orderMapper.selectById(id);
if (order != null) {
redisTemplate.opsForValue().set(key, JSON.toJSONString(order), 30, TimeUnit.MINUTES);
}
return order;
} else {
// 未获取锁,短暂等待后重试
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
return getOrderWithBreakdownProtection(id);
}
} finally {
// 释放锁(使用Lua脚本保证原子性)
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(lockKey), lockValue);
}
}
// 缓存雪崩防护(大量key同时过期)
public void warmupCache() {
// 预热缓存
List<Order> orders = orderMapper.selectAll();
for (Order order : orders) {
String key = "order:" + order.getId();
// 随机过期时间,避免同时过期
int expireTime = 30 + new Random().nextInt(11); // 30~40分钟的随机过期时间,避免同时失效
redisTemplate.opsForValue().set(key, order, expireTime, TimeUnit.MINUTES);
}
}
4.4 消息队列削峰
4.4.1 异步处理架构
// Controller层接收请求,快速返回
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/orders")
public ResponseEntity<String> createOrder(@RequestBody OrderRequest request) {
// 1. 参数校验
validateRequest(request);
// 2. 生成订单ID
String orderId = generateOrderId();
// 3. 发送消息到队列(立即返回)
OrderMessage message = new OrderMessage(orderId, request);
rabbitTemplate.convertAndSend("order.exchange", "order.create", message);
// 4. 返回订单ID,客户端轮询查询结果
return ResponseEntity.accepted().body(orderId);
}
}
// 消费者处理订单
@Component
@RabbitListener(queues = "order.queue")
public class OrderMessageConsumer {
@Autowired
private OrderService orderService;
@Autowired
private InventoryService inventoryService;
@RabbitHandler
public void processOrder(OrderMessage message) {
try {
// 1. 检查库存
boolean hasStock = inventoryService.checkStock(message.getProductId());
if (!hasStock) {
// 库存不足,发送失败消息
sendFailureMessage(message, "库存不足");
return;
}
// 2. 创建订单(数据库操作)
Order order = orderService.createOrder(message);
// 3. 扣减库存
inventoryService.deductStock(message.getProductId(), message.getQuantity());
// 4. 发送成功消息
sendSuccessMessage(order);
} catch (Exception e) {
// 异常处理,重试或死信队列
handleException(message, e);
}
}
}
4.4.2 延迟队列实现订单超时取消
// 使用RabbitMQ的TTL + 死信队列实现延迟消息
@Configuration
public class DelayedOrderConfig {
// 正常订单队列
@Bean
public Queue orderQueue() {
return new Queue("order.queue", true);
}
// 延迟队列(消息过期后进入死信队列)
@Bean
public Queue orderDelayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "order.exchange");
args.put("x-dead-letter-routing-key", "order.timeout");
args.put("x-message-ttl", 30 * 60 * 1000); // 30分钟超时
return new Queue("order.delay.queue", true, false, false, args);
}
// 死信队列(处理超时订单)
@Bean
public Queue orderTimeoutQueue() {
return new Queue("order.timeout.queue", true);
}
// 交换机(正常、延迟、超时消息共用)
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange", true, false);
}
// 延迟队列绑定(生产者以 order.delay 路由键投递)
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(orderDelayQueue())
.to(orderExchange())
.with("order.delay");
}
// 死信队列绑定(消息过期后经 order.timeout 路由到超时队列)
@Bean
public Binding timeoutBinding() {
return BindingBuilder.bind(orderTimeoutQueue())
.to(orderExchange())
.with("order.timeout");
}
}
// 生产者发送延迟消息
public void sendOrderDelayMessage(Order order) {
rabbitTemplate.convertAndSend(
"order.exchange",
"order.delay",
order,
message -> {
message.getMessageProperties().setExpiration("1800000"); // 30分钟
return message;
}
);
}
// 消费者处理超时订单
@Component
@RabbitListener(queues = "order.timeout.queue")
public class OrderTimeoutConsumer {
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryService inventoryService;
@RabbitHandler
public void handleTimeout(Order order) {
// 检查订单状态
Order dbOrder = orderMapper.selectById(order.getId());
if (dbOrder != null && dbOrder.getStatus() == OrderStatus.PENDING.getValue()) {
// 取消订单
orderMapper.updateStatus(order.getId(), OrderStatus.CANCELLED.getValue());
// 释放库存
inventoryService.releaseStock(order.getProductId(), order.getQuantity());
// 发送取消通知
sendCancelNotification(order);
}
}
}
4.5 数据库连接池优化
4.5.1 HikariCP配置优化
spring:
datasource:
hikari:
# 连接池名称
pool-name: MyHikariCP
# 最小空闲连接
minimum-idle: 10
# 最大连接数(根据业务调整)
maximum-pool-size: 50
# 连接超时时间(毫秒)
connection-timeout: 30000
# 空闲连接存活时间(毫秒)
idle-timeout: 600000
# 连接最大生命周期(毫秒)
max-lifetime: 1800000
# 连接测试查询
connection-test-query: SELECT 1
# 是否启用JMX监控
register-mbeans: true
# 连接预热
initialization-fail-timeout: 1
# 连接泄漏检测
leak-detection-threshold: 60000
4.5.2 连接池监控
// 监控连接池状态
@Component
public class ConnectionPoolMonitor {
@Autowired
private DataSource dataSource;
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void monitorConnectionPool() {
if (dataSource instanceof HikariDataSource) {
HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
HikariPoolMXBean poolMXBean = hikariDataSource.getHikariPoolMXBean();
log.info("连接池状态 - 活跃连接: {}, 空闲连接: {}, 总连接: {}, 等待连接: {}",
poolMXBean.getActiveConnections(),
poolMXBean.getIdleConnections(),
poolMXBean.getTotalConnections(),
poolMXBean.getThreadsAwaitingConnection());
// 告警逻辑
if (poolMXBean.getThreadsAwaitingConnection() > 10) {
log.warn("连接池等待线程数过高: {}", poolMXBean.getThreadsAwaitingConnection());
// 发送告警
alertService.sendAlert("数据库连接池等待线程数过高");
}
}
}
}
五、监控与告警:及时发现问题
5.1 性能监控指标
5.1.1 关键指标监控
-- 查看数据库整体性能指标
SELECT
VARIABLE_NAME,
VARIABLE_VALUE
FROM performance_schema.global_status
WHERE VARIABLE_NAME IN (
'Threads_connected',
'Threads_running',
'Slow_queries',
'Questions',
'Queries',
'Innodb_row_lock_waits',
'Innodb_row_lock_time_avg',
'Innodb_buffer_pool_reads',
'Innodb_buffer_pool_read_requests'
);
-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
-- 查看慢查询数量
SHOW STATUS LIKE 'Slow_queries';
-- 查看当前正在发生的InnoDB锁等待数量(MySQL 8.0中为 sys.innodb_lock_waits)
SELECT COUNT(*) AS lock_waits
FROM information_schema.innodb_lock_waits;
-- 行锁等待累计次数与平均等待时间可通过状态变量查看
SHOW STATUS LIKE 'Innodb_row_lock%';
5.1.2 慢查询日志分析
-- 开启慢查询日志
SET GLOBAL slow_query_log = ON;
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1; -- 超过1秒的查询记录
-- 分析慢查询日志(mysqldumpslow为命令行工具,在shell中执行)
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
-- 或者使用pt-query-digest(Percona Toolkit)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
5.1.3 使用Performance Schema监控
-- 查看最耗时的SQL
SELECT
DIGEST_TEXT,
COUNT_STAR,
AVG_TIMER_WAIT / 1000000000000 AS avg_time_sec,
SUM_ROWS_EXAMINED,
SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;
-- 查看表I/O统计
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
COUNT_READ,
COUNT_WRITE,
SUM_TIMER_WAIT / 1000000000000 AS total_time_sec
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;
5.2 应用层监控
5.2.1 Spring Boot Actuator + Micrometer
# application.yml
management:
endpoints:
web:
exposure:
include: health,metrics,prometheus
metrics:
export:
prometheus:
enabled: true
distribution:
percentiles-histogram:
http.server.requests: true
percentiles:
http.server.requests: 0.5,0.95,0.99
// 自定义数据库监控指标
@Component
public class DatabaseMetrics {
private final MeterRegistry meterRegistry;
public DatabaseMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
public void recordQuery(String queryType, long duration, boolean success) {
Timer.builder("db.query.duration")
.tag("type", queryType)
.tag("success", String.valueOf(success))
.register(meterRegistry)
.record(duration, TimeUnit.MILLISECONDS);
Counter.builder("db.query.count")
.tag("type", queryType)
.tag("success", String.valueOf(success))
.register(meterRegistry)
.increment();
}
}
5.2.2 自定义埋点
// 在Service层埋点
@Service
public class OrderService {
@Autowired
private DatabaseMetrics metrics;
public Order getOrder(Long id) {
long start = System.currentTimeMillis();
boolean success = false;
try {
Order order = orderMapper.selectById(id);
success = true;
return order;
} finally {
metrics.recordQuery("select", System.currentTimeMillis() - start, success);
}
}
}
5.3 告警策略
5.3.1 Prometheus + Grafana告警规则
# prometheus-alerts.yml
groups:
- name: database
rules:
# 连接数告警
- alert: MySQLTooManyConnections
expr: mysql_global_status_threads_connected / mysql_global_variables_max_connections > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL连接数过高"
description: "连接数与max_connections之比为 {{ $value }},超过0.8"
# 慢查询告警
- alert: MySQLSlowQueries
expr: rate(mysql_global_status_slow_queries[5m]) > 5
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL慢查询增多"
description: "每秒慢查询 {{ $value }} 个"
# 锁等待告警
- alert: MySQLLockWaits
expr: mysql_global_status_innodb_row_lock_waits > 100
for: 5m
labels:
severity: critical
annotations:
summary: "MySQL锁等待严重"
description: "InnoDB行锁等待 {{ $value }} 次"
# 主从延迟告警
- alert: MySQLReplicationLag
expr: mysql_slave_status_seconds_behind_master > 30
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL主从延迟"
description: "从库延迟 {{ $value }} 秒"
六、实战案例:秒杀系统优化
6.1 秒杀场景特点分析
秒杀场景具有以下特点:
- 瞬时流量巨大:短时间内百万级请求
- 库存有限:商品数量少,竞争激烈
- 读多写少:大量查询请求,少量下单请求
- 数据一致性要求高:不能超卖
6.2 秒杀系统架构设计
客户端 → CDN → Nginx → 服务网关 → 秒杀服务 → Redis(预扣库存)
                                    │
                                    └→ 消息队列 → 异步处理 → MySQL(创建订单、扣减库存)
6.3 核心代码实现
6.3.1 库存预热与扣减
@Service
public class SeckillService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private OrderMapper orderMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String STOCK_KEY = "seckill:stock:";
private static final String ORDER_KEY = "seckill:order:";
/**
* 预热库存到Redis
*/
public void preloadStock(Long seckillId, Integer stock) {
redisTemplate.opsForValue().set(STOCK_KEY + seckillId, stock.toString());
}
/**
* 秒杀下单(Redis预扣库存)
*/
public String seckill(Long seckillId, Long userId) {
String stockKey = STOCK_KEY + seckillId;
String orderKey = ORDER_KEY + seckillId + ":" + userId;
// 1. 检查是否已下单(防止重复下单)
if (Boolean.TRUE.equals(redisTemplate.hasKey(orderKey))) {
throw new RuntimeException("您已参与过秒杀");
}
// 2. 使用Lua脚本原子扣减库存
String luaScript =
"local stock = redis.call('get', KEYS[1]) " +
"if not stock then return -1 end " +
"if tonumber(stock) <= 0 then return 0 end " +
"redis.call('decr', KEYS[1]) " +
"return 1";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(stockKey)
);
if (result == null || result == -1) {
throw new RuntimeException("秒杀活动不存在");
}
if (result == 0) {
throw new RuntimeException("库存不足");
}
// 3. 标记已下单
redisTemplate.opsForValue().set(orderKey, "1", 30, TimeUnit.MINUTES);
// 4. 生成订单号,连同消息一起发送到队列,由消费者异步创建订单
String orderNo = generateOrderNo();
SeckillMessage message = new SeckillMessage(seckillId, userId, orderNo); // 假设消息体中携带订单号
rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);
// 5. 返回订单号(前端据此轮询查询下单结果)
return orderNo;
}
/**
* 异步创建订单(消费者)
*/
@RabbitListener(queues = "seckill.queue")
public void createOrder(SeckillMessage message) {
try {
// 1. 先条件扣减数据库库存(SQL中带 stock > 0 条件,防止超卖)
int affected = orderMapper.decreaseStock(message.getSeckillId());
if (affected == 0) {
// 数据库库存不足,回滚Redis库存并记录
rollbackStock(message.getSeckillId());
log.error("秒杀库存不足,seckillId: {}", message.getSeckillId());
return;
}
// 2. 创建订单(建议与扣库存放在同一个数据库事务中,失败时一并回滚)
Order order = new Order();
order.setOrderNo(message.getOrderNo()); // 使用下单时生成并返回给前端的订单号
order.setSeckillId(message.getSeckillId());
order.setUserId(message.getUserId());
order.setStatus(OrderStatus.SUCCESS.getValue());
orderMapper.insert(order);
} catch (Exception e) {
log.error("创建订单失败", e);
// 异常处理,死信队列或重试
}
}
/**
* 回滚Redis库存
*/
private void rollbackStock(Long seckillId) {
String stockKey = STOCK_KEY + seckillId;
redisTemplate.opsForValue().increment(stockKey);
}
}
6.3.2 数据库层优化
-- 秒杀商品表
CREATE TABLE seckill_goods (
id BIGINT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
stock INT NOT NULL,
start_time DATETIME,
end_time DATETIME,
version INT DEFAULT 0, -- 乐观锁版本号
INDEX idx_time (start_time, end_time)
);
-- 秒杀订单表(按 user_id 哈希拆成4张分表:seckill_order_0 ~ seckill_order_3)
CREATE TABLE seckill_order_0 (
id BIGINT PRIMARY KEY,
order_no VARCHAR(64) UNIQUE,
seckill_id BIGINT,
user_id BIGINT,
status TINYINT,
create_time DATETIME,
INDEX idx_user (user_id),
INDEX idx_seckill (seckill_id)
);
-- 扣减库存SQL(条件更新:只有 stock > 0 时才会生效,防止超卖)
UPDATE seckill_goods
SET stock = stock - 1, version = version + 1
WHERE id = ? AND stock > 0;
-- 查询库存(不加锁)
SELECT stock FROM seckill_goods WHERE id = ?;
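前文消费者中调用的 decreaseStock 可以直接对应上面的扣减库存SQL,一种 MyBatis 示意实现如下:
// decreaseStock 的示意实现:只有 stock > 0 时才会更新,返回受影响行数
@Update("UPDATE seckill_goods SET stock = stock - 1, version = version + 1 " +
        "WHERE id = #{seckillId} AND stock > 0")
int decreaseStock(@Param("seckillId") Long seckillId);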
6.3.3 接口限流与防刷
@Component
public class RateLimiter {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 令牌桶限流
*/
public boolean tryAcquire(String key, int permits, int maxPermits, int refillRate) {
String luaScript =
"local key = KEYS[1] " +
"local permits = tonumber(ARGV[1]) " +
"local maxPermits = tonumber(ARGV[2]) " +
"local refillRate = tonumber(ARGV[3]) " +
"local now = tonumber(ARGV[4]) " +
"local interval = 1000 / refillRate " +
"local tokens = redis.call('hget', key, 'tokens') " +
"local lastRefill = redis.call('hget', key, 'lastRefill') " +
"if not tokens then tokens = maxPermits else tokens = tonumber(tokens) end " +
"if not lastRefill then lastRefill = now else lastRefill = tonumber(lastRefill) end " +
"local elapsed = now - lastRefill " +
"if elapsed > 0 then " +
" tokens = math.min(maxPermits, tokens + elapsed / interval) " +
"end " +
"if tokens >= permits then " +
" tokens = tokens - permits " +
" redis.call('hset', key, 'tokens', tokens, 'lastRefill', now) " +
" redis.call('expire', key, 3600) " +
" return 1 " +
"else " +
" return 0 " +
"end";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList("rate_limit:" + key),
// 参数统一转成字符串,与RedisTemplate<String, String>的序列化方式保持一致
String.valueOf(permits), String.valueOf(maxPermits),
String.valueOf(refillRate), String.valueOf(System.currentTimeMillis())
);
return result != null && result == 1;
}
}
// 在Controller中使用
@RestController
public class SeckillController {
@Autowired
private RateLimiter rateLimiter;
@Autowired
private SeckillService seckillService;
@PostMapping("/seckill/{seckillId}")
public Result seckill(@PathVariable Long seckillId, @RequestHeader("userId") Long userId) {
// 1. 用户级限流(每个用户每秒最多1次)
String userKey = "user:" + userId;
if (!rateLimiter.tryAcquire(userKey, 1, 1, 1)) {
return Result.error("操作太频繁,请稍后再试");
}
// 2. IP级限流(防止恶意刷单)
String ipKey = "ip:" + getClientIp();
if (!rateLimiter.tryAcquire(ipKey, 10, 100, 10)) {
return Result.error("请求过于频繁,请稍后再试");
}
// 3. 全局限流(保护后端)
String globalKey = "global:seckill:" + seckillId;
if (!rateLimiter.tryAcquire(globalKey, 1000, 10000, 1000)) {
return Result.error("系统繁忙,请稍后再试");
}
// 4. 执行秒杀
try {
String orderNo = seckillService.seckill(seckillId, userId);
return Result.success(orderNo);
} catch (Exception e) {
return Result.error(e.getMessage());
}
}
}
6.4 秒杀系统监控
// 秒杀系统专用监控
@Component
public class SeckillMonitor {
@Autowired
private MeterRegistry meterRegistry;
private final AtomicLong totalRequests = new AtomicLong(0);
private final AtomicLong successRequests = new AtomicLong(0);
private final AtomicLong stockDepletionTime = new AtomicLong(0);
@PostConstruct
public void init() {
// Gauge只需注册一次,读取时实时计算成功率
// (在recordRequest中反复调用 meterRegistry.gauge(name, value) 并不会更新已注册的数值)
Gauge.builder("seckill.success.rate", this,
m -> m.totalRequests.get() == 0 ? 0.0
: (double) m.successRequests.get() / m.totalRequests.get())
.register(meterRegistry);
}
public void recordRequest(boolean success) {
totalRequests.incrementAndGet();
if (success) {
successRequests.incrementAndGet();
}
}
public void recordStockDepletion(long timeMs) {
stockDepletionTime.set(timeMs);
meterRegistry.timer("seckill.stock.depletion.time").record(timeMs, TimeUnit.MILLISECONDS);
}
@Scheduled(fixedRate = 10000)
public void logMetrics() {
log.info("秒杀监控 - 总请求: {}, 成功: {}, 成功率: {}%, 库存耗时: {}ms",
totalRequests.get(),
successRequests.get(),
totalRequests.get() > 0 ?
String.format("%.2f", (double) successRequests.get() / totalRequests.get() * 100) : 0,
stockDepletionTime.get());
}
}
七、总结与最佳实践
7.1 高并发优化的核心原则
- 空间换时间:使用索引、缓存减少查询时间
- 时间换空间:异步处理、消息队列削峰
- 减少竞争:降低锁粒度、缩短事务
- 分散压力:读写分离、分库分表
- 快速失败:限流、熔断、降级
7.2 优化优先级建议
- 第一优先级:索引优化、慢SQL优化(成本低,效果明显)
- 第二优先级:缓存策略、读写分离(中等成本,效果显著)
- 第三优先级:异步处理、消息队列(较高成本,解决峰值问题)
- 第四优先级:分库分表、架构升级(高成本,解决数据量问题)
7.3 持续优化建议
- 建立性能基线:记录系统正常运行时的指标
- 定期性能分析:每周分析慢查询日志
- 压测常态化:定期进行压力测试
- 监控告警:建立完善的监控体系
- 代码审查:重点关注数据库相关代码
- 容量规划:根据业务增长提前规划扩容
7.4 常见误区提醒
- 过度索引:索引不是越多越好,会影响写性能
- 缓存滥用:缓存一致性问题复杂,需要谨慎设计
- 分库分表过早:会增加系统复杂度,应在单表性能达到瓶颈后再考虑
- 忽视事务:高并发下事务控制不当会导致严重问题
- 缺少监控:没有监控就无法知道优化效果
通过以上全面的优化策略,MySQL可以在高并发场景下保持良好的性能表现。关键是要根据实际业务场景选择合适的优化手段,并持续监控和调整。记住,没有银弹,只有最适合的方案。
