在当今互联网应用中,高并发场景已成为常态。无论是电商平台的秒杀活动、社交媒体的实时互动,还是金融系统的交易处理,MySQL作为最流行的关系型数据库之一,常常面临海量请求冲击和性能瓶颈的挑战。本文将深入探讨MySQL在高并发环境下的处理策略,从架构设计、配置优化到代码实现,提供一套完整的解决方案。
一、高并发场景下的MySQL性能瓶颈分析
1.1 常见性能瓶颈点
在高并发场景下,MySQL通常会在以下几个方面遇到瓶颈:
- 连接数瓶颈:当并发连接数超过MySQL的最大连接数(max_connections)时,新连接会被拒绝
- CPU瓶颈:复杂查询、大量排序或临时表操作会导致CPU使用率飙升
- I/O瓶颈:频繁的磁盘读写操作,特别是随机I/O,会成为性能瓶颈
- 锁竞争:行锁、表锁、间隙锁等导致的等待和死锁
- 内存瓶颈:缓冲池(Buffer Pool)不足导致频繁的磁盘I/O
1.2 性能监控与诊断
在优化之前,我们需要先诊断问题。以下是常用的监控工具和方法:
-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
-- 查看最大连接数
SHOW VARIABLES LIKE 'max_connections';
-- 查看慢查询日志
SHOW VARIABLES LIKE 'slow_query_log';
-- 查看InnoDB缓冲池状态
SHOW ENGINE INNODB STATUS\G
-- 查看当前正在运行的查询
SHOW PROCESSLIST;
示例:使用Percona Toolkit进行性能分析
# 安装Percona Toolkit
sudo apt-get install percona-toolkit
# 分析慢查询日志
pt-query-digest /var/log/mysql/slow.log
# 分析表结构
pt-table-checksum h=localhost,u=root,p=password
# 检查复制延迟
pt-heartbeat --update --host=localhost --user=root --password=password
二、架构层面的优化策略
2.1 读写分离架构
读写分离是应对高并发读操作的有效策略。通过将读请求分发到多个从库,减轻主库压力。
实现方案:
- 使用ProxySQL作为中间件
-- ProxySQL配置示例
-- 添加主库
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight)
VALUES (10, 'master.example.com', 3306, 100);
-- 添加从库
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight)
VALUES (20, 'slave1.example.com', 3306, 100);
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight)
VALUES (20, 'slave2.example.com', 3306, 100);
-- 配置读写分离规则
INSERT INTO mysql_query_rules (rule_id, active, match_digest, destination_hostgroup, apply)
VALUES (1, 1, '^SELECT.*FOR UPDATE', 10, 1);
INSERT INTO mysql_query_rules (rule_id, active, match_digest, destination_hostgroup, apply)
VALUES (2, 1, '^SELECT', 20, 1);
- 使用Spring Boot + ShardingSphere实现读写分离
@Configuration
public class DataSourceConfig {
@Bean
public DataSource masterDataSource() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://master:3306/mydb");
dataSource.setUsername("root");
dataSource.setPassword("password");
return dataSource;
}
@Bean
public DataSource slaveDataSource() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl("jdbc:mysql://slave:3306/mydb");
dataSource.setUsername("root");
dataSource.setPassword("password");
return dataSource;
}
@Bean
public DataSource routingDataSource() {
DynamicDataSource routingDataSource = new DynamicDataSource();
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", masterDataSource());
targetDataSources.put("slave", slaveDataSource());
routingDataSource.setTargetDataSources(targetDataSources);
routingDataSource.setDefaultTargetDataSource(masterDataSource());
return routingDataSource;
}
}
2.2 分库分表策略
当单表数据量超过千万级别时,需要考虑分库分表。
垂直分表示例:
-- 原始表结构
CREATE TABLE user (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
phone VARCHAR(20),
address TEXT,
created_at TIMESTAMP,
updated_at TIMESTAMP
);
-- 垂直分表后
CREATE TABLE user_basic (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
phone VARCHAR(20),
created_at TIMESTAMP,
updated_at TIMESTAMP
);
CREATE TABLE user_address (
id BIGINT PRIMARY KEY,
user_id BIGINT,
address TEXT,
FOREIGN KEY (user_id) REFERENCES user_basic(id)
);
水平分表示例(按用户ID哈希):
-- 分片规则:user_id % 4
-- 表结构
CREATE TABLE user_0 (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
created_at TIMESTAMP
);
CREATE TABLE user_1 (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
created_at TIMESTAMP
);
-- ... user_2, user_3
使用ShardingSphere实现分库分表:
# sharding.yaml
dataSources:
ds_0: !!com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/db_0
username: root
password: password
ds_1: !!com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/db_1
username: root
password: password
shardingRule:
tables:
user:
actualDataNodes: ds_${0..1}.user_${0..3}
tableStrategy:
inline:
shardingColumn: id
algorithmExpression: user_${id % 4}
databaseStrategy:
inline:
shardingColumn: user_id
algorithmExpression: ds_${user_id % 2}
bindingTables:
- user
2.3 缓存策略
引入缓存层可以显著减少数据库压力。
Redis缓存示例:
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String USER_CACHE_KEY = "user:";
private static final long CACHE_TTL = 3600; // 1小时
public User getUserById(Long id) {
String cacheKey = USER_CACHE_KEY + id;
// 1. 先从缓存获取
User user = (User) redisTemplate.opsForValue().get(cacheKey);
if (user != null) {
return user;
}
// 2. 缓存未命中,查询数据库
user = userRepository.findById(id).orElse(null);
// 3. 写入缓存
if (user != null) {
redisTemplate.opsForValue().set(cacheKey, user, CACHE_TTL, TimeUnit.SECONDS);
}
return user;
}
public void updateUser(User user) {
// 更新数据库
userRepository.save(user);
// 删除缓存
String cacheKey = USER_CACHE_KEY + user.getId();
redisTemplate.delete(cacheKey);
}
}
缓存穿透、击穿、雪崩解决方案:
// 解决缓存穿透:使用空值缓存
public User getUserByIdWithNullCache(Long id) {
String cacheKey = USER_CACHE_KEY + id;
String nullKey = cacheKey + ":null";
// 检查空值缓存
if (Boolean.TRUE.equals(redisTemplate.hasKey(nullKey))) {
return null;
}
// 检查正常缓存
User user = (User) redisTemplate.opsForValue().get(cacheKey);
if (user != null) {
return user;
}
// 查询数据库
user = userRepository.findById(id).orElse(null);
if (user != null) {
redisTemplate.opsForValue().set(cacheKey, user, CACHE_TTL, TimeUnit.SECONDS);
} else {
// 缓存空值,防止缓存穿透
redisTemplate.opsForValue().set(nullKey, "", 60, TimeUnit.SECONDS);
}
return user;
}
// 解决缓存击穿:使用分布式锁
public User getUserByIdWithLock(Long id) {
String cacheKey = USER_CACHE_KEY + id;
String lockKey = "lock:" + cacheKey;
// 尝试获取缓存
User user = (User) redisTemplate.opsForValue().get(cacheKey);
if (user != null) {
return user;
}
// 获取分布式锁
Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(lockAcquired)) {
try {
// 双重检查
user = (User) redisTemplate.opsForValue().get(cacheKey);
if (user != null) {
return user;
}
// 查询数据库
user = userRepository.findById(id).orElse(null);
// 写入缓存
if (user != null) {
redisTemplate.opsForValue().set(cacheKey, user, CACHE_TTL, TimeUnit.SECONDS);
}
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
} else {
// 等待并重试
try {
Thread.sleep(100);
return getUserByIdWithLock(id);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
return user;
}
三、MySQL配置优化
3.1 关键参数调优
InnoDB缓冲池配置:
# my.cnf 配置示例
[mysqld]
# 缓冲池大小,通常设置为物理内存的50%-70%
innodb_buffer_pool_size = 16G
# 缓冲池实例数,根据CPU核心数调整
innodb_buffer_pool_instances = 8
# 缓冲池预热,重启后自动加载热点数据
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_at_shutdown = ON
# 日志文件大小,建议1-2G
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M
# 刷新策略
innodb_flush_log_at_trx_commit = 1 # 事务提交时立即刷盘(ACID保证)
innodb_flush_method = O_DIRECT # 直接I/O,避免双缓冲
# 并发设置
innodb_thread_concurrency = 0 # 0表示自动管理
innodb_read_io_threads = 8
innodb_write_io_threads = 8
# 锁相关
innodb_lock_wait_timeout = 50
innodb_rollback_on_timeout = OFF
连接相关配置:
# 连接数配置
max_connections = 2000
max_user_connections = 1800
# 连接超时
wait_timeout = 600
interactive_timeout = 600
# 连接池配置
thread_cache_size = 100
thread_handling = pool-of-threads
# 线程栈大小(根据业务调整)
thread_stack = 256K
查询缓存配置(MySQL 8.0已移除,5.7及以下版本):
# 查询缓存(MySQL 5.7及以下)
query_cache_type = 1
query_cache_size = 128M
query_cache_limit = 2M
3.2 索引优化策略
复合索引设计原则:
-- 错误示例:索引列顺序不合理
CREATE INDEX idx_user ON orders (user_id, order_date, status);
-- 正确示例:遵循最左前缀原则
CREATE INDEX idx_user_date_status ON orders (user_id, order_date, status);
-- 查询示例
-- 能用到索引
SELECT * FROM orders WHERE user_id = 123 AND order_date = '2023-01-01';
SELECT * FROM orders WHERE user_id = 123;
-- 不能用到索引
SELECT * FROM orders WHERE order_date = '2023-01-01';
覆盖索引优化:
-- 原始查询(需要回表)
SELECT user_id, username, email FROM users WHERE age > 25;
-- 创建覆盖索引
CREATE INDEX idx_age_cover ON users (age, user_id, username, email);
-- 优化后的查询(索引覆盖,无需回表)
SELECT user_id, username, email FROM users WHERE age > 25;
索引下推优化(ICP):
-- MySQL 5.6+ 支持索引下推
-- 原始查询
SELECT * FROM users WHERE age > 25 AND name LIKE '张%';
-- 创建索引
CREATE INDEX idx_age_name ON users (age, name);
-- 索引下推会先在索引中过滤name条件,再回表
-- 通过EXPLAIN查看是否使用ICP
EXPLAIN SELECT * FROM users WHERE age > 25 AND name LIKE '张%';
-- 查看Extra列是否包含"Using index condition"
3.3 查询优化技巧
*避免SELECT **:**
-- 错误示例
SELECT * FROM orders WHERE user_id = 123;
-- 正确示例:只选择需要的列
SELECT order_id, order_date, total_amount FROM orders WHERE user_id = 123;
使用LIMIT分页优化:
-- 传统分页(深度分页性能差)
SELECT * FROM orders ORDER BY order_date DESC LIMIT 1000000, 10;
-- 优化方案1:使用子查询
SELECT * FROM orders
WHERE order_id IN (
SELECT order_id FROM orders
ORDER BY order_date DESC
LIMIT 1000000, 10
);
-- 优化方案2:使用游标(推荐)
SELECT * FROM orders
WHERE order_date < '2023-01-01'
ORDER BY order_date DESC
LIMIT 10;
-- 优化方案3:使用ES或ClickHouse等搜索引擎
JOIN优化:
-- 错误示例:多表JOIN
SELECT o.*, u.username, p.product_name
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN products p ON o.product_id = p.id
WHERE o.order_date > '2023-01-01';
-- 优化方案:拆分查询
-- 第一步:查询订单
SELECT o.* FROM orders o WHERE o.order_date > '2023-01-01';
-- 第二步:批量查询用户信息
SELECT u.id, u.username FROM users u WHERE u.id IN (1,2,3,...);
-- 第三步:批量查询产品信息
SELECT p.id, p.product_name FROM products p WHERE p.id IN (1,2,3,...);
四、应用层优化策略
4.1 连接池配置
HikariCP连接池配置示例:
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hikari")
public HikariDataSource dataSource() {
HikariDataSource dataSource = new HikariDataSource();
// 基本配置
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/mydb");
dataSource.setUsername("root");
dataSource.setPassword("password");
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
// 连接池配置
dataSource.setMaximumPoolSize(50); // 最大连接数
dataSource.setMinimumIdle(10); // 最小空闲连接
dataSource.setConnectionTimeout(30000); // 连接超时30秒
dataSource.setIdleTimeout(600000); // 空闲超时10分钟
dataSource.setMaxLifetime(1800000); // 连接最大存活时间30分钟
// 性能优化
dataSource.setConnectionTestQuery("SELECT 1");
dataSource.setValidationTimeout(5000);
dataSource.setLeakDetectionThreshold(60000);
// 连接预热
dataSource.setInitializationFailTimeout(1);
return dataSource;
}
}
连接池监控:
@Component
public class ConnectionPoolMonitor {
@Autowired
private HikariDataSource dataSource;
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void monitorConnectionPool() {
HikariPoolMXBean poolMXBean = dataSource.getHikariPoolMXBean();
System.out.println("=== 连接池监控 ===");
System.out.println("活跃连接数: " + poolMXBean.getActiveConnections());
System.out.println("空闲连接数: " + poolMXBean.getIdleConnections());
System.out.println("总连接数: " + poolMXBean.getTotalConnections());
System.out.println("等待连接数: " + poolMXBean.getThreadsAwaitingConnection());
// 如果等待连接数过多,发出告警
if (poolMXBean.getThreadsAwaitingConnection() > 10) {
System.err.println("警告:连接池等待队列过长!");
}
}
}
4.2 批量操作优化
批量插入优化:
// 批量插入示例
public void batchInsert(List<User> users) {
String sql = "INSERT INTO user (username, email, age) VALUES (?, ?, ?)";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
// 关闭自动提交
conn.setAutoCommit(false);
int batchSize = 1000; // 每批1000条
for (int i = 0; i < users.size(); i++) {
User user = users.get(i);
ps.setString(1, user.getUsername());
ps.setString(2, user.getEmail());
ps.setInt(3, user.getAge());
ps.addBatch();
// 达到批次大小时执行
if ((i + 1) % batchSize == 0) {
ps.executeBatch();
conn.commit();
}
}
// 执行剩余记录
ps.executeBatch();
conn.commit();
} catch (SQLException e) {
e.printStackTrace();
}
}
批量更新优化:
// 批量更新示例
public void batchUpdate(List<User> users) {
String sql = "UPDATE user SET username = ?, email = ? WHERE id = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
conn.setAutoCommit(false);
for (User user : users) {
ps.setString(1, user.getUsername());
ps.setString(2, user.getEmail());
ps.setLong(3, user.getId());
ps.addBatch();
}
ps.executeBatch();
conn.commit();
} catch (SQLException e) {
e.printStackTrace();
}
}
4.3 异步处理与消息队列
使用消息队列削峰填谷:
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderRepository orderRepository;
// 同步接口(快速响应)
@PostMapping("/orders")
public ResponseEntity<String> createOrder(@RequestBody Order order) {
// 1. 基本验证
if (order == null || order.getUserId() == null) {
return ResponseEntity.badRequest().body("参数错误");
}
// 2. 生成订单ID
order.setId(generateOrderId());
order.setStatus("PENDING");
order.setCreatedAt(new Date());
// 3. 发送到消息队列(异步处理)
rabbitTemplate.convertAndSend("order.exchange", "order.create", order);
// 4. 立即返回,不等待数据库写入
return ResponseEntity.ok("订单已接收,ID: " + order.getId());
}
// 消费者处理消息
@RabbitListener(queues = "order.queue")
public void processOrder(Order order) {
try {
// 1. 业务逻辑处理
validateOrder(order);
calculateTotalAmount(order);
// 2. 保存到数据库
orderRepository.save(order);
// 3. 发送成功事件
rabbitTemplate.convertAndSend("order.exchange", "order.success", order);
} catch (Exception e) {
// 4. 失败处理
order.setStatus("FAILED");
order.setErrorMessage(e.getMessage());
orderRepository.save(order);
// 发送到死信队列
rabbitTemplate.convertAndSend("order.exchange", "order.dead", order);
}
}
}
五、高并发场景下的特殊优化
5.1 秒杀场景优化
秒杀系统架构设计:
@Service
public class SeckillService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private OrderRepository orderRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String SECKILL_KEY = "seckill:stock:";
private static final String SECKILL_USER_KEY = "seckill:user:";
// 秒杀接口
@Transactional
public SeckillResult seckill(Long userId, Long productId) {
String stockKey = SECKILL_KEY + productId;
String userKey = SECKILL_USER_KEY + productId;
// 1. 校验用户是否已参与
if (Boolean.TRUE.equals(redisTemplate.hasKey(userKey + ":" + userId))) {
return SeckillResult.fail("您已参与过该秒杀");
}
// 2. 检查库存(Redis预减库存)
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
// 库存不足,回滚
redisTemplate.opsForValue().increment(stockKey);
return SeckillResult.fail("库存不足");
}
// 3. 标记用户已参与
redisTemplate.opsForValue().set(userKey + ":" + userId, "1", 30, TimeUnit.MINUTES);
// 4. 发送消息到队列异步创建订单
SeckillOrder order = new SeckillOrder();
order.setUserId(userId);
order.setProductId(productId);
order.setSeckillTime(new Date());
order.setStatus("PROCESSING");
rabbitTemplate.convertAndSend("seckill.exchange", "seckill.create", order);
return SeckillResult.success("秒杀成功,订单处理中");
}
// 消费者处理秒杀订单
@RabbitListener(queues = "seckill.queue")
public void processSeckillOrder(SeckillOrder order) {
try {
// 1. 检查库存(数据库)
Product product = productRepository.findById(order.getProductId());
if (product == null || product.getStock() <= 0) {
// 库存不足,回滚Redis库存
redisTemplate.opsForValue().increment(SECKILL_KEY + order.getProductId());
return;
}
// 2. 扣减数据库库存
int updated = productRepository.decrementStock(order.getProductId());
if (updated == 0) {
// 扣减失败,回滚Redis库存
redisTemplate.opsForValue().increment(SECKILL_KEY + order.getProductId());
return;
}
// 3. 创建订单
order.setStatus("SUCCESS");
orderRepository.save(order);
// 4. 发送成功通知
sendSuccessNotification(order);
} catch (Exception e) {
// 异常处理:回滚Redis库存
redisTemplate.opsForValue().increment(SECKILL_KEY + order.getProductId());
}
}
}
5.2 实时统计场景优化
使用物化视图或汇总表:
-- 创建汇总表
CREATE TABLE daily_sales_summary (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
date DATE NOT NULL,
product_id BIGINT NOT NULL,
total_sales DECIMAL(10,2) NOT NULL,
total_orders INT NOT NULL,
UNIQUE KEY uk_date_product (date, product_id)
);
-- 创建定时任务更新汇总表
DELIMITER $$
CREATE EVENT update_daily_sales_summary
ON SCHEDULE EVERY 1 DAY
STARTS '2023-01-01 00:00:00'
DO
BEGIN
-- 清空当日数据
DELETE FROM daily_sales_summary WHERE date = CURDATE();
-- 插入新数据
INSERT INTO daily_sales_summary (date, product_id, total_sales, total_orders)
SELECT
DATE(o.created_at) as date,
o.product_id,
SUM(o.amount) as total_sales,
COUNT(o.id) as total_orders
FROM orders o
WHERE DATE(o.created_at) = CURDATE()
GROUP BY DATE(o.created_at), o.product_id;
END$$
DELIMITER ;
使用ClickHouse进行实时分析:
-- ClickHouse表结构
CREATE TABLE orders_analytics (
order_id UInt64,
user_id UInt64,
product_id UInt64,
amount Decimal(10,2),
created_at DateTime,
status String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(created_at)
ORDER BY (user_id, created_at);
-- 实时查询
SELECT
toStartOfHour(created_at) as hour,
count() as order_count,
sum(amount) as total_amount
FROM orders_analytics
WHERE created_at >= now() - INTERVAL 1 HOUR
GROUP BY hour
ORDER BY hour;
六、监控与告警体系
6.1 监控指标
关键监控指标:
- 连接数监控
-- 监控连接数
SELECT
COUNT(*) as total_connections,
SUM(CASE WHEN COMMAND = 'Sleep' THEN 1 ELSE 0 END) as sleep_connections,
SUM(CASE WHEN COMMAND = 'Query' THEN 1 ELSE 0 END) as query_connections
FROM INFORMATION_SCHEMA.PROCESSLIST;
- 慢查询监控
-- 查看慢查询日志
SHOW VARIABLES LIKE 'slow_query_log%';
SHOW VARIABLES LIKE 'long_query_time';
-- 分析慢查询
SELECT
DIGEST_TEXT,
COUNT_STAR,
SUM_TIMER_WAIT/1000000000 as total_time_sec,
AVG_TIMER_WAIT/1000000000 as avg_time_sec
FROM performance_schema.events_statements_summary_by_digest
WHERE SUM_TIMER_WAIT > 0
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;
- InnoDB状态监控
-- 查看InnoDB状态
SHOW ENGINE INNODB STATUS\G
-- 重点关注:
-- - Transactions: 事务状态
-- - Row operations: 行操作统计
-- - Buffer pool hit rate: 缓冲池命中率
6.2 监控工具集成
Prometheus + Grafana监控方案:
# prometheus.yml 配置
scrape_configs:
- job_name: 'mysql'
static_configs:
- targets: ['mysql-exporter:9104']
metrics_path: '/metrics'
params:
collect[]:
- global_status
- global_variables
- slave_status
- innodb_metrics
- performance_schema.tablelocks
- performance_schema.table_io_waits_summary_by_table
MySQL Exporter配置:
# 启动MySQL Exporter
docker run -d \
--name mysql-exporter \
-p 9104:9104 \
-e DATA_SOURCE_NAME="user:password@(mysql:3306)/" \
prom/mysqld-exporter \
--collect.global_status \
--collect.global_variables \
--collect.slave_status \
--collect.innodb_metrics \
--collect.performance_schema.tablelocks \
--collect.performance_schema.table_io_waits_summary_by_table
Grafana Dashboard配置示例:
{
"dashboard": {
"title": "MySQL Performance Dashboard",
"panels": [
{
"title": "Connections",
"targets": [
{
"expr": "mysql_global_status_threads_connected",
"legendFormat": "Connected"
},
{
"expr": "mysql_global_variables_max_connections",
"legendFormat": "Max"
}
]
},
{
"title": "Query Performance",
"targets": [
{
"expr": "rate(mysql_global_status_queries[5m])",
"legendFormat": "Queries/sec"
},
{
"expr": "rate(mysql_global_status_slow_queries[5m])",
"legendFormat": "Slow Queries/sec"
}
]
}
]
}
}
6.3 告警规则
Prometheus告警规则示例:
# mysql_alerts.yml
groups:
- name: mysql_alerts
rules:
- alert: MySQLHighConnections
expr: mysql_global_status_threads_connected / mysql_global_variables_max_connections > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL连接数过高"
description: "MySQL连接数已达到最大连接数的80%以上"
- alert: MySQLSlowQueries
expr: rate(mysql_global_status_slow_queries[5m]) > 10
for: 2m
labels:
severity: critical
annotations:
summary: "MySQL慢查询过多"
description: "MySQL每秒慢查询超过10个"
- alert: MySQLReplicationLag
expr: mysql_slave_lag_seconds > 30
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL复制延迟"
description: "MySQL从库复制延迟超过30秒"
七、实战案例:电商秒杀系统优化
7.1 系统架构设计
用户请求 → Nginx负载均衡 → 应用服务器集群 → Redis集群 → MySQL主从集群
↓
消息队列(RabbitMQ/Kafka)
↓
异步订单处理服务
↓
MySQL分库分表(订单表)
7.2 关键代码实现
Redis预减库存:
@Component
public class RedisStockService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 预减库存
* @param productId 商品ID
* @param quantity 数量
* @return true: 成功,false: 失败
*/
public boolean preReduceStock(Long productId, int quantity) {
String key = "seckill:stock:" + productId;
// 使用Lua脚本保证原子性
String luaScript =
"if redis.call('exists', KEYS[1]) == 1 then " +
" local stock = tonumber(redis.call('get', KEYS[1])); " +
" if stock >= tonumber(ARGV[1]) then " +
" redis.call('decrby', KEYS[1], ARGV[1]); " +
" return 1; " +
" else " +
" return 0; " +
" end " +
"else " +
" return 0; " +
"end";
RedisScript<Long> script = RedisScript.of(luaScript, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList(key), String.valueOf(quantity));
return result != null && result == 1;
}
/**
* 回滚库存
* @param productId 商品ID
* @param quantity 数量
*/
public void rollbackStock(Long productId, int quantity) {
String key = "seckill:stock:" + productId;
redisTemplate.opsForValue().increment(key, quantity);
}
}
分布式锁实现:
@Component
public class DistributedLock {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String LOCK_PREFIX = "lock:";
private static final long DEFAULT_TIMEOUT = 30000; // 30秒
/**
* 获取分布式锁
* @param lockKey 锁键
* @param timeout 超时时间(毫秒)
* @return 锁标识
*/
public String acquireLock(String lockKey, long timeout) {
String lockId = UUID.randomUUID().toString();
String fullLockKey = LOCK_PREFIX + lockKey;
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < timeout) {
Boolean acquired = redisTemplate.opsForValue()
.setIfAbsent(fullLockKey, lockId, 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(acquired)) {
return lockId;
}
// 等待一段时间后重试
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
return null;
}
/**
* 释放分布式锁
* @param lockKey 锁键
* @param lockId 锁标识
*/
public void releaseLock(String lockKey, String lockId) {
String fullLockKey = LOCK_PREFIX + lockKey;
// 使用Lua脚本保证原子性
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]); " +
"else " +
" return 0; " +
"end";
RedisScript<Long> script = RedisScript.of(luaScript, Long.class);
redisTemplate.execute(script, Collections.singletonList(fullLockKey), lockId);
}
}
订单分片策略:
@Component
public class OrderShardingService {
/**
* 根据用户ID计算分片
* @param userId 用户ID
* @return 分片索引
*/
public int calculateShard(Long userId) {
// 使用一致性哈希算法
int hash = userId.hashCode();
int shard = Math.abs(hash) % 4; // 4个分片
return shard;
}
/**
* 获取分片数据源
* @param shard 分片索引
* @return 数据源名称
*/
public String getDataSourceName(int shard) {
return "order_ds_" + shard;
}
/**
* 获取分片表名
* @param shard 分片索引
* @return 表名
*/
public String getTableName(int shard) {
return "order_" + shard;
}
}
八、总结与最佳实践
8.1 高并发优化检查清单
架构层面
- [ ] 是否采用读写分离?
- [ ] 是否需要分库分表?
- [ ] 是否引入缓存层?
- [ ] 是否使用消息队列削峰?
数据库配置
- [ ] 缓冲池大小是否合理?
- [ ] 连接数配置是否足够?
- [ ] 日志文件大小是否合适?
- [ ] 索引是否优化?
应用层
- [ ] 连接池配置是否合理?
- [ ] 是否使用批量操作?
- [ ] 是否避免SELECT *?
- [ ] 是否优化了JOIN查询?
监控告警
- [ ] 是否监控连接数?
- [ ] 是否监控慢查询?
- [ ] 是否监控复制延迟?
- [ ] 是否设置告警规则?
8.2 性能优化原则
- 先监控,后优化:没有数据支撑的优化是盲目的
- 分层优化:从架构、配置、代码逐层优化
- 渐进式优化:每次只优化一个点,验证效果
- 保持简单:避免过度设计,选择最合适的方案
- 持续迭代:性能优化是一个持续的过程
8.3 常见误区
- 盲目增加硬件:硬件升级不能解决所有问题
- 过度索引:索引会增加写操作开销
- 忽视锁竞争:锁是高并发的隐形杀手
- 缓存滥用:缓存不是银弹,需要合理设计
- 忽略监控:没有监控的系统是不可靠的
通过以上策略的综合应用,可以显著提升MySQL在高并发场景下的性能表现。记住,没有一劳永逸的解决方案,需要根据具体业务场景和数据特点,持续监控、分析和优化。
