在当今互联网应用中,高并发场景已成为常态。无论是电商秒杀、社交网络还是在线支付系统,MySQL作为最流行的开源关系型数据库,常常面临巨大的并发压力。本文将从数据库优化、应用层优化、架构升级三个层面,系统性地探讨MySQL高并发处理的实战策略,并提供详细的代码示例和配置说明。
一、数据库层优化:榨取MySQL的每一滴性能
1.1 索引优化:高并发的基石
索引是数据库性能的基石,不当的索引设计会导致全表扫描,严重拖累高并发场景下的性能。
1.1.1 索引设计原则
- 覆盖索引:查询所需字段全部包含在索引中,避免回表操作
- 最左前缀原则:复合索引必须从左到右连续使用
- 避免冗余索引:定期检查并删除重复索引
1.1.2 实战案例:订单表索引优化
假设我们有一个订单表 orders,包含以下字段:
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id BIGINT NOT NULL,
order_no VARCHAR(64) NOT NULL,
status TINYINT NOT NULL,
amount DECIMAL(10,2) NOT NULL,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
INDEX idx_user_id (user_id),
INDEX idx_order_no (order_no),
INDEX idx_status_create_time (status, create_time)
);
问题场景:用户查询自己的订单列表,按创建时间倒序排列
-- 原始查询(可能使用idx_user_id索引,但需要回表)
SELECT * FROM orders WHERE user_id = 12345 ORDER BY create_time DESC;
-- 优化后的查询(使用覆盖索引)
SELECT order_no, status, amount, create_time
FROM orders
WHERE user_id = 12345
ORDER BY create_time DESC;
索引优化方案:
-- 创建覆盖索引
ALTER TABLE orders ADD INDEX idx_user_create_cover (user_id, create_time, order_no, status, amount);
验证索引使用情况:
EXPLAIN SELECT order_no, status, amount, create_time
FROM orders
WHERE user_id = 12345
ORDER BY create_time DESC;
输出结果中 key 字段应显示 idx_user_create_cover,Extra 字段显示 Using index,表示使用了覆盖索引。
1.2 查询优化:避免慢查询
1.2.1 慢查询日志配置
# my.cnf 配置
[mysqld]
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1 # 记录执行时间超过1秒的查询
log_queries_not_using_indexes = 1 # 记录未使用索引的查询
1.2.2 使用EXPLAIN分析查询
-- 示例:分析一个复杂查询
EXPLAIN
SELECT o.order_no, u.username, p.product_name
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN order_items oi ON o.id = oi.order_id
JOIN products p ON oi.product_id = p.id
WHERE o.status = 1
AND o.create_time >= '2024-01-01'
AND u.region = '北京'
ORDER BY o.amount DESC
LIMIT 100;
关键指标解读:
type:访问类型(ALL > index > range > ref > eq_ref > const > system)rows:预估扫描行数Extra:额外信息(Using filesort, Using temporary等)
1.2.3 避免常见性能陷阱
*陷阱1:SELECT **
-- 不推荐
SELECT * FROM orders WHERE user_id = 12345;
-- 推荐(明确指定字段)
SELECT id, order_no, status, amount FROM orders WHERE user_id = 12345;
陷阱2:在WHERE子句中使用函数
-- 不推荐(无法使用索引)
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';
-- 推荐(使用范围查询)
SELECT * FROM orders
WHERE create_time >= '2024-01-01 00:00:00'
AND create_time < '2024-01-02 00:00:00';
陷阱3:隐式类型转换
-- 不推荐(order_no是VARCHAR,传入数字会导致全表扫描)
SELECT * FROM orders WHERE order_no = 123456789;
-- 推荐(使用正确的类型)
SELECT * FROM orders WHERE order_no = '123456789';
1.3 表结构优化
1.3.1 数据类型选择
-- 不推荐(使用过大的数据类型)
CREATE TABLE user_info (
id BIGINT PRIMARY KEY,
age INT, -- 年龄用TINYINT足够(0-255)
status TINYINT, -- 状态用TINYINT足够
price DECIMAL(20,10) -- 价格用DECIMAL(10,2)足够
);
-- 推荐(选择合适的数据类型)
CREATE TABLE user_info (
id BIGINT PRIMARY KEY,
age TINYINT UNSIGNED, -- 0-255
status TINYINT,
price DECIMAL(10,2)
);
1.3.2 分区表设计
对于数据量大的表,可以使用分区表来提升查询性能:
-- 按时间范围分区
CREATE TABLE logs (
id BIGINT AUTO_INCREMENT,
log_time DATETIME NOT NULL,
content TEXT,
PRIMARY KEY (id, log_time)
) PARTITION BY RANGE (YEAR(log_time) * 100 + MONTH(log_time)) (
PARTITION p202401 VALUES LESS THAN (202402),
PARTITION p202402 VALUES LESS THAN (202403),
PARTITION p202403 VALUES LESS THAN (202404),
PARTITION p_max VALUES LESS THAN MAXVALUE
);
-- 查询时会自动选择分区
SELECT * FROM logs WHERE log_time >= '2024-01-01' AND log_time < '2024-02-01';
1.4 配置优化
1.4.1 InnoDB关键参数配置
# my.cnf 配置示例(根据服务器内存调整)
[mysqld]
# 内存配置
innodb_buffer_pool_size = 16G # 建议设置为总内存的70-80%
innodb_buffer_pool_instances = 8 # 缓冲池实例数,建议4-8
# 日志配置
innodb_log_file_size = 2G # 日志文件大小
innodb_log_buffer_size = 64M # 日志缓冲区大小
innodb_flush_log_at_trx_commit = 1 # 1:每次提交都刷盘(安全),2:每秒刷盘(性能更好)
# 并发配置
innodb_thread_concurrency = 32 # InnoDB线程并发数
innodb_read_io_threads = 8 # 读线程数
innodb_write_io_threads = 8 # 写线程数
# 锁配置
innodb_lock_wait_timeout = 50 # 锁等待超时时间(秒)
innodb_rollback_on_timeout = 0 # 超时是否回滚
# 连接配置
max_connections = 1000 # 最大连接数
thread_cache_size = 100 # 线程缓存
1.4.2 连接池配置
使用连接池避免频繁创建连接的开销:
// HikariCP 连接池配置示例
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/mydb");
config.setUsername("root");
config.setPassword("password");
config.setDriverClassName("com.mysql.cj.jdbc.Driver");
// 连接池配置
config.setMaximumPoolSize(100); // 最大连接数
config.setMinimumIdle(20); // 最小空闲连接
config.setConnectionTimeout(30000); // 连接超时时间(毫秒)
config.setIdleTimeout(600000); // 空闲连接超时时间
config.setMaxLifetime(1800000); // 连接最大存活时间
config.setLeakDetectionThreshold(2000); // 连接泄漏检测阈值
// 性能优化
config.setReadOnly(false); // 是否只读
config.setTransactionIsolation("TRANSACTION_READ_COMMITTED");
config.setConnectionTestQuery("SELECT 1");
HikariDataSource dataSource = new HikariDataSource(config);
二、应用层优化:减轻数据库压力
2.1 缓存策略
2.1.1 多级缓存架构
用户请求 → 本地缓存 → 分布式缓存 → 数据库
2.1.2 Redis缓存实战
// Spring Boot + Redis 缓存示例
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 缓存配置
private static final String ORDER_CACHE_PREFIX = "order:";
private static final long CACHE_TTL = 300; // 5分钟
/**
* 获取订单详情(带缓存)
*/
public Order getOrderWithCache(Long orderId) {
String cacheKey = ORDER_CACHE_PREFIX + orderId;
// 1. 先从缓存获取
Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
if (order != null) {
return order;
}
// 2. 缓存未命中,查询数据库
order = orderMapper.selectById(orderId);
if (order != null) {
// 3. 写入缓存
redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
}
return order;
}
/**
* 更新订单(更新缓存)
*/
@Transactional
public void updateOrder(Order order) {
// 1. 更新数据库
orderMapper.updateById(order);
// 2. 删除缓存(或更新缓存)
String cacheKey = ORDER_CACHE_PREFIX + order.getId();
redisTemplate.delete(cacheKey);
// 3. 异步更新缓存(避免缓存穿透)
CompletableFuture.runAsync(() -> {
Order freshOrder = orderMapper.selectById(order.getId());
if (freshOrder != null) {
redisTemplate.opsForValue().set(cacheKey, freshOrder, CACHE_TTL, TimeUnit.SECONDS);
}
});
}
}
2.1.3 缓存穿透、击穿、雪崩解决方案
缓存穿透(查询不存在的数据):
// 布隆过滤器 + 空值缓存
public Order getOrderWithBloomFilter(Long orderId) {
String cacheKey = ORDER_CACHE_PREFIX + orderId;
// 1. 检查布隆过滤器(防止查询不存在的数据)
if (!bloomFilter.mightContain(orderId)) {
return null;
}
// 2. 查询缓存
Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
if (order != null) {
return order;
}
// 3. 查询数据库
order = orderMapper.selectById(orderId);
// 4. 缓存结果(包括null)
if (order != null) {
redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
} else {
// 缓存空值,设置较短的TTL
redisTemplate.opsForValue().set(cacheKey, "NULL", 60, TimeUnit.SECONDS);
}
return order;
}
缓存击穿(热点key过期):
// 使用分布式锁
public Order getOrderWithLock(Long orderId) {
String cacheKey = ORDER_CACHE_PREFIX + orderId;
String lockKey = "lock:" + orderId;
// 1. 查询缓存
Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
if (order != null) {
return order;
}
// 2. 获取分布式锁
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试加锁,最多等待3秒,锁持有时间10秒
if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {
// 双重检查
order = (Order) redisTemplate.opsForValue().get(cacheKey);
if (order != null) {
return order;
}
// 查询数据库
order = orderMapper.selectById(orderId);
if (order != null) {
redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return order;
}
缓存雪崩(大量key同时过期):
// 随机化TTL
public void setCacheWithRandomTTL(String key, Object value) {
// 基础TTL + 随机值(避免同时过期)
long ttl = CACHE_TTL + ThreadLocalRandom.current().nextLong(0, 300);
redisTemplate.opsForValue().set(key, value, ttl, TimeUnit.SECONDS);
}
// 热点数据永不过期 + 后台更新
public void updateHotData(Long orderId) {
String cacheKey = ORDER_CACHE_PREFIX + orderId;
// 1. 更新数据库
Order order = orderMapper.selectById(orderId);
// 2. 异步更新缓存(永不过期)
CompletableFuture.runAsync(() -> {
redisTemplate.opsForValue().set(cacheKey, order);
});
}
2.2 读写分离
2.2.1 主从复制配置
-- 主库配置(my.cnf)
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7
-- 从库配置(my.cnf)
[mysqld]
server-id = 2
relay-log = mysql-relay-bin
read_only = 1 # 从库只读
2.2.2 应用层读写分离实现
// Spring Boot + ShardingSphere 读写分离配置
@Configuration
public class DataSourceConfig {
@Bean
public DataSource masterDataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://master:3306/mydb");
config.setUsername("root");
config.setPassword("password");
return new HikariDataSource(config);
}
@Bean
public DataSource slaveDataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://slave:3306/mydb");
config.setUsername("root");
config.setPassword("password");
return new HikariDataSource(config);
}
@Bean
public DataSource routingDataSource() {
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", masterDataSource());
targetDataSources.put("slave", slaveDataSource());
RoutingDataSource routingDataSource = new RoutingDataSource();
routingDataSource.setDefaultTargetDataSource(masterDataSource());
routingDataSource.setTargetDataSources(targetDataSources);
return routingDataSource;
}
@Bean
public DataSourceTransactionManager transactionManager(DataSource routingDataSource) {
return new DataSourceTransactionManager(routingDataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactory(DataSource routingDataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(routingDataSource);
return bean.getObject();
}
}
// 自定义路由数据源
public class RoutingDataSource extends AbstractRoutingDataSource {
private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
public static void setMaster() {
CONTEXT_HOLDER.set("master");
}
public static void setSlave() {
CONTEXT_HOLDER.set("slave");
}
public static void clear() {
CONTEXT_HOLDER.remove();
}
@Override
protected Object determineCurrentLookupKey() {
return CONTEXT_HOLDER.get();
}
}
// 服务层使用
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RoutingDataSource routingDataSource;
/**
* 写操作使用主库
*/
@Transactional
public void createOrder(Order order) {
RoutingDataSource.setMaster();
try {
orderMapper.insert(order);
} finally {
RoutingDataSource.clear();
}
}
/**
* 读操作使用从库
*/
public Order getOrder(Long orderId) {
RoutingDataSource.setSlave();
try {
return orderMapper.selectById(orderId);
} finally {
RoutingDataSource.clear();
}
}
}
2.3 异步处理
2.3.1 消息队列解耦
// RabbitMQ 异步处理订单
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderMapper orderMapper;
/**
* 创建订单(异步处理)
*/
@Transactional
public void createOrderAsync(Order order) {
// 1. 保存订单到数据库(核心数据)
orderMapper.insert(order);
// 2. 发送消息到队列(异步处理非核心业务)
OrderMessage message = new OrderMessage();
message.setOrderId(order.getId());
message.setAction("CREATE");
rabbitTemplate.convertAndSend("order.exchange", "order.create", message);
// 3. 返回响应(不等待异步任务完成)
}
/**
* 消费消息处理订单
*/
@RabbitListener(queues = "order.queue")
public void processOrderMessage(OrderMessage message) {
// 异步处理:发送邮件、更新统计、通知库存等
switch (message.getAction()) {
case "CREATE":
sendOrderEmail(message.getOrderId());
updateOrderStatistics(message.getOrderId());
notifyInventory(message.getOrderId());
break;
case "UPDATE":
// 处理更新逻辑
break;
}
}
private void sendOrderEmail(Long orderId) {
// 发送邮件逻辑
}
private void updateOrderStatistics(Long orderId) {
// 更新统计信息
}
private void notifyInventory(Long orderId) {
// 通知库存系统
}
}
2.3.2 批量处理
// 批量插入优化
@Service
public class BatchService {
@Autowired
private OrderMapper orderMapper;
/**
* 批量插入订单(使用MyBatis批量插入)
*/
public void batchInsertOrders(List<Order> orders) {
// 分批处理,避免单次SQL过大
int batchSize = 1000;
for (int i = 0; i < orders.size(); i += batchSize) {
int end = Math.min(i + batchSize, orders.size());
List<Order> batch = orders.subList(i, end);
// 使用MyBatis批量插入
orderMapper.batchInsert(batch);
// 每批提交后清理session,释放内存
if (i % 10000 == 0) {
System.gc();
}
}
}
/**
* 批量更新(使用ON DUPLICATE KEY UPDATE)
*/
public void batchUpdateOrders(List<Order> orders) {
// 生成批量更新SQL
StringBuilder sql = new StringBuilder();
sql.append("INSERT INTO orders (id, order_no, status, amount) VALUES ");
for (int i = 0; i < orders.size(); i++) {
Order order = orders.get(i);
if (i > 0) sql.append(",");
sql.append(String.format("(%d, '%s', %d, %.2f)",
order.getId(), order.getOrderNo(), order.getStatus(), order.getAmount()));
}
sql.append(" ON DUPLICATE KEY UPDATE status=VALUES(status), amount=VALUES(amount)");
// 执行批量更新
orderMapper.batchUpdate(sql.toString());
}
}
三、架构升级:从单机到分布式
3.1 读写分离架构
3.1.1 架构图
应用层
↓
负载均衡(Nginx)
↓
应用服务器集群
↓
数据库代理层(ShardingSphere/ProxySQL)
↓
主库(写) ← 同步 → 从库1(读)
← 同步 → 从库2(读)
← 同步 → 从库3(读)
3.1.2 使用ShardingSphere实现读写分离
# sharding.yaml 配置
dataSources:
ds_0:
url: jdbc:mysql://master:3306/mydb
username: root
password: password
connectionTimeoutMilliseconds: 30000
idleTimeoutMilliseconds: 600000
maxLifetimeMilliseconds: 1800000
maximumPoolSize: 50
ds_1:
url: jdbc:mysql://slave1:3306/mydb
username: root
password: password
connectionTimeoutMilliseconds: 30000
idleTimeoutMilliseconds: 600000
maxLifetimeMilliseconds: 1800000
maximumPoolSize: 50
ds_2:
url: jdbc:mysql://slave2:3306/mydb
username: root
password: password
connectionTimeoutMilliseconds: 30000
idleTimeoutMilliseconds: 600000
maxLifetimeMilliseconds: 1800000
maximumPoolSize: 50
shardingRule:
tables:
orders:
actualDataNodes: ds_0.orders
tableStrategy:
standard:
shardingColumn: id
preciseAlgorithmClassName: com.example.OrderPreciseShardingAlgorithm
databaseStrategy:
standard:
shardingColumn: user_id
preciseAlgorithmClassName: com.example.UserDatabaseShardingAlgorithm
defaultDatabaseStrategy:
standard:
shardingColumn: user_id
preciseAlgorithmClassName: com.example.UserDatabaseShardingAlgorithm
defaultTableStrategy:
complex:
shardingColumns: id, user_id
algorithmClassName: com.example.OrderComplexShardingAlgorithm
# 读写分离配置
masterSlaveRules:
ds_0:
name: ds_0
masterDataSourceName: ds_0
slaveDataSourceNames:
- ds_1
- ds_2
loadBalanceAlgorithmType: ROUND_ROBIN # 轮询负载均衡
# 分片算法实现
public class UserDatabaseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long userId = shardingValue.getValue();
// 根据user_id取模决定数据源
int index = (int) (userId % availableTargetNames.size());
return availableTargetNames.toArray(new String[0])[index];
}
}
3.2 分库分表架构
3.2.1 分库分表策略
垂直分库:按业务模块拆分数据库
用户库(user_db):用户表、用户画像表
订单库(order_db):订单表、订单详情表
商品库(product_db):商品表、库存表
水平分表:按数据特征拆分表
订单表按用户ID分片:
orders_0: user_id % 4 == 0
orders_1: user_id % 4 == 1
orders_2: user_id % 4 == 2
orders_3: user_id % 4 == 3
3.2.2 使用ShardingSphere实现分库分表
# sharding.yaml 分库分表配置
dataSources:
ds_0:
url: jdbc:mysql://db0:3306/mydb
username: root
password: password
ds_1:
url: jdbc:mysql://db1:3306/mydb
username: root
password: password
ds_2:
url: jdbc:mysql://db2:3306/mydb
username: root
password: password
ds_3:
url: jdbc:mysql://db3:3306/mydb
username: root
password: password
shardingRule:
tables:
orders:
actualDataNodes: ds_${0..3}.orders_${0..3}
tableStrategy:
standard:
shardingColumn: user_id
preciseAlgorithmClassName: com.example.OrderTableShardingAlgorithm
databaseStrategy:
standard:
shardingColumn: user_id
preciseAlgorithmClassName: com.example.OrderDatabaseShardingAlgorithm
keyGenerator:
column: id
type: SNOWFLAKE # 使用雪花算法生成分布式ID
props:
worker.id: 1
bindingTables:
- orders,order_items # 绑定表,避免跨库join
defaultDatabaseStrategy:
standard:
shardingColumn: user_id
preciseAlgorithmClassName: com.example.DefaultDatabaseShardingAlgorithm
# 分片算法实现
public class OrderDatabaseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long userId = shardingValue.getValue();
// 根据user_id取模决定数据库
int dbIndex = (int) (userId % 4);
return "ds_" + dbIndex;
}
}
public class OrderTableShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long userId = shardingValue.getValue();
// 根据user_id取模决定表
int tableIndex = (int) (userId % 4);
return "orders_" + tableIndex;
}
}
3.2.3 分布式ID生成
// 雪花算法实现
public class SnowflakeIdGenerator {
private final long datacenterId;
private final long workerId;
private long sequence = 0L;
private long lastTimestamp = -1L;
// 起始时间戳(2020-01-01 00:00:00)
private final long twepoch = 1577836800000L;
// 位数分配:41位时间戳 + 10位机器ID + 12位序列号
private final long workerIdBits = 10L;
private final long datacenterIdBits = 5L;
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
private final long sequenceBits = 12L;
private final long workerIdShift = sequenceBits;
private final long datacenterIdShift = sequenceBits + workerIdBits;
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
public SnowflakeIdGenerator(long datacenterId, long workerId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}
public synchronized long nextId() {
long timestamp = timeGen();
if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}
lastTimestamp = timestamp;
return ((timestamp - twepoch) << timestampLeftShift)
| (datacenterId << datacenterIdShift)
| (workerId << workerIdShift)
| sequence;
}
private long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
private long timeGen() {
return System.currentTimeMillis();
}
}
// 使用示例
public class IdGeneratorService {
private final SnowflakeIdGenerator idGenerator;
public IdGeneratorService() {
// 根据机器ID和数据中心ID初始化
this.idGenerator = new SnowflakeIdGenerator(1, 1);
}
public Long generateOrderId() {
return idGenerator.nextId();
}
}
3.3 分布式事务
3.3.1 Seata分布式事务框架
// Seata AT模式示例
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryService inventoryService;
@Autowired
private AccountService accountService;
/**
* 创建订单(分布式事务)
*/
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public void createOrderWithDistributedTransaction(Order order) {
// 1. 创建订单
orderMapper.insert(order);
// 2. 扣减库存(跨服务调用)
inventoryService.deductStock(order.getProductId(), order.getQuantity());
// 3. 扣减账户余额(跨服务调用)
accountService.deductBalance(order.getUserId(), order.getAmount());
// 4. 发送消息(异步,不参与分布式事务)
sendOrderMessage(order);
}
/**
* 扣减库存服务
*/
@Transactional
public void deductStock(Long productId, Integer quantity) {
// 更新库存
inventoryMapper.deductStock(productId, quantity);
}
/**
* 扣减账户余额服务
*/
@Transactional
public void deductBalance(Long userId, BigDecimal amount) {
// 更新账户余额
accountMapper.deductBalance(userId, amount);
}
}
3.3.2 Saga模式实现
// Saga模式:补偿事务
@Service
public class OrderSagaService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryService inventoryService;
@Autowired
private AccountService accountService;
/**
* Saga模式创建订单
*/
public void createOrderWithSaga(Order order) {
try {
// 1. 创建订单(主事务)
orderMapper.insert(order);
// 2. 扣减库存(补偿事务:恢复库存)
inventoryService.deductStockWithCompensation(order.getProductId(), order.getQuantity());
// 3. 扣减账户余额(补偿事务:恢复余额)
accountService.deductBalanceWithCompensation(order.getUserId(), order.getAmount());
// 4. 提交事务
commitSaga(order);
} catch (Exception e) {
// 5. 异常时执行补偿操作
compensateSaga(order);
throw e;
}
}
private void commitSaga(Order order) {
// 更新订单状态为成功
orderMapper.updateStatus(order.getId(), 1);
}
private void compensateSaga(Order order) {
// 1. 恢复库存
inventoryService.restoreStock(order.getProductId(), order.getQuantity());
// 2. 恢复账户余额
accountService.restoreBalance(order.getUserId(), order.getAmount());
// 3. 更新订单状态为失败
orderMapper.updateStatus(order.getId(), -1);
}
}
3.4 数据库中间件
3.4.1 使用ProxySQL实现读写分离
-- ProxySQL配置示例
-- 1. 添加后端MySQL服务器
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight) VALUES
(10, 'master', 3306, 100), -- 主库,hostgroup_id=10
(20, 'slave1', 3306, 50), -- 从库1,hostgroup_id=20
(20, 'slave2', 3306, 50); -- 从库2,hostgroup_id=20
-- 2. 配置读写分离规则
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply) VALUES
(1, 1, '^SELECT.*FOR UPDATE', 10, 1), -- SELECT FOR UPDATE走主库
(2, 1, '^SELECT', 20, 1), -- 普通SELECT走从库
(3, 1, '^(INSERT|UPDATE|DELETE)', 10, 1); -- 写操作走主库
-- 3. 配置健康检查
UPDATE global_variables SET variable_value='SELECT 1' WHERE variable_name='mysql-monitor_connect_string';
UPDATE global_variables SET variable_value='SELECT 1' WHERE variable_name='mysql-monitor_ping_string';
UPDATE global_variables SET variable_value='SELECT 1' WHERE variable_name='mysql-monitor_read_only_string';
-- 4. 加载配置
LOAD MYSQL VARIABLES TO RUNTIME;
LOAD MYSQL SERVERS TO RUNTIME;
LOAD MYSQL QUERY RULES TO RUNTIME;
LOAD MYSQL USERS TO RUNTIME;
-- 5. 保存配置到磁盘
SAVE MYSQL VARIABLES TO DISK;
SAVE MYSQL SERVERS TO DISK;
SAVE MYSQL QUERY RULES TO DISK;
SAVE MYSQL USERS TO DISK;
3.4.2 使用ShardingSphere Proxy
# server.yaml
mode:
type: Standalone
repository:
type: File
props:
path: ./conf
dataSources:
ds_0:
url: jdbc:mysql://master:3306/mydb
username: root
password: password
ds_1:
url: jdbc:mysql://slave1:3306/mydb
username: root
password: password
ds_2:
url: jdbc:mysql://slave2:3306/mydb
username: root
password: password
shardingRule:
tables:
orders:
actualDataNodes: ds_0.orders
tableStrategy:
standard:
shardingColumn: id
preciseAlgorithmClassName: com.example.OrderShardingAlgorithm
databaseStrategy:
standard:
shardingColumn: user_id
preciseAlgorithmClassName: com.example.UserDatabaseShardingAlgorithm
masterSlaveRules:
ds_0:
name: ds_0
masterDataSourceName: ds_0
slaveDataSourceNames:
- ds_1
- ds_2
loadBalanceAlgorithmType: ROUND_ROBIN
# 配置ShardingSphere Proxy
# 启动命令:sharding-proxy -c server.yaml
四、监控与调优
4.1 监控指标
4.1.1 关键监控指标
-- 查看数据库连接数
SHOW STATUS LIKE 'Threads_connected';
-- 查看慢查询数量
SHOW STATUS LIKE 'Slow_queries';
-- 查看InnoDB缓冲池命中率
SHOW STATUS LIKE 'Innodb_buffer_pool_read%';
-- 计算缓冲池命中率
-- 命中率 = (1 - Innodb_buffer_pool_reads / Innodb_buffer_pool_read_requests) * 100%
-- 查看锁等待情况
SHOW ENGINE INNODB STATUS\G
-- 查看表锁情况
SHOW OPEN TABLES WHERE In_use > 0;
4.1.2 使用Prometheus + Grafana监控
# prometheus.yml 配置
scrape_configs:
- job_name: 'mysql'
static_configs:
- targets: ['mysql-exporter:9104']
metrics_path: /metrics
scrape_interval: 15s
# mysqld_exporter 配置
# 启动命令:mysqld_exporter --config.my-cnf=/etc/mysql/my.cnf
// Spring Boot Actuator + Micrometer 集成
@Configuration
public class MetricsConfig {
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
return registry -> registry.config().commonTags("application", "order-service");
}
@Bean
public DataSourcePoolMetrics dataSourcePoolMetrics(DataSource dataSource) {
return new DataSourcePoolMetrics((HikariDataSource) dataSource, "hikari", Tags.empty());
}
}
// 自定义监控指标
@Service
public class OrderMetricsService {
@Autowired
private MeterRegistry meterRegistry;
/**
* 记录订单创建耗时
*/
public void recordOrderCreationTime(long duration) {
Timer.builder("order.creation.time")
.description("Order creation time")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry)
.record(duration, TimeUnit.MILLISECONDS);
}
/**
* 记录数据库查询次数
*/
public void recordDatabaseQuery(String queryType) {
Counter.builder("database.query.count")
.description("Database query count")
.tag("type", queryType)
.register(meterRegistry)
.increment();
}
}
4.2 性能调优工具
4.2.1 MySQL性能分析工具
# 1. 使用pt-query-digest分析慢查询日志
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
# 2. 使用Percona Toolkit
pt-mysql-summary --user=root --password=password
# 3. 使用MySQLTuner
perl mysqltuner.pl --user root --password password
# 4. 使用sys schema(MySQL 5.7+)
-- 查看最耗时的查询
SELECT * FROM sys.statements_with_temp_tables;
-- 查看未使用索引的表
SELECT * FROM sys.schema_unused_indexes;
-- 查看锁等待
SELECT * FROM sys.innodb_lock_waits;
4.2.2 性能测试工具
// JMeter 性能测试脚本示例
// 1. 创建线程组:100个线程,循环1000次
// 2. 添加HTTP请求:POST /api/orders
// 3. 添加监听器:查看结果树、聚合报告、图形结果
// 2. 使用Gatling进行高并发测试
public class OrderSimulation extends Simulation {
HttpProtocolBuilder httpProtocol = http
.baseUrl("http://localhost:8080")
.acceptHeader("application/json")
.contentTypeHeader("application/json");
ScenarioBuilder scn = scenario("Order Creation")
.exec(http("Create Order")
.post("/api/orders")
.body(StringBody("""
{
"userId": 12345,
"productId": 1001,
"quantity": 2,
"amount": 199.99
}
"""))
.check(status().is(200)));
public OrderSimulation() {
setUp(
scn.injectOpen(
atOnceUsers(100), // 立即启动100个用户
rampUsersPerSec(100).to(500).during(60), // 60秒内从100增加到500用户/秒
constantUsersPerSec(500).during(120) // 保持500用户/秒持续120秒
)
).protocols(httpProtocol);
}
}
五、实战案例:电商秒杀系统
5.1 系统架构设计
用户请求 → CDN → 负载均衡 → 应用服务器集群
↓
Redis集群(缓存库存)
↓
消息队列(RabbitMQ/Kafka)
↓
MySQL分库分表(订单库、商品库)
↓
异步处理(邮件、通知、统计)
5.2 核心代码实现
5.2.1 秒杀服务
@Service
public class SeckillService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderMapper orderMapper;
@Autowired
private ProductMapper productMapper;
// 秒杀商品库存Key前缀
private static final String SECKILL_STOCK_PREFIX = "seckill:stock:";
// 秒杀订单Key前缀
private static final String SECKILL_ORDER_PREFIX = "seckill:order:";
/**
* 秒杀下单
*/
@Transactional
public SeckillResult seckill(Long productId, Long userId) {
String stockKey = SECKILL_STOCK_PREFIX + productId;
String orderKey = SECKILL_ORDER_PREFIX + productId + ":" + userId;
// 1. 检查是否已秒杀过
if (redisTemplate.hasKey(orderKey)) {
return SeckillResult.fail("您已秒杀过该商品");
}
// 2. 扣减库存(使用Redis原子操作)
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
// 恢复库存
redisTemplate.opsForValue().increment(stockKey);
return SeckillResult.fail("库存不足");
}
// 3. 生成秒杀订单
Long orderId = generateSeckillOrderId();
SeckillOrder order = new SeckillOrder();
order.setId(orderId);
order.setProductId(productId);
order.setUserId(userId);
order.setQuantity(1);
order.setStatus(0); // 待支付
// 4. 保存订单到Redis(临时存储)
redisTemplate.opsForValue().set(orderKey, order, 30, TimeUnit.MINUTES);
// 5. 发送消息到队列(异步持久化到数据库)
SeckillMessage message = new SeckillMessage();
message.setOrderId(orderId);
message.setProductId(productId);
message.setUserId(userId);
rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);
return SeckillResult.success(orderId);
}
/**
* 异步处理秒杀订单
*/
@RabbitListener(queues = "seckill.queue")
public void processSeckillOrder(SeckillMessage message) {
try {
// 1. 从Redis获取订单
String orderKey = SECKILL_ORDER_PREFIX + message.getProductId() + ":" + message.getUserId();
SeckillOrder order = (SeckillOrder) redisTemplate.opsForValue().get(orderKey);
if (order == null) {
log.error("秒杀订单不存在: {}", message);
return;
}
// 2. 持久化到数据库
orderMapper.insert(order);
// 3. 扣减数据库库存(异步)
productMapper.decrementStock(message.getProductId(), 1);
// 4. 发送支付通知
sendPaymentNotification(order);
// 5. 删除Redis临时订单(可选,保留一段时间用于查询)
// redisTemplate.delete(orderKey);
} catch (Exception e) {
log.error("处理秒杀订单失败", e);
// 异常处理:恢复库存等
}
}
/**
* 生成分布式订单ID
*/
private Long generateSeckillOrderId() {
// 使用雪花算法生成ID
SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator(1, 1);
return idGenerator.nextId();
}
/**
* 发送支付通知
*/
private void sendPaymentNotification(SeckillOrder order) {
// 发送短信、邮件等通知
}
}
5.2.2 库存预热
@Service
public class StockPreheatService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductMapper productMapper;
/**
* 秒杀前预热库存到Redis
*/
public void preheatStock(Long productId, Integer stock) {
String stockKey = SECKILL_STOCK_PREFIX + productId;
// 1. 检查Redis中是否已有库存
Long currentStock = (Long) redisTemplate.opsForValue().get(stockKey);
if (currentStock != null && currentStock > 0) {
log.warn("Redis中已存在库存,跳过预热");
return;
}
// 2. 预热库存到Redis
redisTemplate.opsForValue().set(stockKey, stock, 30, TimeUnit.MINUTES);
// 3. 设置库存预警(当库存低于阈值时报警)
redisTemplate.opsForValue().set("seckill:stock:alert:" + productId, stock, 30, TimeUnit.MINUTES);
}
/**
* 定时同步库存到数据库
*/
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void syncStockToDatabase() {
// 获取所有秒杀商品
Set<String> keys = redisTemplate.keys(SECKILL_STOCK_PREFIX + "*");
if (keys == null || keys.isEmpty()) {
return;
}
for (String key : keys) {
Long stock = (Long) redisTemplate.opsForValue().get(key);
if (stock == null) {
continue;
}
// 解析商品ID
Long productId = Long.parseLong(key.substring(SECKILL_STOCK_PREFIX.length()));
// 更新数据库库存
productMapper.updateStock(productId, stock);
}
}
}
5.2.3 限流与降级
@Component
public class RateLimiter {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 令牌桶限流
*/
public boolean tryAcquire(String key, int permits, long timeout) {
String luaScript = """
local key = KEYS[1]
local permits = tonumber(ARGV[1])
local timeout = tonumber(ARGV[2])
local current = redis.call('GET', key)
if current == false then
redis.call('SET', key, permits)
redis.call('EXPIRE', key, timeout)
return 1
end
local currentNum = tonumber(current)
if currentNum >= permits then
redis.call('DECRBY', key, permits)
return 1
else
return 0
end
""";
RedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList(key), permits, timeout);
return result != null && result == 1;
}
/**
* 限流注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimit {
String key() default "";
int permits() default 100;
long timeout() default 60;
String message() default "请求过于频繁,请稍后再试";
}
/**
* AOP切面处理限流
*/
@Aspect
@Component
public class RateLimitAspect {
@Autowired
private RateLimiter rateLimiter;
@Around("@annotation(rateLimit)")
public Object around(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
String key = rateLimit.key();
if (StringUtils.isEmpty(key)) {
// 生成默认key:类名+方法名
String className = joinPoint.getTarget().getClass().getSimpleName();
String methodName = joinPoint.getSignature().getName();
key = className + ":" + methodName;
}
if (!rateLimiter.tryAcquire(key, rateLimit.permits(), rateLimit.timeout())) {
throw new RuntimeException(rateLimit.message());
}
return joinPoint.proceed();
}
}
}
// 使用示例
@Service
public class SeckillController {
@Autowired
private SeckillService seckillService;
@RateLimit(key = "seckill:limit", permits = 1000, timeout = 60, message = "秒杀过于火爆,请稍后再试")
@PostMapping("/seckill")
public SeckillResult seckill(@RequestBody SeckillRequest request) {
return seckillService.seckill(request.getProductId(), request.getUserId());
}
}
六、总结与最佳实践
6.1 优化优先级
- 索引优化:成本最低,效果最明显
- 查询优化:避免慢查询,减少数据库压力
- 缓存策略:减少数据库访问次数
- 读写分离:提升读性能,分担写压力
- 分库分表:解决数据量和并发量瓶颈
- 架构升级:从单机到分布式,从同步到异步
6.2 常见误区
- 过早优化:不要在没有性能问题时进行复杂的优化
- 过度分片:分片会增加复杂度,应根据实际数据量决定
- 缓存滥用:缓存不是万能的,需要考虑数据一致性
- 忽略监控:没有监控的优化是盲目的
6.3 持续优化
- 定期审查:每季度审查慢查询和索引使用情况
- 压力测试:定期进行压力测试,发现性能瓶颈
- 监控告警:建立完善的监控体系,及时发现问题
- 技术演进:关注MySQL新版本特性,适时升级
通过以上策略的综合运用,可以有效提升MySQL在高并发场景下的性能表现。记住,没有银弹,需要根据具体业务场景选择合适的优化方案,并持续监控和调整。
