引言
在当今互联网应用中,高并发场景已成为常态。无论是电商秒杀、社交网络还是金融交易系统,都面临着海量用户同时访问数据库的挑战。MySQL作为最流行的开源关系型数据库,在高并发环境下容易出现性能瓶颈,如连接数耗尽、锁竞争、I/O瓶颈等问题。本文将从架构优化、数据库设计、缓存机制等多个维度,系统性地探讨MySQL高并发处理策略,并提供实战代码示例。
一、架构层面的优化策略
1.1 读写分离架构
读写分离是提升MySQL并发能力的基础架构优化。通过将读操作和写操作分离到不同的数据库实例,可以显著减轻主库压力。
实现方式:
- 主库(Master):处理所有写操作(INSERT/UPDATE/DELETE)
- 从库(Slave):处理所有读操作(SELECT)
- 中间件:使用ProxySQL、MyCat或ShardingSphere等中间件进行路由
代码示例(使用Spring Boot + MyBatis实现读写分离):
// 1. 配置多数据源
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public DataSource routingDataSource() {
DynamicDataSource routingDataSource = new DynamicDataSource();
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", masterDataSource());
targetDataSources.put("slave", slaveDataSource());
routingDataSource.setTargetDataSources(targetDataSources);
routingDataSource.setDefaultTargetDataSource(masterDataSource());
return routingDataSource;
}
}
// 2. 自定义路由数据源
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDataSourceType();
}
}
// 3. 数据源上下文管理器
public class DataSourceContextHolder {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static void setDataSourceType(String dataSourceType) {
contextHolder.set(dataSourceType);
}
public static String getDataSourceType() {
return contextHolder.get() != null ? contextHolder.get() : "master";
}
public static void clearDataSourceType() {
contextHolder.remove();
}
}
// 4. AOP切面实现读写分离
@Aspect
@Component
public class DataSourceAspect {
@Before("execution(* com.example.service.*.*(..))")
public void before(JoinPoint joinPoint) {
String methodName = joinPoint.getSignature().getName();
if (methodName.startsWith("get") || methodName.startsWith("list") ||
methodName.startsWith("query") || methodName.startsWith("find")) {
DataSourceContextHolder.setDataSourceType("slave");
} else {
DataSourceContextHolder.setDataSourceType("master");
}
}
@After("execution(* com.example.service.*.*(..))")
public void after() {
DataSourceContextHolder.clearDataSourceType();
}
}
注意事项:
- 主从同步延迟可能导致数据不一致,对于强一致性要求的场景需谨慎使用
- 可以结合半同步复制降低延迟
- 读写分离中间件可以提供更灵活的路由策略
1.2 分库分表策略
当单表数据量超过千万级别时,需要考虑分库分表。分库分表分为垂直拆分和水平拆分。
垂直拆分:
- 按业务模块拆分:用户表、订单表、商品表分到不同数据库
- 按列拆分:将大字段(如文章内容)拆分到单独的表
水平拆分:
- 按范围拆分:按时间范围(如按月分表)
- 按哈希拆分:按用户ID取模分表
代码示例(使用ShardingSphere实现分表):
# sharding.yaml 配置文件
dataSources:
ds_0: !!com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/db_0
username: root
password: root
ds_1: !!com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/db_1
username: root
password: root
shardingRule:
tables:
order:
actualDataNodes: ds_${0..1}.order_${0..3}
tableStrategy:
standard:
shardingColumn: order_id
preciseAlgorithmClassName: com.example.OrderTableShardingAlgorithm
defaultDatabaseStrategy:
standard:
shardingColumn: user_id
preciseAlgorithmClassName: com.example.DatabaseShardingAlgorithm
// 分表算法实现
public class OrderTableShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long orderId = shardingValue.getValue();
// 根据订单ID取模分表
int tableIndex = (int) (orderId % 4);
String tableName = "order_" + tableIndex;
// 检查目标表是否存在
if (availableTargetNames.contains(tableName)) {
return tableName;
}
throw new IllegalArgumentException("无法找到分表:" + tableName);
}
}
// 分库算法实现
public class DatabaseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long userId = shardingValue.getValue();
// 根据用户ID取模分库
int dbIndex = (int) (userId % 2);
String dbName = "ds_" + dbIndex;
if (availableTargetNames.contains(dbName)) {
return dbName;
}
throw new IllegalArgumentException("无法找到分库:" + dbName);
}
}
分库分表后的查询优化:
- 避免跨库JOIN,尽量在业务层组装数据
- 使用全局表(如字典表)进行冗余存储
- 分布式事务处理(使用Seata等框架)
1.3 连接池优化
连接池是数据库访问的入口,合理的配置能显著提升并发能力。
常用连接池对比:
- HikariCP:性能最佳,推荐使用
- Druid:功能丰富,监控完善
- C3P0:老牌连接池,性能一般
HikariCP配置示例:
spring:
datasource:
hikari:
# 连接池大小
maximum-pool-size: 20
minimum-idle: 5
# 连接超时
connection-timeout: 30000
# 空闲连接存活时间
idle-timeout: 600000
# 连接最大生命周期
max-lifetime: 1800000
# 连接测试查询
connection-test-query: SELECT 1
# 连接预热
initialization-fail-timeout: 1
# 连接泄漏检测
leak-detection-threshold: 60000
连接池大小计算公式:
连接数 = (核心数 * 2) + 有效磁盘数
对于高并发读场景,可以适当增加连接数;对于写密集型场景,需要控制连接数避免锁竞争。
二、数据库设计优化
2.1 索引优化策略
索引是提升查询性能的关键。在高并发场景下,索引设计需要特别谨慎。
索引设计原则:
- 覆盖索引:查询字段都在索引中,避免回表
- 最左前缀原则:复合索引的使用顺序
- 避免冗余索引和重复索引
代码示例(索引优化实战):
-- 1. 原始查询(性能差)
SELECT user_id, username, email
FROM users
WHERE age > 25 AND city = 'Beijing'
ORDER BY create_time DESC
LIMIT 10;
-- 2. 优化后的索引设计
-- 创建复合索引(注意顺序:等值查询在前,范围查询在后)
CREATE INDEX idx_age_city_create_time ON users(age, city, create_time);
-- 3. 覆盖索引示例
-- 查询只需要索引字段,无需回表
SELECT user_id, age, city
FROM users
WHERE age > 25 AND city = 'Beijing';
-- 4. 索引提示(强制使用特定索引)
SELECT /*+ INDEX(users idx_age_city_create_time) */ *
FROM users
WHERE age > 25 AND city = 'Beijing';
-- 5. 索引下推优化(MySQL 5.6+)
-- 索引下推可以在存储引擎层过滤数据,减少回表次数
EXPLAIN SELECT * FROM users WHERE age > 25 AND city = 'Beijing';
-- 查看Extra列是否包含"Using index condition"
索引监控与维护:
-- 查看索引使用情况
SELECT * FROM sys.schema_unused_indexes;
-- 查看索引大小
SELECT
table_name,
index_name,
ROUND(index_length / 1024 / 1024, 2) AS index_size_mb
FROM information_schema.tables
WHERE table_schema = 'your_database';
-- 重建索引(避免在业务高峰期执行)
ALTER TABLE users DROP INDEX idx_old, ADD INDEX idx_new(column1, column2);
2.2 表结构设计优化
数据类型选择:
- 使用最小的数据类型:TINYINT代替INT,VARCHAR(100)代替VARCHAR(255)
- 避免使用TEXT/BLOB类型,大字段单独存储
- 使用ENUM代替字符串类型
表结构优化示例:
-- 优化前(数据类型过大)
CREATE TABLE user_info (
id INT PRIMARY KEY,
username VARCHAR(255),
age INT,
status TINYINT,
create_time DATETIME,
description TEXT
);
-- 优化后(使用合适的数据类型)
CREATE TABLE user_info (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(64) NOT NULL,
age TINYINT UNSIGNED,
status TINYINT DEFAULT 0,
create_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
description VARCHAR(1000),
INDEX idx_username (username),
INDEX idx_age_status (age, status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 大字段单独存储
CREATE TABLE user_description (
user_id BIGINT PRIMARY KEY,
description TEXT,
FOREIGN KEY (user_id) REFERENCES user_info(id)
);
2.3 事务优化
高并发场景下,事务设计直接影响系统性能。
事务优化策略:
- 缩短事务时间:减少锁持有时间
- 合理设置隔离级别:根据业务需求选择
- 避免长事务
代码示例(事务优化):
// 1. 优化前(长事务)
@Transactional
public void processOrder(Long orderId) {
// 查询订单
Order order = orderMapper.selectById(orderId);
// 模拟耗时操作(如调用外部API)
Thread.sleep(5000); // 5秒
// 更新订单状态
order.setStatus(2);
orderMapper.updateById(order);
}
// 2. 优化后(拆分事务)
public void processOrder(Long orderId) {
// 1. 查询订单(非事务操作)
Order order = orderMapper.selectById(orderId);
// 2. 调用外部API(非事务操作)
callExternalAPI(order);
// 3. 更新订单状态(短事务)
updateOrderStatus(orderId);
}
@Transactional
public void updateOrderStatus(Long orderId) {
Order order = orderMapper.selectById(orderId);
order.setStatus(2);
orderMapper.updateById(order);
}
// 3. 使用事务传播行为控制
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
// 外层事务,内层独立事务
@Transactional(propagation = Propagation.REQUIRED)
public void createOrderWithPayment(Order order) {
// 创建订单(主事务)
orderMapper.insert(order);
// 调用支付服务(独立事务,即使支付失败也不影响订单创建)
processPayment(order);
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void processPayment(Order order) {
// 支付逻辑
paymentService.pay(order);
}
}
三、缓存机制实战
3.1 多级缓存架构
在高并发场景下,缓存是减轻数据库压力的最有效手段。
缓存架构设计:
客户端 → CDN → 应用缓存(Redis) → 数据库缓存(MySQL Buffer Pool)
代码示例(Spring Boot + Redis缓存):
// 1. 配置Redis缓存
@Configuration
@EnableCaching
public class RedisCacheConfig {
@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory factory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(10))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
return RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
}
}
// 2. 缓存注解使用
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
// 缓存查询结果
@Cacheable(value = "users", key = "#userId")
public User getUserById(Long userId) {
return userMapper.selectById(userId);
}
// 更新缓存
@CachePut(value = "users", key = "#user.id")
public User updateUser(User user) {
userMapper.updateById(user);
return user;
}
// 删除缓存
@CacheEvict(value = "users", key = "#userId")
public void deleteUser(Long userId) {
userMapper.deleteById(userId);
}
// 复杂缓存逻辑
@Cacheable(value = "users", key = "#userId", unless = "#result == null")
public User getUserWithCache(Long userId) {
// 先查缓存
User user = redisTemplate.opsForValue().get("user:" + userId);
if (user != null) {
return user;
}
// 缓存未命中,查数据库
user = userMapper.selectById(userId);
// 写入缓存
if (user != null) {
redisTemplate.opsForValue().set("user:" + userId, user, Duration.ofMinutes(10));
}
return user;
}
}
3.2 缓存穿透、击穿、雪崩防护
缓存穿透防护:
// 布隆过滤器防止缓存穿透
@Component
public class BloomFilterCache {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private BloomFilter<String> bloomFilter;
@PostConstruct
public void init() {
// 初始化布隆过滤器
bloomFilter = BloomFilter.create(
Funnels.stringFunnel(Charset.defaultCharset()),
1000000, // 预期元素数量
0.01 // 误判率
);
}
public User getUser(Long userId) {
String key = "user:" + userId;
// 1. 布隆过滤器检查
if (!bloomFilter.mightContain(key)) {
// 不存在,直接返回null
return null;
}
// 2. 查询缓存
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
return user;
}
// 3. 查询数据库
user = userMapper.selectById(userId);
// 4. 更新缓存和布隆过滤器
if (user != null) {
redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(10));
bloomFilter.put(key);
} else {
// 缓存空值,防止重复查询
redisTemplate.opsForValue().set(key, null, Duration.ofMinutes(1));
}
return user;
}
}
缓存击穿防护:
// 使用分布式锁防止缓存击穿
@Service
public class CacheBreakdownService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserMapper userMapper;
private static final String LOCK_PREFIX = "lock:";
private static final String CACHE_PREFIX = "user:";
public User getUserWithLock(Long userId) {
String cacheKey = CACHE_PREFIX + userId;
String lockKey = LOCK_PREFIX + userId;
// 1. 查询缓存
User user = (User) redisTemplate.opsForValue().get(cacheKey);
if (user != null) {
return user;
}
// 2. 获取分布式锁
Boolean lockAcquired = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", Duration.ofSeconds(30));
if (Boolean.TRUE.equals(lockAcquired)) {
try {
// 3. 双重检查(防止锁竞争)
user = (User) redisTemplate.opsForValue().get(cacheKey);
if (user != null) {
return user;
}
// 4. 查询数据库
user = userMapper.selectById(userId);
// 5. 更新缓存
if (user != null) {
redisTemplate.opsForValue().set(cacheKey, user, Duration.ofMinutes(10));
} else {
// 缓存空值
redisTemplate.opsForValue().set(cacheKey, null, Duration.ofMinutes(1));
}
return user;
} finally {
// 6. 释放锁
redisTemplate.delete(lockKey);
}
} else {
// 等待并重试
try {
Thread.sleep(100);
return getUserWithLock(userId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
}
}
缓存雪崩防护:
// 缓存过期时间随机化
@Service
public class CacheAvalancheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void setCacheWithRandomTTL(String key, Object value) {
// 基础过期时间
int baseTTL = 600; // 10分钟
// 随机因子(±30%)
Random random = new Random();
int randomFactor = random.nextInt(600) - 300; // -300 ~ 300秒
// 最终过期时间
int finalTTL = baseTTL + randomFactor;
redisTemplate.opsForValue().set(key, value, Duration.ofSeconds(finalTTL));
}
// 缓存预热
@PostConstruct
public void cacheWarmUp() {
// 定时任务预热热点数据
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
// 预热热点数据
List<Long> hotUserIds = getHotUserIds();
for (Long userId : hotUserIds) {
User user = userMapper.selectById(userId);
if (user != null) {
setCacheWithRandomTTL("user:" + userId, user);
}
}
}, 0, 5, TimeUnit.MINUTES);
}
}
3.3 缓存与数据库一致性
最终一致性方案:
// 1. 更新数据库后异步更新缓存
@Service
public class CacheUpdateService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserMapper userMapper;
// 更新数据库
@Transactional
public void updateUser(User user) {
userMapper.updateById(user);
// 异步更新缓存(使用消息队列或线程池)
CompletableFuture.runAsync(() -> {
try {
// 延迟更新,避免缓存穿透
Thread.sleep(1000);
redisTemplate.delete("user:" + user.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
// 2. 使用Canal监听数据库变更
@Component
public class CanalListener {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 监听数据库binlog变更
@CanalEventListener
public void handleUpdate(CanalEntry.Event event) {
if (event.getTableName().equals("user")) {
// 解析变更数据
List<CanalEntry.RowData> rowDatas = event.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatas) {
// 获取变更后的数据
Map<String, String> afterData = parseRowData(rowData.getAfterColumnsList());
String userId = afterData.get("id");
// 更新缓存
User user = userMapper.selectById(Long.parseLong(userId));
if (user != null) {
redisTemplate.opsForValue().set("user:" + userId, user, Duration.ofMinutes(10));
}
}
}
}
}
四、SQL优化与执行计划
4.1 慢查询分析
慢查询日志配置:
-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1; -- 超过1秒的查询记录
-- 查看慢查询统计
SHOW VARIABLES LIKE 'slow_query%';
SHOW VARIABLES LIKE 'long_query_time';
-- 分析慢查询日志
mysqldumpslow /var/log/mysql/slow.log
使用Percona Toolkit分析:
# 安装Percona Toolkit
sudo apt-get install percona-toolkit
# 分析慢查询日志
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
# 分析特定时间段的慢查询
pt-query-digest --since='2024-01-01 00:00:00' --until='2024-01-02 00:00:00' /var/log/mysql/slow.log
4.2 执行计划分析
EXPLAIN命令详解:
-- 基本使用
EXPLAIN SELECT * FROM users WHERE age > 25 AND city = 'Beijing';
-- 查看详细执行计划
EXPLAIN FORMAT=JSON SELECT * FROM users WHERE age > 25 AND city = 'Beijing';
-- 查看优化器建议
EXPLAIN ANALYZE SELECT * FROM users WHERE age > 25 AND city = 'Beijing';
执行计划关键字段解读:
- type:访问类型(ALL > index > range > ref > eq_ref > const > system)
- key:实际使用的索引
- rows:预估扫描行数
- Extra:额外信息(Using index, Using where, Using filesort等)
优化示例:
-- 优化前(全表扫描)
EXPLAIN SELECT * FROM orders WHERE status = 'pending' AND create_time > '2024-01-01';
-- 优化后(使用索引)
CREATE INDEX idx_status_create_time ON orders(status, create_time);
EXPLAIN SELECT * FROM orders WHERE status = 'pending' AND create_time > '2024-01-01';
-- 查看索引使用情况
SHOW INDEX FROM orders;
4.3 SQL重写技巧
避免使用SELECT *:
-- 优化前
SELECT * FROM users WHERE id = 1;
-- 优化后(只查询需要的字段)
SELECT id, username, email FROM users WHERE id = 1;
使用JOIN替代子查询:
-- 优化前(子查询)
SELECT * FROM orders
WHERE user_id IN (SELECT id FROM users WHERE status = 'active');
-- 优化后(JOIN)
SELECT o.* FROM orders o
INNER JOIN users u ON o.user_id = u.id
WHERE u.status = 'active';
使用UNION ALL替代UNION:
-- 优化前(去重,性能差)
SELECT * FROM orders WHERE status = 'pending'
UNION
SELECT * FROM orders WHERE status = 'processing';
-- 优化后(不去重,性能好)
SELECT * FROM orders WHERE status = 'pending'
UNION ALL
SELECT * FROM orders WHERE status = 'processing';
五、监控与调优
5.1 监控指标
关键监控指标:
- QPS/TPS:每秒查询/事务数
- 连接数:当前连接数 vs 最大连接数
- 缓存命中率:InnoDB Buffer Pool命中率
- 锁等待:锁等待时间
- 慢查询数:慢查询数量
监控SQL示例:
-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
-- 查看最大连接数
SHOW VARIABLES LIKE 'max_connections';
-- 查看InnoDB Buffer Pool命中率
SELECT
(1 - (SELECT variable_value FROM information_schema.global_status
WHERE variable_name = 'Innodb_buffer_pool_reads') /
(SELECT variable_value FROM information_schema.global_status
WHERE variable_name = 'Innodb_buffer_pool_read_requests')) * 100
AS buffer_pool_hit_rate;
-- 查看锁等待
SELECT * FROM information_schema.innodb_locks;
SELECT * FROM information_schema.innodb_lock_waits;
5.2 性能调优工具
Percona Monitoring and Management (PMM):
# 安装PMM客户端
sudo apt-get install pmm2-client
# 连接到PMM服务器
pmm-admin config --server-url=http://pmm-server:80 --server-insecure-tls
# 添加MySQL监控
pmm-admin add mysql --username=pmm --password=pmm --query-source=perfschema
MySQL Workbench性能分析:
- 使用Performance Dashboard查看实时性能
- 使用Explain Plan可视化执行计划
- 使用Schema Inspector分析表结构
5.3 自动化调优
使用MySQL 8.0的自动索引建议:
-- 开启索引建议(MySQL 8.0+)
SET GLOBAL innodb_adaptive_hash_index = ON;
-- 查看索引建议
SELECT * FROM sys.schema_unused_indexes;
SELECT * FROM sys.schema_redundant_indexes;
使用pt-archiver进行数据归档:
# 归档旧数据,减少表大小
pt-archiver --source h=localhost,u=root,p=password,D=test,t=orders \
--dest h=localhost,u=root,p=password,D=test_archive,t=orders_archive \
--where "create_time < '2023-01-01'" \
--bulk-delete --bulk-insert --commit-each
六、实战案例:电商秒杀系统
6.1 架构设计
用户请求 → Nginx → 应用服务器 → Redis集群 → MySQL主库
↓
消息队列 → 异步处理 → MySQL从库
6.2 核心代码实现
// 秒杀服务
@Service
public class SeckillService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private OrderMapper orderMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
// 秒杀商品库存key
private static final String STOCK_KEY = "seckill:stock:";
// 秒杀订单key
private static final String ORDER_KEY = "seckill:order:";
/**
* 秒杀下单
*/
@Transactional
public SeckillResult seckill(Long userId, Long productId) {
String stockKey = STOCK_KEY + productId;
String orderKey = ORDER_KEY + userId + ":" + productId;
// 1. 检查是否已秒杀
if (Boolean.TRUE.equals(redisTemplate.hasKey(orderKey))) {
return SeckillResult.fail("已秒杀成功");
}
// 2. 预减库存(使用Redis原子操作)
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
// 库存不足,恢复库存
redisTemplate.opsForValue().increment(stockKey);
return SeckillResult.fail("库存不足");
}
// 3. 生成秒杀订单(异步)
SeckillOrder order = new SeckillOrder();
order.setUserId(userId);
order.setProductId(productId);
order.setStatus(0); // 待支付
order.setCreateTime(new Date());
// 4. 发送消息到MQ,异步创建正式订单
rabbitTemplate.convertAndSend("seckill.order", order);
// 5. 记录已秒杀
redisTemplate.opsForValue().set(orderKey, order, Duration.ofMinutes(30));
return SeckillResult.success("秒杀成功,订单处理中");
}
/**
* 异步创建订单(消费者)
*/
@RabbitListener(queues = "seckill.order")
public void createOrder(SeckillOrder order) {
try {
// 1. 检查库存(数据库)
Product product = productMapper.selectById(order.getProductId());
if (product.getStock() <= 0) {
// 库存不足,回滚Redis库存
redisTemplate.opsForValue().increment(STOCK_KEY + order.getProductId());
return;
}
// 2. 扣减数据库库存
int update = productMapper.decreaseStock(order.getProductId());
if (update == 0) {
// 库存不足,回滚Redis库存
redisTemplate.opsForValue().increment(STOCK_KEY + order.getProductId());
return;
}
// 3. 创建订单
orderMapper.insert(order);
// 4. 发送支付通知
sendPaymentNotification(order);
} catch (Exception e) {
// 异常处理,回滚库存
redisTemplate.opsForValue().increment(STOCK_KEY + order.getProductId());
log.error("创建订单失败", e);
}
}
}
6.3 数据库表设计
-- 秒杀商品表
CREATE TABLE seckill_product (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
price DECIMAL(10,2) NOT NULL,
stock INT NOT NULL,
start_time DATETIME NOT NULL,
end_time DATETIME NOT NULL,
version INT DEFAULT 0, -- 乐观锁版本号
INDEX idx_time (start_time, end_time)
) ENGINE=InnoDB;
-- 秒杀订单表
CREATE TABLE seckill_order (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
amount DECIMAL(10,2) NOT NULL,
status TINYINT NOT NULL DEFAULT 0,
create_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
update_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
INDEX idx_user_product (user_id, product_id),
INDEX idx_status (status)
) ENGINE=InnoDB;
-- 库存流水表(用于对账)
CREATE TABLE stock_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
product_id BIGINT NOT NULL,
change_amount INT NOT NULL, -- 正数增加,负数减少
type TINYINT NOT NULL, -- 1:秒杀扣减,2:回滚,3:手动调整
order_id BIGINT,
create_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
INDEX idx_product_time (product_id, create_time)
) ENGINE=InnoDB;
七、总结
MySQL高并发处理是一个系统工程,需要从架构、设计、缓存、监控等多个维度综合考虑。关键要点包括:
- 架构层面:读写分离、分库分表、连接池优化
- 数据库设计:合理的索引、表结构、事务设计
- 缓存机制:多级缓存、防护策略、一致性保证
- SQL优化:执行计划分析、慢查询优化
- 监控调优:实时监控、自动化调优
在实际应用中,需要根据业务特点和性能瓶颈选择合适的策略。建议先通过监控工具定位瓶颈,再针对性地优化,避免过度设计。同时,保持架构的可扩展性,为未来的业务增长预留空间。
最后,高并发优化是一个持续的过程,需要不断地监控、分析和调整,才能达到最佳的性能表现。
