引言:理解高并发场景下的MySQL挑战
在现代互联网应用中,高并发场景已经成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易高峰,都会给数据库带来巨大的压力。MySQL作为最流行的关系型数据库之一,在高并发环境下常常面临性能瓶颈和稳定性挑战。
高并发对MySQL的主要影响包括:
- 连接数耗尽:大量并发请求导致数据库连接池耗尽
- CPU负载过高:复杂的查询和频繁的索引查找消耗大量CPU资源
- I/O瓶颈:频繁的磁盘读写操作导致I/O等待
- 锁竞争:行锁、表锁等资源竞争导致事务阻塞
- 内存不足:缓冲池命中率下降,频繁的磁盘交换
本文将从架构设计、SQL优化、配置调优、缓存策略等多个维度,提供一套完整的MySQL高并发处理实战指南。
一、架构层面的优化策略
1.1 读写分离架构
读写分离是应对高并发最有效的架构模式之一。通过将读操作和写操作分离到不同的数据库实例,可以显著减轻主库压力。
实现方案:
- 主库(Master):处理所有写操作(INSERT、UPDATE、DELETE)
- 从库(Slave):处理所有读操作(SELECT)
- 中间件:使用ProxySQL、MyCat或ShardingSphere进行路由
代码示例:使用Spring Boot实现读写分离
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public DataSource routingDataSource() {
DynamicDataSource routingDataSource = new DynamicDataSource();
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", masterDataSource());
targetDataSources.put("slave", slaveDataSource());
routingDataSource.setTargetDataSources(targetDataSources);
routingDataSource.setDefaultTargetDataSource(masterDataSource());
return routingDataSource;
}
@Bean
public SqlSessionFactory sqlSessionFactory() throws Exception {
SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(routingDataSource());
return sessionFactory.getObject();
}
}
// 数据源上下文管理器
public class DataSourceContextHolder {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static void setDataSource(String dataSource) {
contextHolder.set(dataSource);
}
public static String getDataSource() {
return contextHolder.get("master");
}
public static void clear() {
contextHolder.remove();
}
}
// AOP切面实现读写分离
@Aspect
@Component
public class DataSourceAspect {
@Before("execution(* com.example.service.*.*(..))")
public void before(JoinPoint joinPoint) {
String methodName = joinPoint.getSignature().getName();
if (methodName.startsWith("get") || methodName.startsWith("list") || methodName.startsWith("find")) {
DataSourceContextHolder.setDataSource("slave");
} else {
DataSourceContextHolder.setDataSource("master");
}
}
@After("execution(* com.example.service.*.*(..))")
public void after(JoinPoint joinPoint) {
DataSourceContextHolder.clear();
}
}
1.2 分库分表策略
当单表数据量超过千万级别或并发量极高时,需要考虑分库分表。
水平分表示例:
-- 用户表按用户ID取模分表
-- user_0, user_1, user_2, user_3
-- 创建分表结构
CREATE TABLE user_0 (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
created_at TIMESTAMP,
INDEX idx_username (username)
) ENGINE=InnoDB;
-- 分表规则:user_id % 4
-- 查询时根据user_id计算表名
分库分表中间件配置(ShardingSphere):
# sharding.yaml
dataSources:
ds_0: !!com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/db_0
username: root
password: password
ds_1: !!com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/db_1
username: root
password: password
shardingRule:
tables:
user:
actualDataNodes: ds_${0..1}.user_${0..3}
tableStrategy:
inline:
shardingColumn: user_id
algorithmExpression: user_${user_id % 4}
databaseStrategy:
inline:
shardingColumn: user_id
algorithmExpression: ds_${user_id % 2}
bindingTables:
- user
1.3 连接池优化
连接池是应用与数据库之间的缓冲层,合理的配置可以显著提升性能。
HikariCP最佳配置示例:
@Configuration
public class HikariConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hikari")
public HikariDataSource dataSource() {
HikariDataSource dataSource = new HikariDataSource();
// 连接池大小配置
dataSource.setMaximumPoolSize(50); // 最大连接数
dataSource.setMinimumIdle(10); // 最小空闲连接
dataSource.setConnectionTimeout(30000); // 连接超时30秒
dataSource.setIdleTimeout(600000); // 空闲超时10分钟
dataSource.setMaxLifetime(1800000); // 连接最大存活时间30分钟
// 性能优化配置
dataSource.setLeakDetectionThreshold(60000); // 泄漏检测60秒
dataSource.setInitializationFailTimeout(1); // 初始化失败立即抛出异常
// 连接测试
dataSource.setTestWhileIdle(true);
dataSource.setTestOnBorrow(false);
dataSource.setTestOnReturn(false);
dataSource.setValidationTimeout(3000);
dataSource.setConnectionTestQuery("SELECT 1");
return dataSource;
}
}
连接池配置参数说明:
| 参数 | 推荐值 | 说明 |
|---|---|---|
| maximumPoolSize | CPU核心数 * 2 + 1 | 根据实际业务调整,一般不超过100 |
| minimumIdle | 与maximumPoolSize相同 | 避免频繁创建销毁连接 |
| connectionTimeout | 30秒 | 连接获取超时时间 |
| idleTimeout | 10分钟 | 空闲连接回收时间 |
| maxLifetime | 30分钟 | 连接最大存活时间 |
二、SQL优化策略
2.1 索引优化
索引是提升查询性能的关键,但不当的索引会成为性能杀手。
索引优化原则:
- 遵循最左前缀原则
- 避免索引失效的写法
- 控制索引数量(单表不超过5个)
- 使用覆盖索引减少回表
索引优化实战:
-- 原始低效查询
SELECT * FROM orders WHERE user_id = 123 AND status = 'PAID' AND created_at > '2024-01-01';
-- 优化步骤1:创建复合索引
CREATE INDEX idx_user_status_created ON orders(user_id, status, created_at);
-- 优化步骤2:使用覆盖索引(避免SELECT *)
SELECT order_id, amount, created_at
FROM orders
WHERE user_id = 123 AND status = 'PAID' AND created_at > '2024-01-01';
-- 优化步骤3:使用EXPLAIN分析
EXPLAIN SELECT order_id, amount, created_at
FROM orders
WHERE user_id = 123 AND status = 'PAID' AND created_at > '2024-01-01';
索引失效的常见情况:
-- 1. 隐式类型转换
SELECT * FROM users WHERE phone = 13800138000; -- phone是VARCHAR类型,索引失效
-- 正确写法
SELECT * FROM users WHERE phone = '13800138000';
-- 2. 函数操作
SELECT * FROM orders WHERE DATE(created_at) = '2024-01-01'; -- 索引失效
-- 正确写法
SELECT * FROM orders WHERE created_at >= '2024-01-01 00:00:00' AND created_at < '2024-01-02 00:00:00';
-- 3. LIKE以%开头
SELECT * FROM users WHERE username LIKE '%john%'; -- 索引失效
-- 正确写法(如果必须这样,考虑全文索引)
SELECT * FROM users WHERE username LIKE 'john%'; -- 索引有效
-- 4. OR条件(部分版本优化不佳)
SELECT * FROM orders WHERE user_id = 123 OR amount > 1000; -- 可能索引失效
-- 优化写法
SELECT * FROM orders WHERE user_id = 123
UNION ALL
SELECT * FROM orders WHERE amount > 1000 AND user_id != 123;
2.2 查询语句优化
避免SELECT *:
-- 反例:查询所有列
SELECT * FROM products WHERE category_id = 1;
-- 正例:只查询需要的列
SELECT product_id, name, price, stock FROM products WHERE category_id = 1;
-- 为什么?减少网络传输和内存消耗,可能使用覆盖索引
分页查询优化:
-- 普通分页(大数据量时性能差)
SELECT * FROM orders ORDER BY created_at DESC LIMIT 1000000, 20;
-- 优化方案1:延迟关联
SELECT o.* FROM orders o
INNER JOIN (
SELECT order_id FROM orders
ORDER BY created_at DESC
LIMIT 1000000, 20
) t ON o.order_id = t.order_id;
-- 优化方案2:记录上次位置(游标分页)
SELECT * FROM orders
WHERE created_at < '2024-01-01 10:00:00' -- 上次最后一条的时间
ORDER BY created_at DESC
LIMIT 20;
JOIN优化:
-- 反例:多表JOIN且没有索引
SELECT u.*, o.*, p.*
FROM users u
JOIN orders o ON u.user_id = o.user_id
JOIN products p ON o.product_id = p.product_id
WHERE u.username = 'john';
-- 正例:小表驱动大表,确保JOIN字段有索引
-- 步骤1:先过滤小表
SELECT u.user_id FROM users u WHERE u.username = 'john';
-- 步骤2:再关联大表
SELECT o.*, p.*
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.user_id IN (SELECT user_id FROM users WHERE username = 'john');
2.3 事务优化
事务设计原则:
- 事务尽可能短
- 避免在事务中执行耗时操作
- 合理设置隔离级别
- 避免大事务
事务优化示例:
// 反例:大事务(长时间持有锁)
@Transactional
public void processOrder(Long orderId) {
// 1. 查询订单
Order order = orderRepository.findById(orderId);
// 2. 调用外部API(耗时操作)
paymentService.verifyPayment(order.getPaymentId()); // 网络IO,耗时
// 3. 更新库存
inventoryService.decreaseStock(order.getProductId(), order.getQuantity());
// 4. 更新订单状态
order.setStatus("COMPLETED");
orderRepository.save(order);
}
// 正例:拆分事务,移出耗时操作
public void processOrder(Long orderId) {
// 1. 先执行非事务性操作
paymentService.verifyPayment(order.getPaymentId());
// 2. 小事务处理核心逻辑
transactionTemplate.execute(status -> {
Order order = orderRepository.findById(orderId);
inventoryService.decreaseStock(order.getProductId(), order.getQuantity());
order.setStatus("COMPLETED");
orderRepository.save(order);
return null;
});
}
三、MySQL配置调优
3.1 核心参数配置
my.cnf 关键配置:
[mysqld]
# === 连接配置 ===
max_connections = 1000 # 最大连接数,根据业务调整
back_log = 500 # 连接队列长度
wait_timeout = 600 # 非交互连接超时时间(秒)
interactive_timeout = 600 # 交互连接超时时间(秒)
# === 缓冲池配置 ===
innodb_buffer_pool_size = 8G # 缓冲池大小(物理内存的50-75%)
innodb_buffer_pool_instances = 8 # 缓冲池实例数,减少竞争
innodb_log_file_size = 2G # 重做日志文件大小
innodb_log_buffer_size = 64M # 重做日志缓冲区大小
# === 性能配置 ===
innodb_flush_log_at_trx_commit = 1 # 1:每次提交刷盘(安全)2:每秒刷盘(高性能)
innodb_flush_method = O_DIRECT # 绕过OS缓存,直接IO
innodb_io_capacity = 2000 # InnoDB每秒IO操作数(SSD可设更高)
innodb_read_io_threads = 8 # 读线程数
innodb_write_io_threads = 8 # 写线程数
# === 并发配置 ===
innodb_thread_concurrency = 32 # InnoDB并发线程数
innodb_lock_wait_timeout = 50 # 锁等待超时(秒)
innodb_rollback_on_timeout = OFF # 超时是否回滚
# === 查询缓存(MySQL 8.0已移除)===
query_cache_type = 0 # 关闭查询缓存
query_cache_size = 0
# === 日志配置 ===
slow_query_log = 1 # 开启慢查询日志
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2 # 慢查询阈值(秒)
log_queries_not_using_indexes = 1 # 记录未使用索引的查询
# === 临时表配置 ===
tmp_table_size = 256M # 临时表大小
max_heap_table_size = 256M # 内存表最大大小
3.2 监控与诊断
实时监控脚本:
#!/bin/bash
# MySQL性能监控脚本
MYSQL_CMD="mysql -u root -p'password' -e"
while true; do
echo "=== MySQL Performance Monitor $(date) ==="
# 1. 当前连接数
$MYSQL_CMD "SHOW STATUS LIKE 'Threads_connected';"
# 2. 活跃线程数
$MYSQL_CMD "SHOW STATUS LIKE 'Threads_running';"
# 3. 缓冲池命中率
$MYSQL_CMD "SELECT
(1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 AS buffer_hit_rate
FROM performance_schema.global_status
WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads';"
# 4. 慢查询数量
$MYSQL_CMD "SHOW GLOBAL STATUS LIKE 'Slow_queries';"
# 5. 锁等待情况
$MYSQL_CMD "SELECT * FROM performance_schema.data_lock_waits LIMIT 5;"
sleep 10
done
慢查询分析:
-- 开启慢查询日志后,使用mysqldumpslow分析
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
-- 或使用pt-query-digest(Percona Toolkit)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
-- 在MySQL中分析慢查询
SELECT
DIGEST_TEXT,
COUNT_STAR,
AVG_TIMER_WAIT/1000000000000 as avg_time_sec,
SUM_ROWS_EXAMINED
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;
四、缓存策略
4.1 多级缓存架构
缓存层次:
- 客户端缓存(浏览器/APP)
- CDN缓存
- 应用层缓存(Redis)
- 数据库缓存(Query Cache,MySQL 8.0已移除)
4.2 Redis缓存实战
缓存设计模式:
@Service
public class ProductService {
@Autowired
private ProductRepository productRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String PRODUCT_CACHE_KEY = "product:%s";
private static final long CACHE_TTL = 3600; // 1小时
// 方案1:Cache Aside模式
public Product getProductById(Long id) {
String key = String.format(PRODUCT_CACHE_KEY, id);
// 1. 先读缓存
Product product = (Product) redisTemplate.opsForValue().get(key);
if (product != null) {
return product;
}
// 2. 缓存未命中,读数据库
product = productRepository.findById(id);
if (product != null) {
// 3. 写入缓存
redisTemplate.opsForValue().set(key, product, CACHE_TTL, TimeUnit.SECONDS);
}
return product;
}
// 更新数据时的缓存处理
@Transactional
public void updateProduct(Product product) {
// 1. 更新数据库
productRepository.save(product);
// 2. 删除缓存(避免并发问题)
String key = String.format(PRODUCT_CACHE_KEY, product.getId());
redisTemplate.delete(key);
}
// 方案2:缓存穿透保护
public Product getProductWithPenetrationProtection(Long id) {
String key = String.format(PRODUCT_CACHE_KEY, id);
// 1. 查询缓存
Object cached = redisTemplate.opsForValue().get(key);
if (cached != null) {
// 如果是空值标记,返回null
if (cached instanceof String && "NULL".equals(cached)) {
return null;
}
return (Product) cached;
}
// 2. 查询数据库
Product product = productRepository.findById(id);
// 3. 缓存结果(包括空值)
if (product != null) {
redisTemplate.opsForValue().set(key, product, CACHE_TTL, TimeUnit.SECONDS);
} else {
// 缓存空值,防止缓存穿透(设置较短TTL)
redisTemplate.opsForValue().set(key, "NULL", 60, TimeUnit.SECONDS);
}
return product;
}
// 方案3:缓存雪崩保护(随机TTL)
public void setProductCacheWithRandomTTL(Long id, Product product) {
String key = String.format(PRODUCT_CACHE_KEY, id);
// 基础TTL + 随机值(0-300秒)
long ttl = CACHE_TTL + (long) (Math.random() * 300);
redisTemplate.opsForValue().set(key, product, ttl, TimeUnit.SECONDS);
}
}
Redis配置优化:
# application-redis.yml
spring:
redis:
host: localhost
port: 6379
password: your_password
database: 0
timeout: 5000ms
lettuce:
pool:
max-active: 100
max-idle: 50
min-idle: 10
max-wait: 5000ms
4.3 缓存与数据库一致性
最终一致性方案:
// 使用消息队列保证最终一致性
@Service
public class CacheUpdateService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 更新数据库后发送消息
public void updateProductWithCache(Product product) {
// 1. 更新数据库
productRepository.save(product);
// 2. 发送缓存更新消息
CacheMessage message = new CacheMessage();
message.setEntityType("PRODUCT");
message.setEntityId(product.getId());
message.setAction("UPDATE");
kafkaTemplate.send("cache-update-topic", JSON.toJSONString(message));
}
// 消费者监听消息,更新缓存
@KafkaListener(topics = "cache-update-topic")
public void handleCacheUpdate(String messageStr) {
CacheMessage message = JSON.parseObject(messageStr, CacheMessage.class);
if ("PRODUCT".equals(message.getEntityId())) {
String key = String.format(PRODUCT_CACHE_KEY, message.getEntityId());
// 延迟双删策略
redisTemplate.delete(key);
// 延迟后再次删除(防止并发写入时的旧数据)
scheduledExecutorService.schedule(() -> {
redisTemplate.delete(key);
}, 500, TimeUnit.MILLISECONDS);
}
}
}
五、高级优化技巧
5.1 并发控制
乐观锁实现:
-- 表结构增加版本号字段
CREATE TABLE product (
id BIGINT PRIMARY KEY,
name VARCHAR(100),
stock INT,
version INT DEFAULT 0 -- 版本号
);
-- 更新时检查版本号
UPDATE product
SET stock = stock - 1, version = version + 1
WHERE id = 123 AND version = 5; -- 原子操作
-- 如果更新失败(版本号不匹配),重试或返回错误
悲观锁实现:
-- 显式加锁
BEGIN;
SELECT * FROM inventory WHERE product_id = 123 FOR UPDATE;
-- 执行业务逻辑
UPDATE inventory SET stock = stock - 1 WHERE product_id = 123;
COMMIT;
-- 注意:FOR UPDATE会阻塞其他事务的写操作,需谨慎使用
5.2 分区表
Range分区示例:
-- 按时间分区
CREATE TABLE logs (
id BIGINT,
log_time DATETIME,
message TEXT,
INDEX idx_time (log_time)
) PARTITION BY RANGE (YEAR(log_time) * 100 + MONTH(log_time)) (
PARTITION p202401 VALUES LESS THAN (202402),
PARTITION p202402 VALUES LESS THAN (202403),
PARTITION p202403 VALUES LESS THAN (202404),
PARTITION p_future VALUES LESS THAN MAXVALUE
);
-- 查询时自动分区裁剪
SELECT * FROM logs WHERE log_time >= '2024-02-01' AND log_time < '2024-03-01';
-- 只扫描p202402分区
5.3 并发插入优化
批量插入优化:
// 反例:逐条插入
for (Order order : orders) {
orderRepository.save(order); // 每次都要网络往返
}
// 正例:批量插入
public void batchInsert(List<Order> orders) {
String sql = "INSERT INTO orders (order_id, user_id, amount, status) VALUES (?, ?, ?, ?)";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
conn.setAutoCommit(false); // 关闭自动提交
for (int i = 0; i < orders.size(); i++) {
Order order = orders.get(i);
ps.setLong(1, order.getOrderId());
ps.setLong(2, order.getUserId());
ps.setBigDecimal(3, order.getAmount());
ps.setString(4, order.getStatus());
ps.addBatch();
// 每1000条提交一次
if (i % 1000 == 0) {
ps.executeBatch();
conn.commit();
}
}
// 提交剩余
ps.executeBatch();
conn.commit();
} catch (SQLException e) {
// 异常处理
}
}
LOAD DATA INFILE(最高效):
-- 准备CSV文件
-- orders.csv
-- 1,1001,99.99,PAID
-- 2,1002,199.99,PAID
-- 批量导入
LOAD DATA LOCAL INFILE '/path/to/orders.csv'
INTO TABLE orders
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(order_id, user_id, amount, status);
六、压力测试与监控
6.1 压力测试工具
sysbench测试:
# 安装sysbench
sudo apt-get install sysbench
# 准备测试数据
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
--mysql-db=testdb --tables=10 --table-size=100000 \
/usr/share/sysbench/oltp_read_write.lua prepare
# 执行测试
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
--mysql-db=testdb --tables=10 --table-size=100000 \
--threads=100 --time=300 --report-interval=10 \
/usr/share/sysbench/oltp_read_write.lua run
# 清理数据
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
--mysql-db=testdb \
/usr/share/sysbench/oltp_read_write.lua cleanup
6.2 监控指标
关键监控指标:
-- 1. QPS(每秒查询数)
SHOW GLOBAL STATUS LIKE 'Queries';
SHOW GLOBAL STATUS LIKE 'Questions';
-- 2. TPS(每秒事务数)
SHOW GLOBAL STATUS LIKE 'Com_commit';
SHOW GLOBAL STATUS LIKE 'Com_rollback';
-- 3. 缓冲池命中率
SELECT
(1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 AS hit_rate
FROM performance_schema.global_status
WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads';
-- 4. 锁等待情况
SELECT * FROM performance_schema.data_lock_waits;
-- 5. 慢查询统计
SELECT
DIGEST_TEXT,
COUNT_STAR,
AVG_TIMER_WAIT/1000000000000 as avg_time_sec
FROM performance_schema.events_statements_summary_by_digest
ORDER BY COUNT_STAR DESC
LIMIT 10;
6.3 自动化监控脚本
#!/usr/bin/env python3
# MySQL监控脚本
import pymysql
import time
import smtplib
from email.mime.text import MIMEText
class MySQLMonitor:
def __init__(self, host, user, password, db):
self.conn = pymysql.connect(
host=host, user=user, password=password, db=db
)
self.thresholds = {
'threads_connected': 800, # 连接数阈值
'slow_queries': 10, # 慢查询阈值(每分钟)
'buffer_hit_rate': 95 # 缓冲池命中率阈值
}
def check_metrics(self):
cursor = self.conn.cursor()
# 检查连接数
cursor.execute("SHOW STATUS LIKE 'Threads_connected'")
threads_connected = int(cursor.fetchone()[1])
# 检查慢查询
cursor.execute("SHOW GLOBAL STATUS LIKE 'Slow_queries'")
slow_queries = int(cursor.fetchone()[1])
# 检查缓冲池命中率
cursor.execute("""
SELECT (1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100
FROM performance_schema.global_status
WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads'
""")
hit_rate = float(cursor.fetchone()[0])
alerts = []
if threads_connected > self.thresholds['threads_connected']:
alerts.append(f"连接数过高: {threads_connected}")
if slow_queries > self.thresholds['slow_queries']:
alerts.append(f"慢查询过多: {slow_queries}")
if hit_rate < self.thresholds['buffer_hit_rate']:
alerts.append(f"缓冲池命中率过低: {hit_rate:.2f}%")
return alerts
def send_alert(self, message):
# 发送邮件告警
msg = MIMEText(message)
msg['Subject'] = 'MySQL性能告警'
msg['From'] = 'monitor@example.com'
msg['To'] = 'admin@example.com'
# 配置SMTP(示例)
# server = smtplib.SMTP('smtp.example.com')
# server.send_message(msg)
print(f"ALERT: {message}")
def run(self):
while True:
alerts = self.check_metrics()
if alerts:
self.send_alert('\n'.join(alerts))
time.sleep(60) # 每分钟检查一次
if __name__ == '__main__':
monitor = MySQLMonitor('localhost', 'root', 'password', 'testdb')
monitor.run()
七、实战案例:秒杀系统优化
7.1 秒杀场景特点
- 瞬时高并发:QPS可达数十万
- 库存有限:容易超卖
- 读多写少:查询商品信息远多于下单
7.2 完整优化方案
1. 架构设计:
用户请求 → Nginx → 应用服务器 → Redis缓存 → MySQL
2. 核心代码实现:
@Service
public class SeckillService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private OrderRepository orderRepository;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String STOCK_KEY = "seckill:stock:%s";
private static final String ORDER_KEY = "seckill:order:%s:%s"; // userId:productId
/**
* 秒杀下单
*/
public SeckillResult seckill(Long productId, Long userId) {
String stockKey = String.format(STOCK_KEY, productId);
String orderKey = String.format(ORDER_KEY, userId, productId);
// 1. 检查是否已下单(防重复)
if (redisTemplate.hasKey(orderKey)) {
return SeckillResult.fail("您已参与过秒杀");
}
// 2. 预扣库存(Redis原子操作)
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
// 库存不足,回滚
redisTemplate.opsForValue().increment(stockKey);
return SeckillResult.fail("库存不足");
}
// 3. 标记已下单
redisTemplate.opsForValue().set(orderKey, "1", 30, TimeUnit.MINUTES);
// 4. 发送消息到队列,异步创建订单
OrderMessage message = new OrderMessage();
message.setProductId(productId);
message.setUserId(userId);
message.setTimestamp(System.currentTimeMillis());
kafkaTemplate.send("seckill-order", JSON.toJSONString(message));
return SeckillResult.success("秒杀成功,订单处理中");
}
/**
* 异步处理订单(消费者)
*/
@KafkaListener(topics = "seckill-order")
public void processOrder(String messageStr) {
OrderMessage message = JSON.parseObject(messageStr, OrderMessage.class);
try {
// 1. 检查Redis库存(双重验证)
String stockKey = String.format(STOCK_KEY, message.getProductId());
Long stock = redisTemplate.opsForValue().get(stockKey);
if (stock == null || stock < 0) {
return; // 库存不足,丢弃消息
}
// 2. 创建数据库事务
transactionTemplate.execute(status -> {
// 检查数据库库存
Product product = productRepository.findById(message.getProductId());
if (product.getStock() <= 0) {
status.setRollbackOnly();
return null;
}
// 扣减数据库库存
int updated = productRepository.decreaseStock(message.getProductId());
if (updated == 0) {
status.setRollbackOnly();
return null;
}
// 创建订单
Order order = new Order();
order.setProductId(message.getProductId());
order.setUserId(message.getUserId());
order.setStatus("CREATED");
order.setAmount(product.getPrice());
orderRepository.save(order);
return order;
});
} catch (Exception e) {
// 异常时回滚Redis库存
redisTemplate.opsForValue().increment(stockKey);
log.error("订单处理失败", e);
}
}
}
3. 数据库表结构优化:
-- 商品表
CREATE TABLE product (
id BIGINT PRIMARY KEY,
name VARCHAR(100),
price DECIMAL(10,2),
stock INT,
version INT DEFAULT 0, -- 乐观锁版本号
INDEX idx_stock (stock) -- 库存索引
) ENGINE=InnoDB;
-- 订单表(分表)
CREATE TABLE order_0 (
order_id BIGINT PRIMARY KEY,
product_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP,
INDEX idx_user (user_id),
INDEX idx_product (product_id)
) ENGINE=InnoDB;
-- 秒杀记录表(只记录成功订单)
CREATE TABLE seckill_record (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
product_id BIGINT,
user_id BIGINT,
order_id BIGINT,
created_at TIMESTAMP,
UNIQUE KEY uk_user_product (user_id, product_id)
) ENGINE=InnoDB;
4. MySQL配置调整(秒杀场景):
[mysqld]
# 降低连接超时,快速回收连接
wait_timeout = 30
interactive_timeout = 30
# 增大重做日志,减少刷盘频率
innodb_log_file_size = 4G
innodb_log_buffer_size = 128M
# 提升I/O性能
innodb_flush_log_at_trx_commit = 2 # 每秒刷盘(性能优先)
innodb_io_capacity = 5000 # SSD配置
# 连接数
max_connections = 2000
max_connect_errors = 100000
八、总结与最佳实践
8.1 高并发处理黄金法则
- 缓存为王:90%的查询应该在缓存层解决
- 异步化:耗时操作异步处理,提升响应速度
- 限流降级:保护数据库不被压垮
- 读写分离:分散读压力
- 分库分表:数据量过大时的终极方案
8.2 性能优化检查清单
- [ ] 是否使用了合适的索引?
- [ ] 是否避免了SELECT *?
- [ ] 是否使用了连接池?
- [ ] 是否配置了合适的缓冲池大小?
- [ ] 是否开启了慢查询日志?
- [ ] 是否使用了Redis缓存?
- [ ] 是否实现了读写分离?
- [ ] 是否进行了压力测试?
- [ ] 是否有监控告警?
8.3 持续优化建议
- 定期分析慢查询:每周review慢查询日志
- 监控关键指标:QPS、TPS、连接数、缓冲池命中率
- 压力测试:每次大促前进行全链路压测
- 容量规划:根据业务增长提前扩容
- 故障演练:定期进行故障注入测试
通过以上策略的综合应用,可以有效应对高并发场景下的MySQL性能瓶颈,保障系统稳定运行。记住,优化是一个持续的过程,需要根据实际业务场景不断调整和完善。
