Introduction: Understanding the Challenges MySQL Faces Under High Concurrency
In modern internet applications, high concurrency is the norm. Whether it is a flash sale on an e-commerce platform, a trending event on social media, or a trading peak in a financial system, the MySQL database comes under enormous pressure. Under high concurrency, the database can run into exhausted connections, CPU spikes, disk I/O bottlenecks, and fierce lock contention, stretching response times or taking the service down entirely.
The core goal of high-concurrency work is to maximize throughput and response speed while preserving data consistency and integrity. That takes a combination of architecture design, configuration tuning, SQL optimization, and hardware resources. This article digs into MySQL's strategies for high-concurrency workloads and lays out optimizations you can actually deploy.
1. Connection-Layer Optimization: Controlling Concurrency at the Source
1.1 Sizing the Connection Parameters
Connection handling is MySQL's first gate under high concurrency. The key parameters are max_connections, back_log, and thread_cache_size.
-- Check the current connection settings
SHOW VARIABLES LIKE 'max_connections';
SHOW VARIABLES LIKE 'thread_cache_size';
SHOW STATUS LIKE 'Threads_connected';
-- Adjust dynamically (no restart required)
SET GLOBAL max_connections = 2000;
SET GLOBAL thread_cache_size = 100;
Configuration guidelines:
- max_connections: defaults to 151. For high concurrency, one common rule of thumb is CPU cores × 2 × 256, capped by the operating system's file-descriptor limit; in practice, 1000-5000.
- back_log: the length of the pending-connection queue. Set it to roughly 1/4 to 1/2 of max_connections, e.g. 500-1000.
- thread_cache_size: caches threads so they are not created and destroyed for every connection; 100-200 is a reasonable range.
Monitoring metrics:
-- Connections aborted before they completed
SHOW STATUS LIKE 'Aborted_connects';
-- Threads that were slow to spawn
SHOW STATUS LIKE 'Slow_launch_threads';
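The same counters can also be polled from the application side. A minimal JDBC sketch, assuming a local server, root credentials, and an 80% alert threshold (all illustrative):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ConnectionMonitor {
    public static void main(String[] args) throws Exception {
        // Hypothetical DSN; replace with your own
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "root", "password");
             Statement st = conn.createStatement()) {
            long maxConnections = queryLong(st, "SELECT @@max_connections");
            long threadsConnected = queryLong(st,
                "SELECT VARIABLE_VALUE FROM performance_schema.global_status " +
                "WHERE VARIABLE_NAME = 'Threads_connected'");
            double usage = 100.0 * threadsConnected / maxConnections;
            System.out.printf("connections: %d/%d (%.1f%%)%n",
                threadsConnected, maxConnections, usage);
            if (usage > 80) { // assumed alert threshold
                System.err.println("WARNING: connection usage above 80%");
            }
        }
    }

    private static long queryLong(Statement st, String sql) throws Exception {
        try (ResultSet rs = st.executeQuery(sql)) {
            rs.next();
            return rs.getLong(1);
        }
    }
}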
1.2 Applying Connection Pooling
A connection pool in the application tier is standard equipment for high concurrency. In Java, for example, with HikariCP:
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class DataSourceConfig {
    public static HikariDataSource createDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/mydb?useSSL=false&serverTimezone=UTC");
        config.setUsername("root");
        config.setPassword("password");
        // Core pool parameters
        config.setMaximumPoolSize(50);            // maximum connections
        config.setMinimumIdle(10);                // minimum idle connections
        config.setConnectionTimeout(30000);       // connection timeout: 30 s
        config.setIdleTimeout(600000);            // idle timeout: 10 min
        config.setMaxLifetime(1800000);           // max connection lifetime: 30 min
        config.setLeakDetectionThreshold(60000);  // leak detection: 60 s
        // Performance tuning
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("useLocalSessionState", "true");
        config.addDataSourceProperty("rewriteBatchedStatements", "true");
        config.addDataSourceProperty("cacheResultSetMetadata", "true");
        config.addDataSourceProperty("cacheServerConfiguration", "true");
        config.addDataSourceProperty("elideSetAutoCommits", "true");
        config.addDataSourceProperty("maintainTimeStats", "false");
        return new HikariDataSource(config);
    }
}
What the key settings mean:
- maximumPoolSize: size it against the application server's CPU and memory; each connection typically consumes 2-10 MB.
- rewriteBatchedStatements=true: rewrites batches into multi-row statements, dramatically accelerating bulk inserts.
- useServerPrepStmts=true: enables server-side prepared statements, cutting SQL parse overhead.
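Once the pool is built, connections are borrowed and returned with try-with-resources; a brief usage sketch (the table and column names are illustrative):

import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PoolUsageExample {
    public static void main(String[] args) throws Exception {
        HikariDataSource ds = DataSourceConfig.createDataSource();
        // try-with-resources returns the connection to the pool on close()
        try (Connection conn = ds.getConnection();
             PreparedStatement ps = conn.prepareStatement(
                 "SELECT id, amount FROM orders WHERE user_id = ?")) {
            ps.setLong(1, 1001L);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getLong("id") + " -> " + rs.getBigDecimal("amount"));
                }
            }
        }
        ds.close();
    }
}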
1.3 Read/Write Splitting
Read/write splitting is an effective way to spread concurrent load: the master handles writes while several replicas serve reads.
# Master configuration (my.cnf)
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7

# Replica configuration (my.cnf)
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
log_bin = mysql-bin
read_only = 1
super_read_only = 1
Implementing read/write splitting in Java:
public class RoutingDataSource extends AbstractRoutingDataSource {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    public static void setMaster() {
        contextHolder.set("master");
    }

    public static void setSlave() {
        contextHolder.set("slave");
    }

    public static void clear() {
        contextHolder.remove();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return contextHolder.get();
    }
}

// Route reads and writes with AOP, keyed on method-name conventions
@Aspect
@Component
public class DataSourceAspect {
    @Before("execution(* com.example.service.*.*(..))")
    public void before(JoinPoint joinPoint) {
        String methodName = joinPoint.getSignature().getName();
        if (methodName.startsWith("get") || methodName.startsWith("list") || methodName.startsWith("count")) {
            RoutingDataSource.setSlave();
        } else {
            RoutingDataSource.setMaster();
        }
    }

    @After("execution(* com.example.service.*.*(..))")
    public void after() {
        RoutingDataSource.clear();
    }
}
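One caveat with this routing rule: a read issued immediately after a write can land on a replica that has not yet applied the change. A common workaround is to pin the current thread to the master for a short window after any write. A minimal sketch, assuming a one-second replication-lag budget (the window length is an assumption, not a measured value):

public class WriteStickiness {
    // Timestamp of the last write on this thread; 0 means "no recent write"
    private static final ThreadLocal<Long> LAST_WRITE = ThreadLocal.withInitial(() -> 0L);
    private static final long STICKY_WINDOW_MS = 1000; // assumed replication-lag budget

    public static void recordWrite() {
        LAST_WRITE.set(System.currentTimeMillis());
    }

    public static boolean mustReadFromMaster() {
        return System.currentTimeMillis() - LAST_WRITE.get() < STICKY_WINDOW_MS;
    }
}

In the aspect above, before() would then route reads to the master whenever mustReadFromMaster() returns true, and every write method would call recordWrite().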
2. Storage Engine Optimization: Tuning InnoDB in Depth
2.1 Memory Parameters
InnoDB performance depends heavily on how memory is configured. The key parameters are innodb_buffer_pool_size, innodb_log_file_size, and innodb_flush_log_at_trx_commit.
-- Check the current settings
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'innodb_log_file_size';
SHOW VARIABLES LIKE 'innodb_flush_log_at_trx_commit';
-- Resize online (MySQL 5.7.5+)
SET GLOBAL innodb_buffer_pool_size = 4*1024*1024*1024; -- 4 GB
Configuration guidelines:
- innodb_buffer_pool_size: the single most important knob; set it to 50%-75% of system memory. On a 32 GB server, 24 GB is typical.
- innodb_buffer_pool_instances: splits the pool to reduce concurrent contention. Aim for at least 1 GB per instance, with a maximum of 64 instances; a 24 GB pool might use 8-12.
- innodb_log_file_size: redo log size; 1-2 GB is a good range. Too small forces frequent checkpoints; too large lengthens crash recovery.
- innodb_flush_log_at_trx_commit: the durability dial:
  - 1 (default): flush to disk on every commit; safest and slowest.
  - 0: flush roughly once per second; fastest, but up to one second of transactions can be lost.
  - 2: write to the OS cache on each commit, flush once per second; the usual compromise.
A complete my.cnf example:
[mysqld]
# Memory
innodb_buffer_pool_size = 24G
innodb_buffer_pool_instances = 12
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_at_shutdown = ON
# Logging
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M
innodb_flush_log_at_trx_commit = 2
innodb_flush_method = O_DIRECT
# I/O
innodb_io_capacity = 2000
innodb_io_capacity_max = 4000
innodb_flush_neighbors = 0
# Concurrency
innodb_thread_concurrency = 0
innodb_read_io_threads = 8
innodb_write_io_threads = 8
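Whether innodb_buffer_pool_size is large enough shows up in the buffer pool hit ratio: logical reads served from memory versus reads that had to touch disk. A polling sketch over the standard status counters (DSN and credentials assumed as before):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BufferPoolHitRatio {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "root", "password");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                 "SELECT VARIABLE_NAME, VARIABLE_VALUE " +
                 "FROM performance_schema.global_status " +
                 "WHERE VARIABLE_NAME IN " +
                 "('Innodb_buffer_pool_reads', 'Innodb_buffer_pool_read_requests')")) {
            long reads = 0, requests = 0;
            while (rs.next()) {
                if ("Innodb_buffer_pool_reads".equals(rs.getString(1))) {
                    reads = rs.getLong(2);    // logical reads that missed the pool
                } else {
                    requests = rs.getLong(2); // all logical read requests
                }
            }
            // Values persistently below ~99% suggest the pool is too small
            double ratio = requests == 0 ? 100.0 : 100.0 * (requests - reads) / requests;
            System.out.printf("buffer pool hit ratio: %.4f%%%n", ratio);
        }
    }
}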
2.2 Transaction and Lock Optimization
Under high concurrency, lock contention is a performance killer. The main strategies:
1. Keep transactions small:
-- Anti-pattern: one big transaction
START TRANSACTION;
UPDATE orders SET status = 'processing' WHERE user_id = 1001;
UPDATE order_items SET quantity = 5 WHERE order_id = 1001;
UPDATE inventory SET stock = stock - 5 WHERE product_id = 2001;
-- ... more statements
COMMIT;

-- Better: several small transactions
START TRANSACTION;
UPDATE orders SET status = 'processing' WHERE order_id = 1001 AND user_id = 1001;
COMMIT;

START TRANSACTION;
UPDATE order_items SET quantity = 5 WHERE order_id = 1001;
COMMIT;
2. Prefer optimistic locking over pessimistic locking:
-- Pessimistic locking (poor concurrency)
SELECT * FROM products WHERE id = 1 FOR UPDATE;
UPDATE products SET stock = stock - 1 WHERE id = 1;

-- Optimistic locking (recommended)
ALTER TABLE products ADD COLUMN version INT DEFAULT 0;
-- Check the version when updating
UPDATE products
SET stock = stock - 1, version = version + 1
WHERE id = 1 AND version = 5; -- assuming the current version is 5
-- If 0 rows were affected, another transaction got there first: retry
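The read-compare-update cycle with retries looks like this in application code; a JDBC sketch (retry count and table layout follow the example above):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class OptimisticStockDecrement {
    // Returns true once the decrement succeeds, false after maxRetries conflicts
    public boolean decreaseStock(Connection conn, long productId, int maxRetries)
            throws SQLException {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            int version;
            try (PreparedStatement read = conn.prepareStatement(
                    "SELECT version FROM products WHERE id = ?")) {
                read.setLong(1, productId);
                try (ResultSet rs = read.executeQuery()) {
                    if (!rs.next()) return false; // product does not exist
                    version = rs.getInt(1);
                }
            }
            try (PreparedStatement update = conn.prepareStatement(
                    "UPDATE products SET stock = stock - 1, version = version + 1 " +
                    "WHERE id = ? AND version = ? AND stock > 0")) {
                update.setLong(1, productId);
                update.setInt(2, version);
                if (update.executeUpdate() == 1) return true; // our version won
            }
            // 0 rows affected: someone else updated first; loop and re-read
        }
        return false;
    }
}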
3. Deadlock detection and handling:
-- Inspect the most recent deadlock
SHOW ENGINE INNODB STATUS\G
-- Log every deadlock
SET GLOBAL innodb_print_all_deadlocks = ON;
Application-level retry (Java):
// Requires org.springframework.dao.DeadlockLoserDataAccessException and java.util.function.Supplier
public <T> T executeWithRetry(Supplier<T> operation, int maxRetries) {
    int attempt = 0;
    while (attempt < maxRetries) {
        try {
            return operation.get();
        } catch (DeadlockLoserDataAccessException e) {
            // Spring's translation of a MySQL deadlock (SQLSTATE 40001)
            attempt++;
            if (attempt >= maxRetries) throw e;
            // Exponential backoff with jitter
            try {
                Thread.sleep((long) (Math.random() * Math.pow(2, attempt) * 100));
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }
    throw new RuntimeException("Max retries exceeded");
}
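Callers then wrap any deadlock-prone transaction in the helper; a short usage sketch (orderDao.markProcessing is a hypothetical DAO method, shown only for illustration):

// Retries the update up to 3 times on deadlock
Order updated = executeWithRetry(() -> orderDao.markProcessing(1001L), 3);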
2.3 Index Optimization Strategies
Indexes are the soul of query performance. Under high concurrency, a missing or misused index means full table scans, and full table scans mean disaster.
1. Covering indexes:
-- Goal: answer the query from the index alone
SELECT user_id, order_id, amount FROM orders
WHERE user_id = 1001 AND status = 'paid';
-- Create a covering index
CREATE INDEX idx_user_status_amount ON orders(user_id, status, amount);
2. Prefix indexes:
-- Index only a prefix of a long string column
SELECT COUNT(*) FROM users WHERE email = 'test@example.com';
-- Index the first 20 characters (tune the length to the column's selectivity)
CREATE INDEX idx_email_prefix ON users(email(20));
3. Index Condition Pushdown (ICP): available since MySQL 5.6 and applied automatically:
-- The query
SELECT * FROM users WHERE age > 25 AND name LIKE 'A%';
-- With an index idx_age_name(age, name), MySQL filters on age inside the index,
-- then on name, reducing the number of row lookups
4. Index monitoring and cleanup:
-- Indexes that have never been used (requires the sys schema, bundled since MySQL 5.7.7)
SELECT * FROM sys.schema_unused_indexes;
-- Redundant indexes
SELECT * FROM sys.schema_redundant_indexes;
-- Index definitions on a table
SHOW INDEX FROM orders;
3. Query Optimization: From SQL to Execution Plans
3.1 Reading Execution Plans
Analyzing queries with EXPLAIN is the first step of SQL optimization.
-- Basic usage
EXPLAIN SELECT * FROM orders WHERE user_id = 1001;
-- Structured output
EXPLAIN FORMAT=JSON SELECT * FROM orders WHERE user_id = 1001;
-- Actual execution statistics (MySQL 8.0.18+)
EXPLAIN ANALYZE SELECT * FROM orders WHERE user_id = 1001;
Key fields in the execution plan:
- type: access type, from best to worst: system > const > eq_ref > ref > range > index > ALL
- key: the index actually chosen
- rows: estimated rows scanned
- Extra: notes such as Using index (covering index) or Using filesort (sort spills past the index)
A worked example:
-- Problem SQL: wrapping the column in a function forces a full scan
EXPLAIN SELECT * FROM orders WHERE DATE(created_at) = '2024-01-01';
-- type: ALL, rows: 1000000

-- Fix 1: rewrite as a range over the raw column
EXPLAIN SELECT * FROM orders
WHERE created_at >= '2024-01-01 00:00:00'
AND created_at < '2024-01-02 00:00:00';
-- type: range, rows: 5000

-- Fix 2: add the index
CREATE INDEX idx_created_at ON orders(created_at);
-- type: range, rows: 5000, Extra: Using index condition
3.2 Avoiding Common Pitfalls
1. Implicit type conversion:
-- phone is a VARCHAR column
SELECT * FROM users WHERE phone = 13800138000;   -- implicit cast, the index is ignored
SELECT * FROM users WHERE phone = '13800138000'; -- correct
2. Rewriting OR conditions:
-- Problem SQL
SELECT * FROM orders WHERE user_id = 1001 OR amount > 1000;

-- Fix 1: UNION ALL
(SELECT * FROM orders WHERE user_id = 1001)
UNION ALL
(SELECT * FROM orders WHERE amount > 1000 AND user_id != 1001);

-- Fix 2: IN, when the value list is short
SELECT * FROM orders WHERE user_id IN (1001, 1002, 1003);
3. Pagination:
-- Problem SQL: deep offsets are slow
SELECT * FROM orders WHERE status = 'paid' ORDER BY id LIMIT 1000000, 20;

-- Fix 1: deferred join
SELECT o.* FROM orders o
INNER JOIN (SELECT id FROM orders WHERE status = 'paid' ORDER BY id LIMIT 1000000, 20) t
ON o.id = t.id;

-- Fix 2: keyset ("bookmark") pagination
SELECT * FROM orders
WHERE status = 'paid' AND id > 1000000
ORDER BY id LIMIT 20;
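Keyset pagination maps naturally onto a "fetch the next page after this bookmark" API; a JDBC sketch (table layout as in the example above):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class KeysetPager {
    // Fetches the next page of paid order ids strictly after lastSeenId
    public List<Long> nextPage(Connection conn, long lastSeenId, int pageSize)
            throws SQLException {
        List<Long> ids = new ArrayList<>();
        try (PreparedStatement ps = conn.prepareStatement(
                "SELECT id FROM orders WHERE status = 'paid' AND id > ? " +
                "ORDER BY id LIMIT ?")) {
            ps.setLong(1, lastSeenId);
            ps.setInt(2, pageSize);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) ids.add(rs.getLong(1));
            }
        }
        return ids; // the caller passes the last id back in as the next bookmark
    }
}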
3.3 Batch Operations and Prepared Statements
Batch insert optimization:
// Anti-pattern: one INSERT per row
for (Order order : orders) {
    jdbcTemplate.update("INSERT INTO orders (user_id, amount) VALUES (?, ?)",
        order.getUserId(), order.getAmount());
}

// Better: a single batch
jdbcTemplate.batchUpdate("INSERT INTO orders (user_id, amount) VALUES (?, ?)",
    new BatchPreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement ps, int i) throws SQLException {
            Order order = orders.get(i);
            ps.setLong(1, order.getUserId());
            ps.setBigDecimal(2, order.getAmount());
        }

        @Override
        public int getBatchSize() {
            return orders.size();
        }
    });
// With rewriteBatchedStatements=true this is commonly 10-100x faster
Prepared statement caching:
// Enable via the connection URL
String url = "jdbc:mysql://localhost:3306/mydb?" +
    "useServerPrepStmts=true&" +
    "cachePrepStmts=true&" +
    "prepStmtCacheSize=250&" +
    "prepStmtCacheSqlLimit=2048";
4. Scaling Out: From a Single Server to a Distributed Architecture
4.1 Sharding
Once a single table passes roughly ten million rows, it is time to split it across tables and databases.
Vertical splitting:
-- Original table
CREATE TABLE orders (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    status VARCHAR(20),
    -- 20+ more columns...
    description TEXT
);

-- After splitting
CREATE TABLE orders_base (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    status VARCHAR(20)
);

CREATE TABLE orders_desc (
    order_id BIGINT PRIMARY KEY,
    description TEXT
);
Horizontal splitting (hash on user ID):
-- Sharding rule: user_id % 4
CREATE TABLE orders_0 LIKE orders;
CREATE TABLE orders_1 LIKE orders;
CREATE TABLE orders_2 LIKE orders;
CREATE TABLE orders_3 LIKE orders;

// Application-level routing
public class ShardingRouter {
    public String getTableName(Long userId) {
        int index = (int) (userId % 4);
        return "orders_" + index;
    }
}

// Resolve the table name at query time
String tableName = shardingRouter.getTableName(userId);
String sql = "SELECT * FROM " + tableName + " WHERE user_id = ?";
Using ShardingSphere (recommended):
# sharding.yaml (simplified; exact keys vary by ShardingSphere version)
dataSources:
  ds_0: jdbc:mysql://localhost:3306/db0
  ds_1: jdbc:mysql://localhost:3306/db1
shardingRule:
  tables:
    orders:
      actualDataNodes: ds_${0..1}.orders_${0..3}
      tableStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: orders_${user_id % 4}
      databaseStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: ds_${user_id % 2}
  bindingTables:
    - orders,order_items
4.2 Caching Strategies
Caching hot data in Redis:
@Service
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String ORDER_CACHE_PREFIX = "order:";
    private static final long CACHE_TTL = 300; // 5 minutes

    public Order getOrder(Long orderId) {
        String cacheKey = ORDER_CACHE_PREFIX + orderId;
        // 1. Try the cache first
        Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
        if (order != null) {
            return order;
        }
        // 2. Cache miss: read the database
        order = orderMapper.selectById(orderId);
        if (order != null) {
            // 3. Populate the cache
            redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
        }
        return order;
    }

    public void updateOrder(Long orderId, Order order) {
        orderMapper.updateById(order);
        // Invalidate the cache after the write
        redisTemplate.delete(ORDER_CACHE_PREFIX + orderId);
    }
}
Guarding against cache penetration and avalanche:
// A Bloom filter (Guava) screens out IDs that certainly do not exist
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class BloomFilterService {
    private BloomFilter<Long> bloomFilter = BloomFilter.create(
        Funnels.longFunnel(),
        1000000, // expected number of elements
        0.01     // acceptable false-positive rate
    );

    public Order getOrder(Long orderId) {
        // Consult the Bloom filter first
        if (!bloomFilter.mightContain(orderId)) {
            return null; // definitely absent: skip cache and database
        }
        // Otherwise go through the normal cache/database path
        return getOrderFromCache(orderId);
    }

    public void addOrderId(Long orderId) {
        bloomFilter.put(orderId);
    }
}
// Warm the cache at startup to soften an avalanche
@PostConstruct
public void warmUpCache() {
    List<Long> hotOrderIds = orderMapper.getHotOrderIds();
    for (Long id : hotOrderIds) {
        Order order = orderMapper.selectById(id);
        redisTemplate.opsForValue().set("order:" + id, order, 600, TimeUnit.SECONDS);
    }
}
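Warm-up alone does not prevent an avalanche if all warmed keys expire at the same moment. A common complement is jittering the TTL so expirations spread out; a minimal sketch (the base TTL and jitter range are assumptions):

import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class JitteredCacheWriter {
    private static final long BASE_TTL_SECONDS = 600;   // assumed base TTL
    private static final long MAX_JITTER_SECONDS = 120; // assumed jitter range

    // Spreads expirations over [600, 720) seconds instead of one shared instant
    public void set(RedisTemplate<String, Object> redis, String key, Object value) {
        long ttl = BASE_TTL_SECONDS + ThreadLocalRandom.current().nextLong(MAX_JITTER_SECONDS);
        redis.opsForValue().set(key, value, ttl, TimeUnit.SECONDS);
    }
}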
4.3 Asynchronous Processing with Message Queues
Peak shaving:
// Controller: accept the request quickly and acknowledge immediately
@PostMapping("/order")
public ResponseEntity<String> createOrder(@RequestBody OrderRequest request) {
    // Validate the input
    validateRequest(request);
    // Hand off to the message queue
    messageQueue.send("order-topic", request);
    // Return at once, without waiting for the database write
    return ResponseEntity.accepted().body("Order accepted, order ID: " + request.getOrderId());
}

// Consumer: process asynchronously
@KafkaListener(topics = "order-topic")
public void processOrder(OrderRequest request) {
    try {
        // 1. Deduct stock (pessimistic or optimistic locking)
        int updated = inventoryMapper.decreaseStock(request.getProductId(), request.getQuantity());
        if (updated == 0) {
            // Out of stock: publish a failure message
            messageQueue.send("order-fail-topic", request);
            return;
        }
        // 2. Create the order
        Order order = createOrderFromRequest(request);
        orderMapper.insert(order);
        // 3. Publish a success message
        messageQueue.send("order-success-topic", order);
    } catch (Exception e) {
        // Log and publish for retry
        log.error("Order processing failed: {}", request, e);
        messageQueue.send("order-retry-topic", request);
    }
}
5. Monitoring and Diagnostics: The Foundation of Continuous Optimization
5.1 Performance Metrics
Slow query log:
-- Enable the slow query log
SET GLOBAL slow_query_log = ON;
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1; -- log anything over 1 second
SET GLOBAL log_queries_not_using_indexes = ON;

-- Aggregate slow queries (querying mysql.slow_log requires log_output to include 'TABLE')
SELECT COUNT(*), AVG(TIME_TO_SEC(query_time))
FROM mysql.slow_log WHERE start_time > NOW() - INTERVAL 1 DAY;
Real-time status views:
-- Queries currently executing
SELECT * FROM information_schema.processlist WHERE command != 'Sleep';
-- InnoDB engine status
SHOW ENGINE INNODB STATUS\G
-- Lock waits (MySQL 5.7; these views were removed in 8.0)
SELECT * FROM information_schema.innodb_lock_waits;
SELECT * FROM information_schema.innodb_locks;
SELECT * FROM information_schema.innodb_trx;
-- MySQL 8.0 equivalents
SELECT * FROM performance_schema.data_lock_waits;
SELECT * FROM performance_schema.data_locks;
5.2 Using Performance Schema
MySQL 5.6+ ships with powerful built-in instrumentation:
-- Enable statement instrumentation
UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES'
WHERE NAME LIKE 'statement/%';

-- Most expensive statements by average latency (timers are in picoseconds)
SELECT DIGEST_TEXT, COUNT_STAR, AVG_TIMER_WAIT/1000000000000 AS avg_seconds
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC LIMIT 10;

-- Table I/O
SELECT OBJECT_SCHEMA, OBJECT_NAME, COUNT_READ, COUNT_WRITE
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY COUNT_READ + COUNT_WRITE DESC LIMIT 10;
5.3 Third-Party Monitoring Tools
Monitoring with Prometheus + Grafana:
# mysqld_exporter: connection via environment variable, collectors via flags
# (flag names can vary between exporter versions)
export DATA_SOURCE_NAME='root:password@(localhost:3306)/'
mysqld_exporter \
  --collect.global_status \
  --collect.info_schema.innodb_metrics \
  --collect.auto_increment.columns \
  --collect.info_schema.processlist \
  --collect.binlog_size \
  --collect.info_schema.tablestats \
  --collect.global_variables \
  --collect.info_schema.query_response_time \
  --collect.info_schema.userstats \
  --collect.info_schema.tables \
  --collect.perf_schema.tablelocks \
  --collect.perf_schema.file_events \
  --collect.perf_schema.indexiowaits \
  --collect.perf_schema.tableiowaits \
  --collect.slave_status \
  --collect.slave_hosts
Key metrics to watch (a polling sketch follows the list):
- QPS/TPS
- Number of slow queries
- Connection usage ratio
- InnoDB buffer pool hit ratio
- Lock wait time
- Disk I/O utilization
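QPS can be derived from the cumulative status counters by sampling them twice and dividing the delta by the interval; a sketch (DSN and credentials assumed as before):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class QpsSampler {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "root", "password")) {
            long q1 = questions(conn);
            Thread.sleep(10_000); // 10-second sampling window
            long q2 = questions(conn);
            System.out.printf("QPS over the window: %.1f%n", (q2 - q1) / 10.0);
        }
    }

    // 'Questions' counts all statements sent by clients since server start
    private static long questions(Connection conn) throws Exception {
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                 "SELECT VARIABLE_VALUE FROM performance_schema.global_status " +
                 "WHERE VARIABLE_NAME = 'Questions'")) {
            rs.next();
            return rs.getLong(1);
        }
    }
}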
6. Advanced Techniques
6.1 Table Partitioning
For very large tables, partitioning improves both query performance and day-to-day management.
-- Partition by date range
CREATE TABLE logs (
    id BIGINT AUTO_INCREMENT,
    log_time DATETIME NOT NULL,
    message TEXT,
    PRIMARY KEY (id, log_time)
) PARTITION BY RANGE COLUMNS(log_time) (
    PARTITION p202401 VALUES LESS THAN ('2024-02-01'),
    PARTITION p202402 VALUES LESS THAN ('2024-03-01'),
    PARTITION p202403 VALUES LESS THAN ('2024-04-01'),
    PARTITION p_max VALUES LESS THAN (MAXVALUE)
);

-- Queries prune partitions automatically
SELECT * FROM logs WHERE log_time >= '2024-03-01'; -- scans only p202403 and p_max
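Range partitioning also pays off operationally: old data can be removed by dropping a partition (a fast metadata operation) instead of a slow DELETE. A maintenance sketch that drops a 12-month-old partition and carves next month's out of p_max (the partition naming scheme follows the DDL above; the DSN, credentials, and retention period are assumptions):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class PartitionMaintenance {
    private static final DateTimeFormatter PART = DateTimeFormatter.ofPattern("yyyyMM");

    public static void main(String[] args) throws Exception {
        LocalDate now = LocalDate.now().withDayOfMonth(1);
        String expired = "p" + now.minusMonths(12).format(PART); // assumed 12-month retention
        String create = "p" + now.plusMonths(1).format(PART);
        LocalDate upperBound = now.plusMonths(2); // exclusive bound of next month's partition

        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "root", "password");
             Statement st = conn.createStatement()) {
            // Dropping a partition discards its rows instantly
            st.execute("ALTER TABLE logs DROP PARTITION " + expired);
            // REORGANIZE splits p_max so new rows land in a dedicated partition
            st.execute("ALTER TABLE logs REORGANIZE PARTITION p_max INTO (" +
                "PARTITION " + create + " VALUES LESS THAN ('" + upperBound + "')," +
                "PARTITION p_max VALUES LESS THAN (MAXVALUE))");
        }
    }
}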
6.2 Full-Text Indexes
For text search, a full-text index beats LIKE.
-- Create a full-text index
ALTER TABLE articles ADD FULLTEXT INDEX ft_title_content (title, content);

-- Natural language search
SELECT * FROM articles
WHERE MATCH(title, content) AGAINST('database optimization' IN NATURAL LANGUAGE MODE);

-- Boolean mode
SELECT * FROM articles
WHERE MATCH(title, content) AGAINST('+database -optimization' IN BOOLEAN MODE);
6.3 Generated Columns and Functional Indexes
MySQL 5.7+ supports generated (virtual) columns, which make expression-based indexes possible; MySQL 8.0.13+ can also index an expression directly.
-- Add a virtual column
ALTER TABLE orders ADD COLUMN order_year YEAR AS (YEAR(created_at)) VIRTUAL;
-- Index the virtual column
CREATE INDEX idx_order_year ON orders(order_year);
-- The query now uses the index
SELECT * FROM orders WHERE order_year = 2024;
7. Case Study: Optimizing a Flash-Sale System
7.1 Problem Analysis
Flash-sale traffic has three defining traits:
- Instantaneous concurrency: QPS can exceed 100,000
- Scarce inventory: contention is fierce
- Read-heavy: most requests are queries
7.2 The Optimization Plan
1. Front-end throttling:
// Throttle: disable the button to block duplicate submissions
let isSubmitting = false;
function seckill() {
    if (isSubmitting) return;
    // Countdown gate (checked before locking, so an early click does not jam the button)
    if (Date.now() < seckillStartTime) {
        alert("The sale has not started yet");
        return;
    }
    isSubmitting = true;
    // Submit the request
    fetch('/api/seckill', { method: 'POST' })
        .then(() => { isSubmitting = false; })
        .catch(() => { isSubmitting = false; });
}
2. Pre-deducting stock in Redis:
@Service
public class SeckillService {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String STOCK_KEY = "seckill:stock:";
    private static final String ORDER_KEY = "seckill:orders:";

    public boolean seckill(Long userId, Long productId) {
        String stockKey = STOCK_KEY + productId;
        String orderKey = ORDER_KEY + productId + ":" + userId;
        // 1. Reject repeat purchases
        if (Boolean.TRUE.equals(redisTemplate.hasKey(orderKey))) {
            return false; // already bought
        }
        // 2. Deduct stock atomically (the Lua script runs as a single unit;
        //    tonumber guards against string comparison and missing keys)
        String luaScript =
            "if tonumber(redis.call('get', KEYS[1]) or '0') >= tonumber(ARGV[1]) then " +
            "  redis.call('decrby', KEYS[1], ARGV[1]); " +
            "  return 1; " +
            "else " +
            "  return 0; " +
            "end";
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(luaScript, Long.class),
            Collections.singletonList(stockKey),
            "1"
        );
        if (result == null || result == 0) {
            return false; // out of stock
        }
        // 3. Mark the purchase (expires after 5 minutes)
        redisTemplate.opsForValue().set(orderKey, "1", 300, TimeUnit.SECONDS);
        // 4. Create the order asynchronously via MQ
        messageQueue.send("seckill-order", new SeckillRequest(userId, productId));
        return true;
    }
}
3. Eventual consistency in the database:
@KafkaListener(topics = "seckill-order")
public void createOrder(SeckillRequest request) {
    try {
        // 1. Re-check database stock (guards against overselling)
        Integer stock = inventoryMapper.selectStock(request.getProductId());
        if (stock <= 0) {
            // Give the reserved unit back to Redis
            redisTemplate.opsForValue().increment("seckill:stock:" + request.getProductId(), 1);
            return;
        }
        // 2. Create the order (optimistic locking)
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setStatus("CREATED");
        int inserted = orderMapper.insert(order);
        if (inserted > 0) {
            // 3. Deduct database stock
            int updated = inventoryMapper.decreaseStock(request.getProductId(), 1);
            if (updated == 0) {
                // Out of stock after all: roll back
                throw new RuntimeException("Out of stock");
            }
        }
    } catch (Exception e) {
        // Log for manual compensation
        log.error("Failed to create flash-sale order", e);
    }
}
4. Database-level optimization:
-- Inventory table on InnoDB (row-level locking by default)
CREATE TABLE inventory (
    product_id BIGINT PRIMARY KEY,
    stock INT NOT NULL,
    version INT DEFAULT 0 -- optimistic-lock version
) ENGINE=InnoDB;
-- (a separate index on product_id is unnecessary: the primary key already covers it)

-- Stock deduction (optimistic locking)
UPDATE inventory
SET stock = stock - 1, version = version + 1
WHERE product_id = ? AND version = ? AND stock > 0;
8. Summary and Best Practices
8.1 Optimization Priorities
- SQL and index tuning: the cheapest changes with the biggest payoff
- Configuration tuning: adjust MySQL parameters to unlock the hardware
- Caching: cut database traffic
- Architectural scaling: read/write splitting, sharding
- Asynchronous processing: shave peaks and raise throughput
8.2 Principles for Continuous Optimization
- Monitor first: optimize from data, not guesses
- Change incrementally: one variable at a time, then observe
- Verify under load: confirm gains with sysbench, JMeter, or similar tools
- Plan the rollback: every change needs an undo path
8.3 Golden Rules of High Concurrency
- Skip the database when you can: caching, static pages, CDN
- Go asynchronous when you can: message queues, scheduled jobs
- Compute in memory when you can: Redis, local caches
- Parallelize when you can: thread pools, batching
- Rate-limit rather than absorb: circuit breakers, degradation, throttling
Applied together, these strategies let a MySQL-backed architecture stand up to high-concurrency peaks on the order of a million QPS. Remember that optimization is a continuous process: it has to evolve with the business and with the technology underneath it.
