Introduction: Understanding the High-Concurrency Challenge
High-concurrency traffic is the norm for modern internet applications. Whether it is a flash sale on an e-commerce platform, a trending event on social media, or the trading peak of a financial system, the MySQL database ends up under enormous concurrent load, and its performance bottlenecks often become the key constraint on overall system stability.
The main challenges high concurrency poses for MySQL include:
- Connection exhaustion: large numbers of concurrent requests drain the connection pool, leaving new requests unable to obtain a connection
- CPU contention: frequent context switches and query execution consume large amounts of CPU
- I/O bottlenecks: heavy concurrent reads and writes turn disk I/O into the limiting factor
- Lock contention: competition for row locks, table locks, and other resources causes transaction waits and deadlocks
- Cache failures: updates to hot data trigger cache penetration, cache avalanches, and similar problems
This article walks through practical strategies for handling high concurrency with MySQL across several dimensions: architecture, database configuration tuning, SQL optimization, and caching.
1. Architecture-Level Optimization Strategies
1.1 Read/Write Splitting
Read/write splitting is the classic architectural pattern for read-heavy, write-light workloads. Routing reads and writes to different database instances can significantly raise overall throughput.
Core principles:
- The master handles all writes (INSERT/UPDATE/DELETE)
- The replicas (slaves) handle all reads (SELECT)
- The application layer routes each request to master or replica
Example implementation:
// Read/write splitting with Spring Boot
@Component
public class DataSourceRouter extends AbstractRoutingDataSource {
private static final ThreadLocal<DataSourceType> CONTEXT_HOLDER =
ThreadLocal.withInitial(() -> DataSourceType.MASTER);
public enum DataSourceType {
MASTER, SLAVE
}
public static void setMaster() {
CONTEXT_HOLDER.set(DataSourceType.MASTER);
}
public static void setSlave() {
CONTEXT_HOLDER.set(DataSourceType.SLAVE);
}
public static void clear() {
CONTEXT_HOLDER.remove();
}
@Override
protected Object determineCurrentLookupKey() {
return CONTEXT_HOLDER.get();
}
}
// Route reads and writes with an AOP aspect
@Aspect
@Component
public class DataSourceAspect {
@Before("execution(* com.example.service.*.*(..))")
public void before(JoinPoint joinPoint) {
String methodName = joinPoint.getSignature().getName();
if (methodName.startsWith("get") || methodName.startsWith("list") ||
methodName.startsWith("query")) {
DataSourceRouter.setSlave();
} else {
DataSourceRouter.setMaster();
}
}
@After("execution(* com.example.service.*.*(..))")
public void after() {
DataSourceRouter.clear();
}
}
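Routing by method-name prefix is fragile: a write method that happens to start with get, or a transactional method that reads immediately after writing, can be sent to a stale replica. A common refinement is to route by an explicit marker annotation instead. Below is a minimal sketch, assuming a custom @ReadOnly annotation (hypothetical, not part of the original code):
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;

// Hypothetical marker: only methods carrying it may read from a replica
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface ReadOnly {
}

@Aspect
@Component
class ReadOnlyRoutingAspect {
    // Everything not explicitly marked @ReadOnly stays on the master,
    // which is the safer default for correctness.
    @Around("@annotation(readOnly)")
    public Object route(ProceedingJoinPoint pjp, ReadOnly readOnly) throws Throwable {
        DataSourceRouter.setSlave();
        try {
            return pjp.proceed();
        } finally {
            DataSourceRouter.clear(); // restore even when the method throws
        }
    }
}
With this design the failure mode is "a read went to the master", which costs performance, instead of "a read saw stale data", which costs correctness.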
Configuration notes:
- Keep master-replica replication lag within an acceptable range (typically within about a second)
- Reads that require strong consistency must still go to the master
- Monitor replica lag and automatically remove replicas that fall too far behind (see the sketch below)
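One way to implement the last point is to poll each replica's Seconds_Behind_Master and take the replica out of the routing rotation once lag crosses a threshold. A minimal JDBC sketch; the URL and credentials are placeholders you supply, and on MySQL 8.0.22+ the statement is SHOW REPLICA STATUS:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ReplicaLagChecker {
    /**
     * Returns the replica's Seconds_Behind_Master, or -1 when replication
     * is not running (the column is NULL in that case).
     */
    public static long secondsBehindMaster(String replicaUrl, String user, String password)
            throws SQLException {
        try (Connection conn = DriverManager.getConnection(replicaUrl, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW SLAVE STATUS")) {
            if (!rs.next()) {
                return -1; // this server is not configured as a replica
            }
            long lag = rs.getLong("Seconds_Behind_Master");
            return rs.wasNull() ? -1 : lag;
        }
    }
}
A scheduled job can call this every few seconds and flip the replica out of DataSourceRouter's rotation once lag exceeds, say, 5 seconds.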
1.2 Sharding Strategy
When a single table grows past the tens-of-millions-of-rows range, or concurrency is extremely high, consider sharding (splitting databases and tables).
Horizontal table-sharding example:
-- Shard the user table by user_id modulo
-- user_0, user_1, ..., user_9
-- Create one shard table
CREATE TABLE user_0 (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
created_at TIMESTAMP,
INDEX idx_username (username)
) ENGINE=InnoDB;
// Sharding rule (Java implementation)
public class ShardingUtil {
private static final int TABLE_COUNT = 10;
public static String getTableName(String userId) {
int hash = userId.hashCode() % TABLE_COUNT;
return "user_" + (hash < 0 ? -hash : hash);
}
public static String getTableName(Long userId) {
return "user_" + (userId % TABLE_COUNT);
}
}
// Usage example
String tableName = ShardingUtil.getTableName(userId);
String sql = "INSERT INTO " + tableName + " (id, username, email) VALUES (?, ?, ?)";
Sharding middleware options:
- ShardingSphere: a full-featured distributed database middleware
- MyCat: an open-source database middleware derived from Cobar
- Vitess: YouTube's open-source MySQL clustering system
1.3 Database Connection Pool Optimization
The connection pool is the critical buffer between the application and the database; configured well, it greatly improves concurrent throughput.
HikariCP configuration example:
# application.yml
spring:
  datasource:
    hikari:
      # Pool name
      pool-name: MySQLHikariCP
      # Minimum number of idle connections
      minimum-idle: 10
      # Maximum pool size (tune to your concurrency)
      maximum-pool-size: 50
      # Connection timeout (ms)
      connection-timeout: 30000
      # Maximum connection lifetime (ms)
      max-lifetime: 1800000
      # Idle connection timeout (ms)
      idle-timeout: 600000
      # Connection test query (usually unnecessary with JDBC4 drivers)
      connection-test-query: SELECT 1
      # Statement-cache settings belong to the MySQL driver, so they are
      # passed through as data source properties rather than Hikari keys
      data-source-properties:
        cachePrepStmts: true          # cache prepared statements
        prepStmtCacheSize: 250        # statement cache size
        prepStmtCacheSqlLimit: 2048   # max SQL length eligible for the cache
Connection pool metrics to watch (a read-out sketch follows the list):
- Active connections vs. maximum pool size
- Number of threads waiting for a connection
- Average connection-acquisition time
- Connection-leak detection
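HikariCP exposes these numbers at runtime through its HikariPoolMXBean. A minimal read-out sketch; wire it into a scheduled job or your metrics registry, the printf is just for illustration:
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.HikariPoolMXBean;

public class PoolMonitor {
    private final HikariDataSource dataSource;

    public PoolMonitor(HikariDataSource dataSource) {
        this.dataSource = dataSource;
    }

    /** Prints the pool gauges that matter most under high concurrency. */
    public void logPoolStats() {
        HikariPoolMXBean pool = dataSource.getHikariPoolMXBean();
        System.out.printf("active=%d idle=%d total=%d waiting=%d%n",
            pool.getActiveConnections(),          // connections in use right now
            pool.getIdleConnections(),            // connections ready to hand out
            pool.getTotalConnections(),           // active + idle
            pool.getThreadsAwaitingConnection()); // callers blocked in getConnection()
    }
}
Leak detection is enabled separately via HikariCP's leak-detection-threshold property; connections held longer than the threshold are logged with a stack trace.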
2. MySQL Configuration Tuning
2.1 Core Parameter Optimization
MySQL's configuration parameters directly determine how much concurrency it can absorb. The most important ones:
# my.cnf key settings
[mysqld]
# Connections
max_connections = 1000              # maximum connections; size to memory and workload
max_connect_errors = 100000         # maximum connection errors before a host is blocked
back_log = 500                      # length of the pending-connection queue
# Core InnoDB parameters
innodb_buffer_pool_size = 16G       # buffer pool; typically 50-75% of physical RAM
innodb_buffer_pool_instances = 8    # buffer pool instances, to reduce contention
innodb_log_file_size = 2G           # redo log file size
innodb_log_buffer_size = 64M        # redo log buffer
innodb_flush_log_at_trx_commit = 1  # 1 = flush+sync at every commit (full durability); 2 = sync ~once per second (faster, may lose up to 1s of commits on a crash)
innodb_flush_method = O_DIRECT      # avoid double buffering through the OS cache
innodb_io_capacity = 2000           # IOPS budget available to InnoDB
innodb_read_io_threads = 8          # read I/O threads
innodb_write_io_threads = 8         # write I/O threads
# Query cache (removed in MySQL 8.0; only applies to 5.7 and earlier)
query_cache_type = 0                # disable the query cache
query_cache_size = 0
# Temporary tables and sorting
tmp_table_size = 256M               # in-memory temporary table limit
max_heap_table_size = 256M          # maximum MEMORY table size
sort_buffer_size = 4M               # sort buffer (per thread)
join_buffer_size = 4M               # join buffer (per thread)
# Miscellaneous
table_open_cache = 2000             # open table cache
table_definition_cache = 1400       # table definition cache
thread_cache_size = 100             # cached threads for reuse
2.2 Slow Query Log Analysis
The slow query log is the sharpest tool for finding performance problems; paired with the right tooling, it pinpoints problem SQL quickly.
-- Enable the slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1; -- log queries that take longer than 1 second
SET GLOBAL log_queries_not_using_indexes = 'ON';
-- Check the slow-query settings
SHOW VARIABLES LIKE 'slow_query%';
SHOW VARIABLES LIKE 'long_query_time';
Analyzing the slow log with pt-query-digest:
# Install Percona Toolkit
sudo apt-get install percona-toolkit
# Analyze the slow log
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
# Sample output:
# 120.5s user time, 20s system time, 120.5M rss, 200.5M vsz
# Current date: 2024-01-15 10:30:00
# Hostname: db-server-01
# Files: slow.log
# Overall: 12345 total, 123 unique, 0.00 QPS, 0.00x concurrency ________
# Time range: 2024-01-15 00:00:00 to 10:30:00
# Attribute total min max avg 95% stddev median
# ============ ======= ======= ======= ======= ======= ======= =======
# Exec time 1234s 0.01s 45.2s 1.00s 5.2s 3.45s 0.50s
# Lock time 0.50s 0.00s 0.05s 0.0004s 0.001s 0.002s 0.000s
# Rows sent 123.45M 0 1.00M 10.00 100.00 50.00 5.00
# Rows examine 1234.56M 0 10.00M 100.00 1000.00 500.00 50.00
# Query size 123.45M 20 10240 1024 2048 512 1024
# Profile
# Rank Query ID Response time Calls R/Call V/M Item
# ==== ================== ============= ===== ====== ===== ====
# 1 0x123456789ABCDEF 500.5000 40.5% 5000 0.1001 0.02 SELECT user
# 2 0x987654321FEDCBA 300.3000 24.3% 3000 0.1001 0.03 SELECT order
2.3 Monitoring with Performance Schema
The Performance Schema, available since MySQL 5.6, provides real-time visibility into database performance.
-- The most time-consuming statement events per thread
SELECT
THREAD_ID,
EVENT_NAME,
SUM_TIMER_WAIT/1000000000000 AS wait_seconds,
COUNT_STAR AS calls
FROM performance_schema.events_statements_summary_by_thread_by_event_name
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;
-- Table I/O statistics
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
SUM_TIMER_READ/1000000000000 AS read_seconds,
SUM_TIMER_WRITE/1000000000000 AS write_seconds,
COUNT_READ,
COUNT_WRITE
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;
-- Index usage statistics
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
INDEX_NAME,
COUNT_FETCH,
COUNT_INSERT,
COUNT_UPDATE,
COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;
3. SQL Optimization Strategies
3.1 Index Optimization
Indexes are the single most effective lever for query performance, but they have to be designed deliberately.
Index design principles:
- Index high-selectivity columns first
- Composite indexes follow the leftmost-prefix rule
- Avoid over-indexing (every index slows writes)
- Review index usage regularly
Index optimization example:
-- Original query (slow)
SELECT * FROM orders
WHERE user_id = 12345
AND status = 'PAID'
AND created_at >= '2024-01-01';
-- Option 1: create a composite index
CREATE INDEX idx_user_status_created ON orders (user_id, status, created_at);
-- Option 2: covering index (avoids lookups back to the clustered index)
CREATE INDEX idx_user_status_created_cover ON orders (user_id, status, created_at, order_no, amount);
-- Check how the index is used
EXPLAIN SELECT * FROM orders WHERE user_id = 12345 AND status = 'PAID';
-- Look for: type=ref, key=idx_user_status_created, rows=small
-- Check table fragmentation (free space left behind by deletes/updates)
SELECT
TABLE_NAME,
CONCAT(ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2), 'MB') AS total_size,
CONCAT(ROUND(DATA_FREE / 1024 / 1024, 2), 'MB') AS free_size
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'your_database'
ORDER BY DATA_FREE DESC;
-- Rebuild the table (defragments its indexes)
ALTER TABLE orders ENGINE=InnoDB; -- online DDL on MySQL 5.6+
-- or
OPTIMIZE TABLE orders; -- maps to the same rebuild on InnoDB; may briefly lock, best for small tables
3.2 Common Ways Queries Stop Using an Index
-- 1. Implicit type conversion
-- Wrong: user_id is VARCHAR but a number is passed
SELECT * FROM users WHERE user_id = 123; -- index not used
-- Right
SELECT * FROM users WHERE user_id = '123';
-- 2. Functions applied to the indexed column
-- Wrong
SELECT * FROM orders WHERE DATE(created_at) = '2024-01-01';
-- Right
SELECT * FROM orders WHERE created_at >= '2024-01-01' AND created_at < '2024-01-02';
-- 3. LIKE with a leading wildcard
-- Wrong
SELECT * FROM users WHERE username LIKE '%john%';
-- Right (prefix match)
SELECT * FROM users WHERE username LIKE 'john%';
-- 4. OR conditions (in some cases)
-- Risky (even if both columns are indexed, the optimizer may fall back to a full scan unless it can apply an index merge)
SELECT * FROM orders WHERE user_id = 123 OR amount > 1000;
-- Safer (rewrite with UNION ALL)
SELECT * FROM orders WHERE user_id = 123
UNION ALL
SELECT * FROM orders WHERE amount > 1000 AND user_id != 123;
-- 5. Negative conditions
-- Wrong
SELECT * FROM orders WHERE status != 'CANCELLED';
-- Right (when the set of values is small, enumerate with IN)
SELECT * FROM orders WHERE status IN ('PAID', 'SHIPPED', 'DELIVERED');
3.3 Pagination Optimization
Deep pagination is a notorious performance killer under high concurrency.
-- Naive pagination (slow: scans and discards a huge number of rows)
SELECT * FROM orders
WHERE user_id = 12345
ORDER BY created_at DESC
LIMIT 1000000, 20; -- skips one million rows
-- Option 1: deferred join (drive the offset through a covering index)
SELECT o.* FROM orders o
INNER JOIN (
SELECT id FROM orders
WHERE user_id = 12345
ORDER BY created_at DESC
LIMIT 1000000, 20
) AS tmp ON o.id = tmp.id;
-- Option 2: keyset pagination - remember the last-seen position (application-level change; see the Java sketch after this section)
-- Subsequent page query
SELECT * FROM orders
WHERE user_id = 12345
AND created_at < '2024-01-15 10:00:00' -- created_at of the last row on the previous page
ORDER BY created_at DESC
LIMIT 20;
-- Option 3: window function (MySQL 8.0+; note the CTE still scans and numbers the rows, so it rarely beats keyset pagination for very deep pages)
WITH ranked_orders AS (
SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY created_at DESC) as rn
FROM orders
WHERE user_id = 12345
)
SELECT * FROM ranked_orders WHERE rn BETWEEN 1000001 AND 1000020;
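Option 2 needs application support: the client passes back a cursor built from the last row it saw. A minimal plain-JDBC sketch, assuming an orders table as above; the (created_at, id) pair keeps the cursor unambiguous when timestamps collide (class and method names are illustrative):
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

public class OrderPageDao {
    private final DataSource dataSource;

    public OrderPageDao(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /** Keyset pagination: "rows strictly older than the cursor", never OFFSET. */
    public List<Long> nextPageIds(long userId, Timestamp lastCreatedAt, long lastId, int pageSize)
            throws SQLException {
        String sql = "SELECT id FROM orders "
            + "WHERE user_id = ? AND (created_at < ? OR (created_at = ? AND id < ?)) "
            + "ORDER BY created_at DESC, id DESC LIMIT ?";
        List<Long> ids = new ArrayList<>();
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, userId);
            ps.setTimestamp(2, lastCreatedAt);
            ps.setTimestamp(3, lastCreatedAt);
            ps.setLong(4, lastId);
            ps.setInt(5, pageSize);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    ids.add(rs.getLong("id")); // page of primary keys; fetch full rows by id if needed
                }
            }
        }
        return ids;
    }
}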
3.4 Batch Operation Optimization
Under high concurrency, row-at-a-time inserts and updates carry enormous overhead; batching is essential.
// Wrong: row-at-a-time insert, terrible performance
for (Order order : orderList) {
    orderMapper.insert(order); // one network round trip per row
}
// Right: batch insert (dataSource is an injected javax.sql.DataSource)
public void batchInsert(List<Order> orderList) {
    String sql = "INSERT INTO orders (order_no, user_id, amount, status) VALUES (?, ?, ?, ?)";
    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement(sql)) {
        conn.setAutoCommit(false); // turn off autocommit
        for (int i = 0; i < orderList.size(); i++) {
            Order order = orderList.get(i);
            ps.setString(1, order.getOrderNo());
            ps.setLong(2, order.getUserId());
            ps.setBigDecimal(3, order.getAmount());
            ps.setString(4, order.getStatus());
            ps.addBatch();
            // flush and commit every 1000 rows, and once more at the end
            if ((i + 1) % 1000 == 0 || i == orderList.size() - 1) {
                ps.executeBatch();
                conn.commit();
            }
        }
    } catch (SQLException e) {
        // handle the failure: log it and rethrow rather than swallow it
        throw new RuntimeException("batch insert failed", e);
    }
}
// Batch insert with MyBatis
<insert id="batchInsert" parameterType="list">
INSERT INTO orders (order_no, user_id, amount, status)
VALUES
<foreach collection="list" item="item" separator=",">
(#{item.orderNo}, #{item.userId}, #{item.amount}, #{item.status})
</foreach>
</insert>
// Batch update (multi-statement: requires allowMultiQueries=true on the JDBC URL)
<update id="batchUpdate" parameterType="list">
<foreach collection="list" item="item" separator=";">
UPDATE orders
SET status = #{item.status}, amount = #{item.amount}
WHERE id = #{item.id}
</foreach>
</update>
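Two Connector/J URL flags matter here. JDBC batching only becomes a true multi-row INSERT on the wire when rewriteBatchedStatements=true is set, and the semicolon-separated batchUpdate above only runs when allowMultiQueries=true. A hedged example URL; host and schema are placeholders:
// MySQL Connector/J: rewrite addBatch()/executeBatch() into multi-row
// INSERTs (often an order-of-magnitude speedup for bulk loads), and allow
// the semicolon-separated multi-statement UPDATE batch shown above.
String jdbcUrl = "jdbc:mysql://localhost:3306/shop"
    + "?rewriteBatchedStatements=true"
    + "&allowMultiQueries=true";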
3.5 Transaction Optimization
Long-running and oversized transactions seriously hurt concurrency.
-- Wrong: one giant transaction (100k rows in a single commit)
START TRANSACTION;
INSERT INTO logs (...) VALUES (...); -- 100,000 times
COMMIT;
-- Right: commit in batches
DELIMITER $$
CREATE PROCEDURE batch_insert_logs()
BEGIN
DECLARE i INT DEFAULT 0;
DECLARE batch_size INT DEFAULT 1000;
DECLARE total_rows INT DEFAULT 100000;
WHILE i < total_rows DO
START TRANSACTION;
INSERT INTO logs (...)
VALUES
(CONCAT('log_', i), ...),
(CONCAT('log_', i+1), ...),
...
(CONCAT('log_', i+batch_size-1), ...);
COMMIT;
SET i = i + batch_size;
-- brief pause so the loop does not saturate the CPU
DO SLEEP(0.01);
END WHILE;
END$$
DELIMITER ;
-- Choosing a transaction isolation level
-- Read-heavy, high concurrency: READ COMMITTED (less gap locking and better concurrency, at the cost of non-repeatable and phantom reads)
-- When repeatable reads matter: REPEATABLE READ (the InnoDB default, stronger consistency)
SET GLOBAL transaction_isolation = 'READ-COMMITTED';
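Changing the level globally affects every session. With Spring you can instead relax isolation only on the code paths where it is safe, per transaction. A minimal sketch using Spring's @Transactional attributes (the service and query are illustrative):
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ReportService {
    // Relax isolation for this read-heavy path only; writes elsewhere
    // keep InnoDB's default REPEATABLE READ.
    @Transactional(readOnly = true, isolation = Isolation.READ_COMMITTED)
    public long countPaidOrders() {
        // ... run the reporting query here
        return 0L;
    }
}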
4. Caching Strategies
4.1 Cache Design Principles
Caching is one of the most powerful tools against high concurrency, but it has to be designed with care.
Golden rules of cache design:
- Hit rate: target > 95%; below 80% means the design needs rethinking (see the measurement sketch after this list)
- Update strategy: Cache-Aside, Write-Through, or Write-Behind
- Consistency: eventual consistency vs. strong consistency
- Capacity: size the cache to the hot data set
- Expiry: active invalidation vs. passive TTL expiry
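The hit-rate target is only actionable if you measure it. With Caffeine (used for the local cache later in this article), recordStats() plus stats() gives the numbers directly; a minimal sketch:
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.stats.CacheStats;

public class CacheStatsDemo {
    public static void main(String[] args) {
        Cache<String, String> cache = Caffeine.newBuilder()
            .maximumSize(10_000)
            .recordStats() // turn on hit/miss accounting
            .build();

        cache.put("user:1", "alice");
        cache.getIfPresent("user:1"); // hit
        cache.getIfPresent("user:2"); // miss

        CacheStats stats = cache.stats();
        // alert when hitRate() drifts below the 95% target
        System.out.printf("hitRate=%.2f hits=%d misses=%d%n",
            stats.hitRate(), stats.hitCount(), stats.missCount());
    }
}
For the Redis tier, the keyspace_hits and keyspace_misses fields of INFO stats give the same ratio.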
4.2 Redis in Practice
4.2.1 Defending Against Cache Penetration
Cache penetration happens when requests query keys that do not exist, so every request falls through to the database.
// Bloom filter + cache (Guava BloomFilter)
@Component
public class CachePenetrationService {
    // Sentinel cached in place of "not found" so repeated misses stay off the DB
    private static final Object NULL_OBJECT = new Object();
    private final RedisTemplate<String, Object> redisTemplate;
    private final UserMapper userMapper;
    private final BloomFilter<String> bloomFilter;
    public CachePenetrationService(RedisTemplate<String, Object> redisTemplate,
                                   UserMapper userMapper) {
        this.redisTemplate = redisTemplate;
        this.userMapper = userMapper;
        // Initialize the Bloom filter: 1M expected insertions, 0.01% false-positive rate
        this.bloomFilter = BloomFilter.create(
            Funnels.stringFunnel(Charset.defaultCharset()),
            1000000,
            0.0001
        );
    }
    public Object getUserById(Long userId) {
        String key = "user:" + userId;
        // 1. Check the Bloom filter first
        if (!bloomFilter.mightContain(key)) {
            // definitely not present
            return null;
        }
        // 2. Check the cache
        Object user = redisTemplate.opsForValue().get(key);
        if (user != null) {
            return user;
        }
        // 3. Fall through to the database
        user = userMapper.selectById(userId);
        if (user != null) {
            // 4. Populate the cache
            redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
        } else {
            // 5. Cache the miss with a short TTL (blocks penetration)
            redisTemplate.opsForValue().set(key, NULL_OBJECT, 5, TimeUnit.MINUTES);
        }
        return user;
    }
    // Load existing IDs into the Bloom filter at startup
    @PostConstruct
    public void initBloomFilter() {
        List<Long> userIds = userMapper.selectAllIds();
        for (Long userId : userIds) {
            bloomFilter.put("user:" + userId);
        }
    }
}
4.2.2 Defending Against Cache Avalanche
A cache avalanche happens when a large number of keys expire at once and the database suddenly takes the full load.
// Fix 1: randomized TTLs
public void setWithRandomExpire(String key, Object value, int baseExpireSeconds) {
    // base TTL plus a random offset (0-300 seconds) so keys don't expire together
    int randomExpire = baseExpireSeconds + ThreadLocalRandom.current().nextInt(300);
    redisTemplate.opsForValue().set(key, value, randomExpire, TimeUnit.SECONDS);
}
// Fix 2: hot keys never expire + background refresh
@Component
public class HotDataCacheService {
    private final RedisTemplate<String, Object> redisTemplate;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    public HotDataCacheService(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    @PostConstruct
    public void init() {
        // refresh hot data every 5 minutes
        scheduler.scheduleAtFixedRate(this::refreshHotData, 0, 5, TimeUnit.MINUTES);
    }
    private void refreshHotData() {
        Set<Object> hotKeys = redisTemplate.opsForZSet().range("hot:keys", 0, -1);
        if (hotKeys != null) {
            for (Object keyObj : hotKeys) {
                String key = (String) keyObj;
                // refresh asynchronously
                CompletableFuture.runAsync(() -> {
                    Object data = loadFromDB(key);
                    if (data != null) {
                        redisTemplate.opsForValue().set(key, data, 30, TimeUnit.MINUTES);
                    }
                });
            }
        }
    }
    private Object loadFromDB(String key) {
        // load from the database
        return null;
    }
}
// Fix 3: Redis cluster + a local cache as a second level
@Component
public class MultiLevelCacheService {
    private final Cache<String, Object> localCache = Caffeine.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .build();
    private final RedisTemplate<String, Object> redisTemplate;
    public MultiLevelCacheService(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    public Object get(String key) {
        // 1. local cache
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }
        // 2. Redis cache
        value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            localCache.put(key, value);
            return value;
        }
        // 3. database
        value = loadFromDB(key);
        if (value != null) {
            redisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);
            localCache.put(key, value);
        }
        return value;
    }
    private Object loadFromDB(String key) {
        // load from the database
        return null;
    }
}
4.2.3 Keeping the Cache Consistent with the Database
// Option 1: Cache-Aside with cache deletion
public class CacheAsidePattern {
    private OrderMapper orderMapper;                     // injected
    private RedisTemplate<String, Object> redisTemplate; // injected
    // Update path: write the database first, then delete the cache
    public void updateOrder(Order order) {
        // 1. update the database
        orderMapper.update(order);
        // 2. delete the cache (the next read reloads it)
        String key = "order:" + order.getId();
        redisTemplate.delete(key);
        // 3. delayed double delete (covers replicas that lag behind the master)
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(500); // wait for replication to catch up
                redisTemplate.delete(key);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    // Read path
    public Order getOrder(Long id) {
        String key = "order:" + id;
        // 1. check the cache
        Order order = (Order) redisTemplate.opsForValue().get(key);
        if (order != null) {
            return order;
        }
        // 2. fall through to the database
        order = orderMapper.selectById(id);
        if (order != null) {
            // 3. populate the cache
            redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
        }
        return order;
    }
}
// Option 2: invalidate from the database binlog via Canal (eventual consistency)
@Component
public class CanalCacheUpdater {
    private final RedisTemplate<String, Object> redisTemplate;
    public CanalCacheUpdater(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    // Simulated Canal binlog event callback
    public void onBinlogEvent(String table, String type, Long id) {
        String key = table + ":" + id;
        if ("UPDATE".equals(type) || "DELETE".equals(type)) {
            // drop the cache entry
            redisTemplate.delete(key);
            // on UPDATE, optionally warm the cache asynchronously
            if ("UPDATE".equals(type)) {
                CompletableFuture.runAsync(() -> {
                    Object data = loadFromDB(table, id);
                    if (data != null) {
                        redisTemplate.opsForValue().set(key, data, 30, TimeUnit.MINUTES);
                    }
                });
            }
        }
    }
    private Object loadFromDB(String table, Long id) {
        // load from the database
        return null;
    }
}
4.2.4 Redis Cluster Configuration
# Redis cluster settings (Spring Boot)
spring:
  redis:
    # cluster mode
    cluster:
      nodes:
        - 192.168.1.101:6379
        - 192.168.1.102:6379
        - 192.168.1.103:6379
        - 192.168.1.104:6379
        - 192.168.1.105:6379
        - 192.168.1.106:6379
      max-redirects: 3
    # connection pool settings
    lettuce:
      pool:
        max-active: 100
        max-idle: 50
        min-idle: 10
        max-wait: 1000ms
      cluster:
        refresh:
          adaptive: true # adaptively refresh the cluster topology
Guaranteeing atomicity with a Redis Lua script:
-- Atomically get-or-set a cache entry
-- KEYS[1]: cache key
-- ARGV[1]: TTL in seconds
-- ARGV[2]: value loaded from the database (JSON string)
local key = KEYS[1]
local expire = tonumber(ARGV[1])
local data = ARGV[2]
-- return the cached value if it exists
local exists = redis.call('EXISTS', key)
if exists == 1 then
return redis.call('GET', key)
else
-- otherwise write the value with a TTL and return it
redis.call('SET', key, data, 'EX', expire)
return data
end
Calling the Lua script from Java:
@Component
public class RedisLuaService {
    private final RedisTemplate<String, Object> redisTemplate;
    private final DefaultRedisScript<String> getOrSetScript;
    public RedisLuaService(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
        // load the Lua script from the classpath
        getOrSetScript = new DefaultRedisScript<>();
        getOrSetScript.setLocation(new ClassPathResource("lua/getOrSet.lua"));
        getOrSetScript.setResultType(String.class);
    }
    public String getOrSet(String key, int expireSeconds, Supplier<String> dataLoader) {
        // execute the Lua script
        String result = redisTemplate.execute(
            getOrSetScript,
            Collections.singletonList(key),
            String.valueOf(expireSeconds),
            dataLoader.get() // NOTE: evaluated before the script runs, even on a cache hit; check existence first if the load is expensive
        );
        return result;
    }
}
4.3 Multi-Level Cache Architecture
// A fuller multi-level cache implementation
@Component
public class MultiLevelCache {
    private static final Object NULL_OBJECT = new Object(); // sentinel for cached misses
    // L1: Caffeine local cache (in-process)
    private final Cache<String, Object> localCache = Caffeine.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(10, TimeUnit.SECONDS) // short TTL
        .recordStats() // keep hit/miss statistics
        .build();
    // L2: Redis distributed cache
    private final RedisTemplate<String, Object> redisTemplate;
    // L3: database
    private final UserMapper userMapper;
    public MultiLevelCache(RedisTemplate<String, Object> redisTemplate, UserMapper userMapper) {
        this.redisTemplate = redisTemplate;
        this.userMapper = userMapper;
    }
    public User getUser(Long userId) {
        String key = "user:" + userId;
        // L1: local cache
        User user = (User) localCache.getIfPresent(key);
        if (user != null) {
            Metrics.counter("cache.hit", "level", "local").increment();
            return user;
        }
        // L2: Redis cache
        user = (User) redisTemplate.opsForValue().get(key);
        if (user != null) {
            // backfill the local cache
            localCache.put(key, user);
            Metrics.counter("cache.hit", "level", "redis").increment();
            return user;
        }
        // L3: database
        user = userMapper.selectById(userId);
        if (user != null) {
            // write both cache levels
            redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
            localCache.put(key, user);
            Metrics.counter("cache.miss").increment();
        } else {
            // cache the miss
            redisTemplate.opsForValue().set(key, NULL_OBJECT, 5, TimeUnit.MINUTES);
        }
        return user;
    }
    // Batched lookup
    public List<User> batchGetUsers(List<Long> userIds) {
        List<String> keys = userIds.stream()
            .map(id -> "user:" + id)
            .collect(Collectors.toList());
        // 1. bulk-read the local cache
        Map<String, Object> localResult = localCache.getAllPresent(keys);
        // 2. bulk-read Redis for the rest
        List<String> remainingKeys = keys.stream()
            .filter(k -> !localResult.containsKey(k))
            .collect(Collectors.toList());
        List<Object> redisValues = redisTemplate.opsForValue().multiGet(remainingKeys);
        // 3. assemble results and backfill the local cache
        // ... omitted
        // 4. query the database for whatever is still missing
        // ... omitted
        return null;
    }
}
5. Special Optimizations for High-Concurrency Scenarios
5.1 Flash-Sale (Seckill) Optimization
A flash sale is the canonical high-concurrency write scenario and needs special handling.
Architecture design:
- Stock preloading: push the stock counters into Redis ahead of time
- Request throttling: rate limiting at both the front end and the back end
- Stock deduction: an atomic Redis Lua script
- Asynchronous ordering: a message queue absorbs the write spike
// Flash-sale service
@Service
public class SeckillService {
    private static final Logger log = LoggerFactory.getLogger(SeckillService.class);
    private final RedisTemplate<String, Object> redisTemplate;
    private final RabbitTemplate rabbitTemplate;
    private final OrderMapper orderMapper;
    // Lua script for atomic stock deduction ("or '0'" guards against a missing key)
    private static final String DEDUCT_STOCK_SCRIPT =
        "local stock = tonumber(redis.call('GET', KEYS[1]) or '0') " +
        "if stock <= 0 then " +
        "  return -1 " +
        "end " +
        "redis.call('DECR', KEYS[1]) " +
        "return stock - 1";
    private final DefaultRedisScript<Long> deductStockScript;
    public SeckillService(RedisTemplate<String, Object> redisTemplate,
                          RabbitTemplate rabbitTemplate,
                          OrderMapper orderMapper) {
        this.redisTemplate = redisTemplate;
        this.rabbitTemplate = rabbitTemplate;
        this.orderMapper = orderMapper;
        deductStockScript = new DefaultRedisScript<>();
        deductStockScript.setScriptText(DEDUCT_STOCK_SCRIPT);
        deductStockScript.setResultType(Long.class);
    }
    /**
     * Place a flash-sale order
     * @param userId    user ID
     * @param productId product ID
     * @return order ID (the order itself is created asynchronously)
     */
    public String seckill(Long userId, Long productId) {
        String stockKey = "seckill:stock:" + productId;
        String userKey = "seckill:user:" + productId;
        // 1. reject repeat purchases
        if (Boolean.TRUE.equals(redisTemplate.opsForSet().isMember(userKey, userId))) {
            throw new RuntimeException("You have already joined this flash sale");
        }
        // 2. deduct stock atomically (Lua script)
        Long remainingStock = redisTemplate.execute(
            deductStockScript,
            Collections.singletonList(stockKey)
        );
        if (remainingStock == null || remainingStock < 0) {
            throw new RuntimeException("Out of stock");
        }
        // 3. record the buyer (for strict once-only semantics, use the return
        //    value of SADD instead of the separate check in step 1)
        redisTemplate.opsForSet().add(userKey, userId);
        redisTemplate.expire(userKey, 1, TimeUnit.HOURS);
        // 4. hand off to the queue; the order is created asynchronously
        SeckillMessage message = new SeckillMessage();
        message.setUserId(userId);
        message.setProductId(productId);
        message.setStock(remainingStock);
        rabbitTemplate.convertAndSend("seckill.order", message);
        // 5. return a pre-generated order ID
        String orderId = "SECKILL_" + System.currentTimeMillis() + "_" + userId;
        return orderId;
    }
    /**
     * Create the order asynchronously (consumer side)
     */
    @RabbitListener(queues = "seckill.order")
    public void createOrder(SeckillMessage message) {
        try {
            // 1. create the order
            Order order = new Order();
            order.setOrderNo(generateOrderNo());
            order.setUserId(message.getUserId());
            order.setProductId(message.getProductId());
            order.setAmount(new BigDecimal("99.00"));
            order.setStatus("PAID");
            order.setCreateTime(new Date());
            orderMapper.insert(order);
            // 2. announce the successful order
            rabbitTemplate.convertAndSend("order.success", order);
        } catch (Exception e) {
            // failure handling: log and compensate
            log.error("Failed to create order", e);
            // roll the stock back
            String stockKey = "seckill:stock:" + message.getProductId();
            redisTemplate.opsForValue().increment(stockKey);
        }
    }
    private String generateOrderNo() {
        // see the fuller implementation in section 7.2
        return "SECKILL_" + System.currentTimeMillis();
    }
}
5.2 Rate-Limiting Strategies
// Distributed rate limiter backed by Redis
@Component
public class RateLimiter {
    private final RedisTemplate<String, Object> redisTemplate;
    // Lua script: fixed-window counter (INCR with an expiring key)
    private static final String RATE_LIMIT_SCRIPT =
        "local key = KEYS[1] " +
        "local limit = tonumber(ARGV[1]) " +
        "local expire = tonumber(ARGV[2]) " +
        "local current = tonumber(redis.call('GET', key) or '0') " +
        "if current + 1 > limit then " +
        "  return 0 " +
        "else " +
        "  redis.call('INCR', key) " +
        "  if current == 0 then " +
        "    redis.call('EXPIRE', key, expire) " +
        "  end " +
        "  return 1 " +
        "end";
    private final DefaultRedisScript<Long> rateLimitScript;
    public RateLimiter(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
        rateLimitScript = new DefaultRedisScript<>();
        rateLimitScript.setScriptText(RATE_LIMIT_SCRIPT);
        rateLimitScript.setResultType(Long.class);
    }
    /**
     * Try to take a permit
     * @param key        rate-limit key (e.g. user:123:api:/order/create)
     * @param limit      allowed calls per window
     * @param expireTime window length in seconds
     * @return true = allowed, false = rejected
     */
    public boolean tryAcquire(String key, int limit, int expireTime) {
        Long result = redisTemplate.execute(
            rateLimitScript,
            Collections.singletonList(key),
            String.valueOf(limit),
            String.valueOf(expireTime)
        );
        return result != null && result == 1;
    }
}
/**
 * Aspect that enforces the rate-limit annotation
 * (a top-level bean, so Spring can instantiate it)
 */
@Aspect
@Component
public class RateLimitAspect {
    @Autowired
    private RateLimiter rateLimiter;
    @Around("@annotation(rateLimit)")
    public Object around(ProceedingJoinPoint pjp, RateLimitAnnotation rateLimit) throws Throwable {
        // build the key: configured prefix + user ID + method name
        String key = String.format("%s:%s:%s",
            rateLimit.key(),
            getUserId(),
            pjp.getSignature().getName());
        if (!rateLimiter.tryAcquire(key, rateLimit.limit(), rateLimit.period())) {
            throw new RateLimitException("Too many requests, please try again later");
        }
        return pjp.proceed();
    }
    private String getUserId() {
        // resolve from the security context in practice; placeholder here
        return "anonymous";
    }
}
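The aspect above references a RateLimitAnnotation that the listing never defines. A minimal definition consistent with how it is used (key, limit, period in seconds):
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Declarative limit: at most limit() calls per period() seconds per key/user/method. */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimitAnnotation {
    String key();
    int limit() default 100;
    int period() default 60; // window length in seconds
}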
// Usage example
@RestController
public class OrderController {
    @RateLimitAnnotation(key = "order", limit = 10, period = 60)
    @PostMapping("/order/create")
    public Result createOrder(@RequestBody OrderDTO orderDTO) {
        // order-creation logic
        return Result.success();
    }
}
5.3 Smoothing Traffic Spikes with a Message Queue
// RabbitMQ configuration
@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue orderQueue() {
        // durable queue
        return new Queue("order.queue", true);
    }
    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange("order.exchange");
    }
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.*");
    }
    // Consumer throttling
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(5);     // concurrent consumers
        factory.setMaxConcurrentConsumers(10); // maximum concurrency
        factory.setPrefetchCount(10);          // prefetch count (the key knob)
        return factory;
    }
}
// Producer
@Service
public class OrderProducer {
    private final RabbitTemplate rabbitTemplate;
    public OrderProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    public void sendOrderMessage(Order order) {
        // delayed message (used to cancel unpaid orders on timeout)
        rabbitTemplate.convertAndSend(
            "order.exchange",
            "order.create",
            order,
            message -> {
                // delay property (requires the delayed-message exchange plugin)
                message.getMessageProperties().setDelay(30 * 1000); // 30-second delay
                return message;
            }
        );
    }
}
// Consumer
@Component
public class OrderConsumer {
    private final OrderService orderService;
    public OrderConsumer(OrderService orderService) {
        this.orderService = orderService;
    }
    @RabbitListener(queues = "order.queue")
    public void processOrder(Order order) {
        try {
            // business logic
            orderService.process(order);
        } catch (Exception e) {
            // reject without requeueing; route to a dead-letter queue for retry
            throw new AmqpRejectAndDontRequeueException("processing failed", e);
        }
    }
}
6. Monitoring and Alerting
6.1 Key Monitoring Metrics
-- MySQL monitoring queries
-- 1. Current connections
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Max_used_connections';
-- 2. Slow query count
SHOW STATUS LIKE 'Slow_queries';
-- 3. InnoDB buffer pool hit rate = 1 - (disk reads / read requests)
SELECT (1 - phys.val / req.val) * 100 AS hit_rate
FROM
(SELECT VARIABLE_VALUE AS val FROM performance_schema.global_status
WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads') AS phys,
(SELECT VARIABLE_VALUE AS val FROM performance_schema.global_status
WHERE VARIABLE_NAME = 'Innodb_buffer_pool_read_requests') AS req;
-- 4. Lock waits
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
LOCK_TYPE,
LOCK_MODE,
LOCK_STATUS,
THREAD_ID,
PROCESSLIST_ID
FROM performance_schema.data_locks;
-- 5. Replication lag (SHOW REPLICA STATUS on MySQL 8.0.22+)
SHOW SLAVE STATUS\G
-- Watch: Seconds_Behind_Master
6.2 Building the Monitoring Stack
Prometheus + Grafana monitoring setup:
# docker-compose.yml
version: '3'
services:
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin123
mysql_exporter:
image: prom/mysqld-exporter
environment:
- DATA_SOURCE_NAME=exporter:exporter@(mysql:3306)/
ports:
- "9104:9104"
prometheus.yml configuration:
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'mysql'
static_configs:
- targets: ['mysql_exporter:9104']
Key dashboard panels:
- MySQL Connections (connection count trend)
- Query Performance
- InnoDB Metrics
- Replication Lag
- Slow Queries
6.3 Sample Alert Rules
# alert-rules.yml
groups:
  - name: mysql_alerts
    rules:
      - alert: MySQLTooManyConnections
        expr: mysql_global_status_threads_connected / mysql_global_variables_max_connections * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MySQL connection count is high"
          description: "Connections at {{ $value }}% of max_connections (threshold 80%)"
      - alert: MySQLSlowQueries
        expr: increase(mysql_global_status_slow_queries[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MySQL slow queries increasing"
          description: "{{ $value }} new slow queries in the last 5 minutes"
      - alert: MySQLReplicationLag
        expr: mysql_slave_status_seconds_behind_master > 10
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "MySQL replication lag"
          description: "Replica is {{ $value }} seconds behind"
7. Case Study: An E-Commerce Flash-Sale System
7.1 Overall Architecture
Client → CDN → Nginx → Rate limiter → Seckill service → Redis cluster
                                            ↓
                                    Message queue → Order service → MySQL cluster
                                            ↓
                                   Monitoring & alerting
7.2 Core Implementation
// Complete flash-sale service
@Service
public class SeckillSystem {
    private static final Logger log = LoggerFactory.getLogger(SeckillSystem.class);
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private ProductMapper productMapper;
    @Autowired
    private RateLimiter rateLimiter;
    // Lua script for atomic stock deduction
    private static final String DEDUCT_STOCK_SCRIPT =
        "local stock_key = KEYS[1] " +
        "local stock = tonumber(redis.call('GET', stock_key) or '0') " +
        "if stock <= 0 then return -1 end " +
        "redis.call('DECR', stock_key) " +
        "return stock - 1";
    private final DefaultRedisScript<Long> deductScript;
    public SeckillSystem() {
        deductScript = new DefaultRedisScript<>();
        deductScript.setScriptText(DEDUCT_STOCK_SCRIPT);
        deductScript.setResultType(Long.class);
    }
    /**
     * Main flash-sale flow (touches only Redis and the MQ, so no JDBC
     * transaction is needed on this path)
     */
    public SeckillResult seckill(Long userId, Long productId) {
        // 1. rate limit per user
        String userLimitKey = "limit:user:" + userId;
        if (!rateLimiter.tryAcquire(userLimitKey, 5, 60)) {
            return SeckillResult.fail("Too many requests");
        }
        // 2. rate limit per IP
        String ip = getClientIp();
        String ipLimitKey = "limit:ip:" + ip;
        if (!rateLimiter.tryAcquire(ipLimitKey, 20, 60)) {
            return SeckillResult.fail("Too many requests from this IP");
        }
        // 3. reject repeat purchases
        String userKey = "seckill:users:" + productId;
        if (Boolean.TRUE.equals(redisTemplate.opsForSet().isMember(userKey, userId))) {
            return SeckillResult.fail("You have already joined this flash sale");
        }
        // 4. deduct stock atomically
        String stockKey = "seckill:stock:" + productId;
        Long remaining = redisTemplate.execute(deductScript, Collections.singletonList(stockKey));
        if (remaining == null || remaining < 0) {
            return SeckillResult.fail("Out of stock");
        }
        // 5. record the buyer
        redisTemplate.opsForSet().add(userKey, userId);
        redisTemplate.expire(userKey, 2, TimeUnit.HOURS);
        // 6. hand off to the queue
        SeckillMessage message = new SeckillMessage();
        message.setUserId(userId);
        message.setProductId(productId);
        message.setStock(remaining);
        message.setTimestamp(System.currentTimeMillis());
        rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);
        // 7. respond immediately
        String orderId = "SECKILL_" + System.currentTimeMillis() + "_" + userId;
        return SeckillResult.success(orderId);
    }
    /**
     * Asynchronous order processing
     */
    @RabbitListener(queues = "seckill.queue")
    public void processSeckillOrder(SeckillMessage message) {
        try {
            // 1. defensive re-check of the stock counter
            Object raw = redisTemplate.opsForValue().get("seckill:stock:" + message.getProductId());
            long stock = raw == null ? -1 : Long.parseLong(raw.toString());
            if (stock < 0) {
                // counter anomaly: skip this message rather than create an order
                log.warn("Stock counter anomaly for product {}", message.getProductId());
                return;
            }
            // 2. create the order
            Order order = new Order();
            order.setOrderNo(generateOrderNo());
            order.setUserId(message.getUserId());
            order.setProductId(message.getProductId());
            order.setAmount(new BigDecimal("99.00"));
            order.setStatus("PAID");
            order.setCreateTime(new Date());
            orderMapper.insert(order);
            // 3. announce the successful order
            rabbitTemplate.convertAndSend("order.exchange", "order.success", order);
        } catch (Exception e) {
            log.error("Order processing failed", e);
            // roll the stock back so another buyer can take the slot
            String stockKey = "seckill:stock:" + message.getProductId();
            redisTemplate.opsForValue().increment(stockKey);
        }
    }
    /**
     * Stock preloading
     */
    @PostConstruct
    public void preloadStock() {
        // copy stock counters from the database into Redis
        List<Product> products = productMapper.selectAll();
        for (Product product : products) {
            String key = "seckill:stock:" + product.getId();
            redisTemplate.opsForValue().set(key, product.getStock(), 2, TimeUnit.HOURS);
        }
    }
    private String generateOrderNo() {
        return "SECKILL_" + System.currentTimeMillis() + "_" +
            ThreadLocalRandom.current().nextInt(1000, 9999);
    }
    private String getClientIp() {
        // resolve from the HTTP request in a real implementation
        return "127.0.0.1";
    }
}
7.3 Load-Test Results
Test environment: MySQL on 4 cores/8 GB, Redis on 2 cores/4 GB, a single application instance
Concurrency: 1000
Duration: 60 seconds
Results:
- Total requests: 60,000
- Successful: 58,200
- Failed: 1,800 (rate limiting, out of stock)
- Average response time: 45 ms
- P99 response time: 120 ms
- Database QPS: about 500 (asynchronous writes)
- Redis QPS: about 2000
- CPU utilization: application 60%, MySQL 40%, Redis 50%
8. Summary and Best Practices
8.1 High-Concurrency Optimization Checklist
Architecture:
- [ ] Is read/write splitting in place?
- [ ] Is sharding actually necessary?
- [ ] Is the connection pool sized sensibly?
- [ ] Is the caching strategy complete?
Database:
- [ ] Are the core parameters tuned?
- [ ] Are slow queries reviewed and fixed regularly?
- [ ] Are the indexes appropriate?
- [ ] Are transactions kept short?
Application:
- [ ] Are batch operations used?
- [ ] Is work moved off the critical path asynchronously?
- [ ] Is rate limiting in place?
- [ ] Is monitoring complete?
8.2 Common Pitfalls
- Premature optimization: don't over-engineer while concurrency is still low
- Cache overuse: caching is not a silver bullet; careless caching creates consistency problems
- Ignoring monitoring: optimization without monitoring is flying blind
- Single-point bottlenecks: any layer can become the bottleneck; keep a whole-system view
8.3 Keeping the Optimization Going
- Establish a performance baseline: record every key metric under normal load
- Load-test regularly: stress the system with realistic traffic patterns
- Review code: pay special attention to database-touching code
- Capture knowledge: build a team playbook of optimizations that worked
Applied together, these strategies can substantially improve MySQL's performance under high concurrency. Remember that optimization is a continuous process; it has to be revisited as the business and the technology evolve.
