引言:理解高并发场景下的数据库挑战
在现代互联网应用中,高并发访问已经成为常态。无论是电商平台的秒杀活动,还是社交媒体的热点事件,数据库都面临着前所未有的压力。MySQL作为最流行的关系型数据库,在高并发场景下容易出现性能瓶颈、死锁甚至崩溃。本文将从索引优化、查询优化、事务控制、架构设计等多个维度,详细探讨如何构建高可用的MySQL系统,避免数据库崩溃与死锁问题。
一、索引优化:高并发的第一道防线
1.1 索引的基本原理与选择策略
索引是MySQL性能优化的核心。在高并发场景下,合理的索引可以将查询性能提升几个数量级。理解B+树索引的结构是优化的基础。
主键索引与二级索引的区别
- 主键索引(聚簇索引):叶子节点存储整行数据
- 二级索引(非聚簇索引):叶子节点存储主键值
-- 创建用户表示例
CREATE TABLE users (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
status TINYINT DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_username (username),
INDEX idx_email (email),
INDEX idx_status_created (status, created_at)
) ENGINE=InnoDB;
-- 查看索引使用情况
SHOW INDEX FROM users;
1.2 覆盖索引与回表优化
在高并发查询中,回表操作是性能杀手。覆盖索引可以避免回表,直接从索引中获取所需数据。
覆盖索引示例:
-- 假设需要查询用户ID和用户名
SELECT id, username FROM users WHERE username = 'john_doe';
-- 如果只有idx_username索引,需要回表获取id
-- 优化方案:创建复合索引
ALTER TABLE users ADD INDEX idx_username_id (username, id);
-- 或者查询时只查询索引包含的列
SELECT username FROM users WHERE username = 'john_doe'; -- 使用idx_username即可
1.3 索引失效的常见场景与避免方法
在高并发环境下,索引失效会导致全表扫描,瞬间拖垮数据库。
常见索引失效场景:
- 函数操作:
WHERE YEAR(created_at) = 2024 - 隐式类型转换:
WHERE phone = 13800138000(phone是varchar) - 前缀模糊查询:
WHERE username LIKE '%john' - OR条件:
WHERE status = 1 OR age > 18(部分版本优化) - 范围查询后的索引失效:
WHERE age > 18 AND score > 90
优化方案:
-- 错误示例
SELECT * FROM users WHERE YEAR(created_at) = 2024;
-- 正确示例:使用范围查询
SELECT * FROM users WHERE created_at >= '2024-01-01' AND created_at < '2025-01-01';
-- 错误示例:隐式类型转换
SELECT * FROM orders WHERE order_no = 123456; -- order_no是varchar
-- 正确示例
SELECT * FROM orders WHERE order_no = '123456';
1.4 高并发下的索引维护成本
索引不是越多越好。每个索引都会增加写操作的开销(INSERT/UPDATE/DELETE需要维护索引)。
索引维护成本分析:
- 单个索引的写操作成本约为原数据的10-15%
- 5个索引的表,写操作成本增加50-75%
- 高并发写场景下,过多索引会导致严重的性能问题
优化建议:
-- 监控索引使用频率
SELECT * FROM sys.schema_unused_indexes;
-- 删除未使用的索引
DROP INDEX idx_unused ON users;
-- 对于低频查询,考虑使用联合索引替代多个单列索引
-- 原:INDEX idx_a, INDEX idx_b
-- 优化:INDEX idx_a_b (a, b)
二、查询优化:减少数据库负载
2.1 避免SELECT * 和大字段查询
在高并发场景下,网络传输和内存消耗也是重要考虑因素。
优化示例:
-- 错误:查询大字段
SELECT * FROM articles WHERE id = 123; -- 包含content大文本字段
-- 正确:按需查询
SELECT id, title, summary, author FROM articles WHERE id = 123;
-- 错误:查询不需要的列
SELECT * FROM users WHERE id = 1;
-- 正确:只查询需要的列
SELECT id, username, email FROM users WHERE id = 1;
2.2 分页查询优化
传统LIMIT分页在深度分页时性能极差。
问题示例:
-- 当page=1000时,性能极差
SELECT * FROM orders ORDER BY id LIMIT 1000000, 20;
优化方案1:延迟关联
-- 使用子查询先定位主键,再关联获取数据
SELECT o.* FROM orders o
INNER JOIN (
SELECT id FROM orders ORDER BY id LIMIT 1000000, 20
) AS tmp ON o.id = tmp.id;
优化方案2:位置记录法
-- 记录上一页最后一条记录的id
-- 第一页
SELECT * FROM orders WHERE id > 0 ORDER BY id LIMIT 20;
-- 第二页(假设上一页最后id=100)
SELECT * FROM orders WHERE id > 100 ORDER BY id LIMIT 20;
2.3 JOIN优化策略
JOIN是高并发场景下的性能瓶颈,需要谨慎使用。
JOIN优化原则:
- 小表驱动大表
- ON字段必须有索引
- 避免笛卡尔积
优化示例:
-- 错误:大表驱动小表
SELECT u.*, o.*
FROM large_table l
JOIN small_table s ON l.id = s.large_id
WHERE l.status = 1;
-- 正确:小表驱动大表
SELECT s.*, l.*
FROM small_table s
JOIN large_table l ON s.large_id = l.id
WHERE l.status = 1;
-- 确保JOIN字段有索引
ALTER TABLE small_table ADD INDEX idx_large_id (large_id);
ALTER TABLE large_table ADD INDEX idx_id (id);
2.4 批量操作与减少交互
高并发下,减少数据库交互次数至关重要。
批量插入优化:
-- 错误:逐条插入
INSERT INTO logs (user_id, action, created_at) VALUES (1, 'login', NOW());
INSERT INTO logs (user_id, action, created_at) VALUES (2, 'login', NOW());
INSERT INTO logs (user_id, action, created_at) VALUES (3, 'login', NOW());
-- 正确:批量插入
INSERT INTO logs (user_id, action, created_at) VALUES
(1, 'login', NOW()),
(2, 'login', NOW()),
(3, 'login', NOW());
-- 批量更新
UPDATE users SET status = 2 WHERE id IN (1, 2, 3, 4, 5);
三、事务与锁机制:避免死锁的核心
3.1 MySQL锁机制详解
理解MySQL的锁机制是避免死锁的关键。
锁的类型:
- 共享锁(S锁):读锁,多个事务可以同时持有
- 排他锁(X锁):写锁,只有一个事务可以持有
- 意向锁:表级锁,表示事务将在某行加S锁或X锁
锁的粒度:
- 行锁:锁定单行数据
- 间隙锁:锁定索引范围
- 临键锁:行锁+间隙锁(InnoDB默认)
3.2 死锁的产生与检测
死锁的四个必要条件:
- 互斥条件:资源不能被共享
- 请求与保持条件:已获得资源的进程可以请求新资源
- 不剥夺条件:已分配的资源不能被强制剥夺
- 循环等待条件:进程之间形成环形等待链
死锁示例:
-- 事务A
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- 等待事务B释放id=2的锁
-- 事务B
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 2;
-- 等待事务A释放id=1的锁
-- 结果:死锁!
3.3 避免死锁的最佳实践
1. 固定加锁顺序
-- 错误:不同事务加锁顺序不一致
-- 事务A:UPDATE table SET ... WHERE id IN (1, 2, 3)
-- 事务B:UPDATE table SET ... WHERE id IN (3, 2, 1)
-- 正确:统一按主键排序
-- 所有事务都按id升序处理
UPDATE accounts SET balance = balance - 100 WHERE id IN (1, 2, 3) ORDER BY id;
2. 使用SELECT FOR UPDATE显式加锁
-- 事务A
START TRANSACTION;
-- 先查询并锁定记录
SELECT * FROM inventory WHERE product_id = 100 FOR UPDATE;
-- 执行更新
UPDATE inventory SET stock = stock - 1 WHERE product_id = 100;
COMMIT;
-- 事务B
START TRANSACTION;
-- 同样先锁定记录(会等待事务A释放)
SELECT * FROM inventory WHERE product_id = 100 FOR UPDATE;
UPDATE inventory SET stock = stock - 1 WHERE product_id = 100;
COMMIT;
3. 优化事务粒度
-- 错误:大事务
START TRANSACTION;
UPDATE users SET status = 2 WHERE id IN (SELECT id FROM users WHERE age > 18);
UPDATE orders SET status = 'processed' WHERE user_id IN (SELECT id FROM users WHERE age > 18);
-- 可能锁住大量行,增加死锁概率
-- 正确:小事务
-- 分批处理
START TRANSACTION;
UPDATE users SET status = 2 WHERE id BETWEEN 1 AND 1000;
COMMIT;
START TRANSACTION;
UPDATE orders SET status = 'processed' WHERE user_id BETWEEN 1 AND 1000;
COMMIT;
4. 设置合理的超时时间
-- 设置锁等待超时(默认50秒)
SET SESSION innodb_lock_wait_timeout = 10;
-- 设置事务超时
SET SESSION max_execution_time = 5000;
3.4 死锁监控与诊断
查看死锁日志:
-- 开启死锁监控
SET GLOBAL innodb_print_all_deadlocks = ON;
-- 查看最近死锁信息
SHOW ENGINE INNODB STATUS\G
死锁信息分析:
LATEST DETECTED DEADLOCK
2024-01-15 10:30:45 0x7f0a1b2c3d4e
*** (1) TRANSACTION:
TRANSACTION 12345, ACTIVE 0 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 3 lock struct(s), heap size 1136, 2 row lock(s)
MySQL thread id 12345, OS thread handle 0x7f0a1b2c3d4e, query id 100
UPDATE accounts SET balance = balance - 100 WHERE id = 1
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 0 page no 5 n bits 72 index PRIMARY of table `test`.`accounts` trx id 12345 lock_mode X locks rec but not gap waiting
*** (2) TRANSACTION:
TRANSACTION 12346, ACTIVE 0 sec starting index read
mysql tables in use 1, locked 1
3 lock struct(s), heap size 1136, 2 row lock(s)
MySQL thread id 12346, OS thread handle 0x7f0a1b2c3d4f, query id 101
UPDATE accounts SET balance = balance - 100 WHERE id = 2
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 0 page no 5 n bits 72 index PRIMARY of table `test`.`accounts` trx id 12346 lock_mode X locks rec but not gap
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 0 page no 5 n bits 72 index PRIMARY of table `test`.`accounts` trx id 12346 lock_mode X locks rec but not gap waiting
*** WE ROLL BACK TRANSACTION (2)
四、架构优化:从单库到分布式
4.1 读写分离架构
读写分离是提升并发能力的基础架构优化。
架构图:
应用层
↓
负载均衡
↓
主库(写) ←→ 从库(读)
↓
数据同步(binlog)
实现方案:
// Spring Boot配置多数据源
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public DataSource routingDataSource() {
DynamicDataSource routingDataSource = new DynamicDataSource();
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", masterDataSource());
targetDataSources.put("slave", slaveDataSource());
routingDataSource.setTargetDataSources(targetDataSources);
routingDataSource.setDefaultTargetDataSource(masterDataSource());
return routingDataSource;
}
}
// 动态数据源路由
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDataSourceType();
}
}
// 使用示例
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
// 写操作使用主库
@Transactional
public User createUser(User user) {
DataSourceContextHolder.setDataSource("master");
return userRepository.save(user);
}
// 读操作使用从库
public User getUser(Long id) {
DataSourceContextHolder.setDataSource("slave");
return userRepository.findById(id).orElse(null);
}
}
4.2 分库分表策略
当单库无法支撑时,需要分库分表。
分片策略:
- 垂直分库:按业务模块拆分
- 水平分表:按数据特征拆分
分片键选择原则:
- 数据分布均匀
- 查询命中率高
- 避免跨片查询
分库分表示例:
-- 用户表按user_id分16个表
-- user_0 到 user_15
-- 分片函数(Java实现)
public class ShardingUtil {
private static final int SHARD_COUNT = 16;
public static String getTableName(Long userId) {
int shardIndex = (int) (userId % SHARD_COUNT);
return "user_" + shardIndex;
}
public static String getDatabaseName(Long userId) {
int dbIndex = (int) (userId % 8); // 8个数据库
return "db_" + dbIndex;
}
}
// 使用示例
public class UserRepository {
public void saveUser(User user) {
String dbName = ShardingUtil.getDatabaseName(user.getId());
String tableName = ShardingUtil.getTableName(user.getId());
// 动态切换数据源和表名
DataSourceContextHolder.setDataSource(dbName);
String sql = String.format("INSERT INTO %s (id, username, email) VALUES (?, ?, ?)",
tableName);
jdbcTemplate.update(sql, user.getId(), user.getUsername(), user.getEmail());
}
}
4.3 分布式事务解决方案
分库分表后,事务成为挑战。
解决方案:
1. 两阶段提交(2PC)
// 伪代码示例
@Transactional
public void transfer(Long fromId, Long toId, BigDecimal amount) {
try {
// 阶段1:准备
boolean fromSuccess = prepareDeduct(fromId, amount);
boolean toSuccess = prepareAdd(toId, amount);
if (fromSuccess && toSuccess) {
// 阶段2:提交
commitDeduct(fromId);
commitAdd(toId);
} else {
// 回滚
rollbackDeduct(fromId);
rollbackAdd(toId);
}
} catch (Exception e) {
// 超时处理
handleTimeout();
}
}
2. TCC模式(Try-Confirm-Cancel)
// Try阶段:资源检查和预留
public boolean tryDeduct(Long userId, BigDecimal amount) {
String sql = "UPDATE account SET frozen = frozen + ? WHERE id = ? AND balance >= ?";
return jdbcTemplate.update(sql, amount, userId, amount) > 0;
}
// Confirm阶段:确认扣款
public boolean confirmDeduct(Long userId, BigDecimal amount) {
String sql = "UPDATE account SET balance = balance - ?, frozen = frozen - ? WHERE id = ?";
return jdbcTemplate.update(sql, amount, amount, userId) > 0;
}
// Cancel阶段:取消预留
public boolean cancelDeduct(Long userId, BigDecimal amount) {
String sql = "UPDATE account SET frozen = frozen - ? WHERE id = ?";
return jdbcTemplate.update(sql, amount, userId) > 0;
}
3. 消息队列最终一致性
// 发送方
@Transactional
public void transfer(Long fromId, Long toId, BigDecimal amount) {
// 1. 扣减余额(本地事务)
deductBalance(fromId, amount);
// 2. 发送消息(同事务)
sendTransferMessage(fromId, toId, amount);
}
// 接收方
@RocketMQMessageListener
public void handleTransferMessage(TransferMessage message) {
try {
// 幂等性检查
if (isProcessed(message.getId())) {
return;
}
// 增加余额
addBalance(message.getToId(), message.getAmount());
// 记录处理
markProcessed(message.getId());
} catch (Exception e) {
// 重试或死信队列
throw new RuntimeException(e);
}
}
4.4 缓存策略:减轻数据库压力
缓存设计原则:
- 缓存穿透:查询不存在的数据
- 缓存击穿:热点key过期
- 缓存雪崩:大量key同时过期
解决方案:
// 缓存穿透解决方案:布隆过滤器
public class BloomFilterUtil {
private static final int EXPECTED_INSERTIONS = 1000000;
private static final double FALSE_POSITIVE_RATE = 0.01;
private BloomFilter<Long> bloomFilter;
public BloomFilterUtil() {
this.bloomFilter = BloomFilter.create(
Funnels.longFunnel(),
EXPECTED_INSERTIONS,
FALSE_POSITIVE_RATE
);
}
public boolean mightContain(Long id) {
return bloomFilter.mightContain(id);
}
public void put(Long id) {
bloomFilter.put(id);
}
}
// 缓存击穿解决方案:互斥锁
public User getUserWithCache(Long id) {
String key = "user:" + id;
// 1. 先查缓存
User user = redisTemplate.opsForValue().get(key);
if (user != null) {
return user;
}
// 2. 获取分布式锁
String lockKey = "lock:" + key;
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
try {
// 双重检查
user = redisTemplate.opsForValue().get(key);
if (user != null) {
return user;
}
// 查询数据库
user = userRepository.findById(id).orElse(null);
// 写入缓存
if (user != null) {
redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
} else {
// 缓存空值,防止穿透
redisTemplate.opsForValue().set(key, new User(), 5, TimeUnit.MINUTES);
}
return user;
} finally {
redisTemplate.delete(lockKey);
}
} else {
// 等待并重试
try {
Thread.sleep(50);
return getUserWithCache(id);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
}
// 缓存雪崩解决方案:随机过期时间
public void setWithRandomExpire(String key, Object value, int baseExpireSeconds) {
// 随机时间:基础时间 + 随机偏移(0-300秒)
int randomExpire = baseExpireSeconds + new Random().nextInt(300);
redisTemplate.opsForValue().set(key, value, randomExpire, TimeUnit.SECONDS);
}
五、高并发配置调优
5.1 MySQL核心参数配置
连接相关参数:
[mysqld]
# 最大连接数
max_connections = 2000
# 每个连接的最大请求数
max_connections_per_user = 1000
# 连接超时时间
connect_timeout = 10
# 交互式超时时间
interactive_timeout = 300
# 非交互式超时时间
wait_timeout = 300
InnoDB引擎参数:
# 缓冲池大小(建议物理内存的50-70%)
innodb_buffer_pool_size = 16G
# 缓冲池实例数(建议与CPU核数相同)
innodb_buffer_pool_instances = 8
# 日志文件大小
innodb_log_file_size = 2G
# 刷新日志策略
innodb_flush_log_at_trx_commit = 1 # 1=每次提交刷盘,0=每秒刷盘
# 最大并发线程数
innodb_thread_concurrency = 32
# 启用死锁检测
innodb_deadlock_detect = ON
# 锁等待超时
innodb_lock_wait_timeout = 50
5.2 操作系统参数调优
文件描述符限制:
# 查看当前限制
ulimit -n
# 临时修改
ulimit -n 65535
# 永久修改(/etc/security/limits.conf)
* soft nofile 65535
* hard nofile 65535
TCP参数优化:
# 增加端口范围
sysctl -w net.ipv4.ip_local_port_range="1024 65535"
# 增大TCP队列
sysctl -w net.core.somaxconn=4096
# 开启TIME_WAIT重用
sysctl -w net.ipv4.tcp_tw_reuse=1
# 减少TIME_WAIT超时
sysctl -w net.ipv4.tcp_fin_timeout=30
5.3 监控与告警
关键监控指标:
- QPS/TPS:每秒查询/事务数
- 连接数:当前连接数 vs 最大连接数
- 慢查询:执行时间超过阈值的查询
- 锁等待:锁等待时间和数量
- 缓存命中率:InnoDB缓冲池命中率
监控SQL示例:
-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
-- 查看慢查询数量
SHOW STATUS LIKE 'Slow_queries';
-- 查看InnoDB缓冲池命中率
SELECT
(1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 AS hit_rate
FROM performance_schema.global_status
WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads';
-- 查看锁等待
SELECT * FROM performance_schema.data_lock_waits;
-- 查看当前运行的事务
SELECT * FROM information_schema.INNODB_TRX;
六、实战案例:秒杀系统设计
6.1 秒杀业务特点
- 瞬时高并发:百万级QPS
- 库存有限:超卖问题严重
- 逻辑简单:减库存+下单
6.2 架构设计
分层架构:
客户端 → CDN → Nginx → 应用服务 → 缓存 → 消息队列 → 数据库
核心代码实现:
@Service
public class SeckillService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderRepository orderRepository;
private static final String STOCK_KEY = "seckill:stock:";
private static final String ORDER_KEY = "seckill:order:";
/**
* 秒杀流程
*/
public SeckillResult seckill(Long userId, Long productId) {
// 1. 参数校验
if (userId == null || productId == null) {
return SeckillResult.error("参数错误");
}
// 2. 预扣减库存(Redis)
String stockKey = STOCK_KEY + productId;
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
redisTemplate.opsForValue().increment(stockKey); // 回滚
return SeckillResult.error("库存不足");
}
// 3. 检查是否已购买(防重复)
String orderKey = ORDER_KEY + productId + ":" + userId;
Boolean existed = redisTemplate.hasKey(orderKey);
if (Boolean.TRUE.equals(existed)) {
return SeckillResult.error("您已参与过秒杀");
}
// 4. 发送消息到MQ(异步下单)
SeckillMessage message = new SeckillMessage();
message.setUserId(userId);
message.setProductId(productId);
message.setTimestamp(System.currentTimeMillis());
rocketMQTemplate.sendOneWay("seckill-topic", message);
// 5. 标记已参与
redisTemplate.opsForValue().set(orderKey, 1, 30, TimeUnit.MINUTES);
return SeckillResult.success("秒杀成功,正在处理订单");
}
/**
* MQ消费端:真正创建订单
*/
@RocketMQMessageListener(topic = "seckill-topic", consumerGroup = "seckill-group")
public void createOrder(SeckillMessage message) {
try {
// 1. 幂等性检查
if (orderRepository.existsByUserIdAndProductId(message.getUserId(), message.getProductId())) {
return;
}
// 2. 创建订单
Order order = new Order();
order.setUserId(message.getUserId());
order.setProductId(message.getProductId());
order.setStatus(OrderStatus.PENDING);
order.setCreateTime(new Date());
orderRepository.save(order);
// 3. 记录日志
log.info("秒杀订单创建成功: {}", order.getId());
} catch (Exception e) {
log.error("创建订单失败", e);
// 失败补偿:恢复库存
String stockKey = STOCK_KEY + message.getProductId();
redisTemplate.opsForValue().increment(stockKey);
}
}
}
6.3 数据库表设计
-- 秒杀商品表
CREATE TABLE seckill_products (
id BIGINT PRIMARY KEY,
name VARCHAR(200) NOT NULL,
original_price DECIMAL(10,2) NOT NULL,
seckill_price DECIMAL(10,2) NOT NULL,
stock INT NOT NULL,
start_time DATETIME NOT NULL,
end_time DATETIME NOT NULL,
version INT DEFAULT 0, -- 乐观锁
INDEX idx_time (start_time, end_time)
) ENGINE=InnoDB;
-- 秒杀订单表(分表)
CREATE TABLE seckill_orders_0 (
id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
amount DECIMAL(10,2) NOT NULL,
status TINYINT NOT NULL,
create_time DATETIME NOT NULL,
INDEX idx_user (user_id),
INDEX idx_product (product_id)
) ENGINE=InnoDB;
-- 库存流水表(用于对账)
CREATE TABLE stock_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
product_id BIGINT NOT NULL,
change_amount INT NOT NULL,
type TINYINT NOT NULL, -- 1:扣减, 2:回滚
ref_id VARCHAR(100), -- 关联业务ID
create_time DATETIME NOT NULL,
INDEX idx_product (product_id)
) ENGINE=InnoDB;
6.4 防刷与限流
限流实现:
@Component
public class RateLimiter {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 令牌桶算法
*/
public boolean tryAcquire(String key, int permits, int maxPermits, int refillRate) {
String luaScript =
"local current = redis.call('GET', KEYS[1]) " +
"if current == false then " +
" current = tonumber(ARGV[1]) " +
"else " +
" current = tonumber(current) " +
"end " +
"local capacity = tonumber(ARGV[1]) " +
"local rate = tonumber(ARGV[2]) " +
"local now = tonumber(ARGV[3]) " +
"local lastRefill = tonumber(redis.call('GET', KEYS[1] .. ':last') or now) " +
"local elapsed = now - lastRefill " +
"local refill = math.floor(elapsed * rate / 1000) " +
"current = math.min(capacity, current + refill) " +
"if current >= tonumber(ARGV[4]) then " +
" current = current - tonumber(ARGV[4]) " +
" redis.call('SET', KEYS[1], current) " +
" redis.call('SET', KEYS[1] .. ':last', now) " +
" redis.call('EXPIRE', KEYS[1], 60) " +
" return 1 " +
"else " +
" return 0 " +
"end";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(key),
maxPermits, refillRate, System.currentTimeMillis(), permits
);
return result != null && result == 1;
}
}
// 使用示例
@RestController
public class SeckillController {
@Autowired
private RateLimiter rateLimiter;
@Autowired
private SeckillService seckillService;
@PostMapping("/seckill")
public SeckillResult seckill(@RequestBody SeckillRequest request) {
// 限流:每个用户每秒最多3次请求
String rateKey = "rate_limit:user:" + request.getUserId();
if (!rateLimiter.tryAcquire(rateKey, 1, 3, 3)) {
return SeckillResult.error("请求过于频繁,请稍后再试");
}
// 业务处理
return seckillService.seckill(request.getUserId(), request.getProductId());
}
}
七、总结与最佳实践清单
7.1 索引优化检查清单
- [ ] 所有查询都使用索引
- [ ] 避免索引失效的写法
- [ ] 定期清理无用索引
- [ ] 复合索引遵循最左前缀原则
- [ ] 高频查询使用覆盖索引
7.2 查询优化检查清单
- [ ] 禁止SELECT *
- [ ] 分页使用延迟关联或位置记录
- [ ] 大表JOIN谨慎使用
- [ ] 批量操作替代循环单条
- [ ] 合理使用EXISTS和IN
7.3 事务与锁检查清单
- [ ] 事务粒度最小化
- [ ] 固定加锁顺序
- [ ] 使用SELECT FOR UPDATE
- [ ] 设置合理超时时间
- [ ] 监控死锁日志
7.4 架构优化检查清单
- [ ] 实现读写分离
- [ ] 合理使用缓存
- [ ] 消息队列解耦
- [ ] 分库分表规划
- [ ] 监控告警体系
7.5 配置调优检查清单
- [ ] 缓冲池大小合理
- [ ] 连接数配置恰当
- [ ] 日志文件大小合适
- [ ] 操作系统参数优化
- [ ] 监控指标全覆盖
结语
MySQL高并发优化是一个系统工程,需要从索引、查询、事务、架构等多个维度综合考虑。没有银弹,只有最适合业务场景的方案。建议从监控入手,找到性能瓶颈,然后针对性优化。记住:先测量,再优化。
在实际生产环境中,还需要结合业务特点进行定制化优化。希望本文能为您的数据库优化之路提供有价值的参考。
