引言
在当今互联网应用中,高并发场景已成为常态。无论是电商平台的秒杀活动、社交网络的热点事件,还是金融系统的交易高峰,MySQL作为最流行的关系型数据库之一,都面临着巨大的性能挑战。当并发请求量激增时,数据库响应时间变长、CPU和I/O资源紧张、甚至出现连接数耗尽等问题,严重影响用户体验和系统稳定性。
本文将从实战角度出发,系统性地探讨MySQL高并发处理的完整策略。我们将从最基础的索引优化开始,逐步深入到查询优化、架构升级等高级话题,并结合具体案例和代码示例,帮助您构建一个能够应对高并发挑战的MySQL系统。
一、索引优化:高并发的基石
1.1 索引的基本原理与类型
索引是MySQL中提高查询性能最有效的手段之一。它类似于书籍的目录,能够帮助数据库快速定位到所需的数据行,避免全表扫描。
MySQL支持多种索引类型:
- B+树索引:最常用的索引类型,适用于等值查询和范围查询
- 哈希索引:仅适用于精确匹配查询,不支持范围查询
- 全文索引:用于文本内容的搜索
- 空间索引:用于地理空间数据
1.2 索引优化实战
1.2.1 选择合适的列创建索引
原则:选择WHERE子句中频繁使用的列、JOIN条件中的列、ORDER BY和GROUP BY中的列。
案例:电商系统的订单查询
-- 原始查询(无索引)
SELECT * FROM orders
WHERE user_id = 12345 AND status = 'paid'
ORDER BY create_time DESC
LIMIT 20;
-- 优化方案1:创建复合索引
CREATE INDEX idx_user_status_time ON orders(user_id, status, create_time);
-- 优化方案2:如果查询经常变化,可以创建覆盖索引
CREATE INDEX idx_cover_user_status_time ON orders(user_id, status, create_time, order_id, amount);
1.2.2 避免索引失效的常见场景
- 索引列参与运算 “`sql – 错误示例:索引失效 SELECT * FROM users WHERE YEAR(create_time) = 2023;
– 正确示例:使用范围查询 SELECT * FROM users WHERE create_time >= ‘2023-01-01’ AND create_time < ‘2024-01-01’;
2. **前缀模糊查询**
```sql
-- 错误示例:索引失效
SELECT * FROM products WHERE name LIKE '%手机';
-- 正确示例:使用前缀索引
SELECT * FROM products WHERE name LIKE '手机%';
- OR条件导致索引失效 “`sql – 错误示例:索引可能失效 SELECT * FROM orders WHERE user_id = 123 OR amount > 1000;
– 正确示例:使用UNION ALL SELECT * FROM orders WHERE user_id = 123 UNION ALL SELECT * FROM orders WHERE amount > 1000 AND user_id != 123;
### 1.3 索引维护与监控
定期检查索引的使用情况,删除冗余索引:
```sql
-- 查看索引使用情况
SELECT
table_name,
index_name,
stat_value,
stat_description
FROM mysql.innodb_index_stats
WHERE database_name = 'your_database'
AND stat_name = 'size';
-- 查看未使用的索引(需要开启performance_schema)
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
INDEX_NAME,
COUNT_READ,
COUNT_WRITE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE OBJECT_SCHEMA = 'your_database'
AND INDEX_NAME IS NOT NULL
ORDER BY COUNT_READ + COUNT_WRITE DESC;
二、查询优化:让SQL飞起来
2.1 避免全表扫描
全表扫描是高并发场景下的性能杀手。通过EXPLAIN命令可以分析查询计划:
EXPLAIN SELECT * FROM orders WHERE user_id = 12345;
输出示例:
+----+-------------+--------+------+---------------+------+---------+------+------+-------------+
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
+----+-------------+--------+------+---------------+------+---------+------+------+-------------+
| 1 | SIMPLE | orders | ref | idx_user_id | idx_user_id | 5 | const| 100 | Using where |
+----+-------------+--------+------+---------------+------+---------+------+------+-------------+
关键指标:
- type:访问类型,ALL表示全表扫描,应避免
- key:实际使用的索引
- rows:预估扫描行数
- Extra:额外信息,如Using filesort(文件排序)需要优化
2.2 分页查询优化
高并发场景下,深度分页(如LIMIT 100000, 20)会导致严重性能问题:
-- 问题SQL:深度分页性能差
SELECT * FROM orders ORDER BY create_time DESC LIMIT 100000, 20;
-- 优化方案1:使用覆盖索引+延迟关联
SELECT o.* FROM orders o
INNER JOIN (
SELECT order_id FROM orders
ORDER BY create_time DESC
LIMIT 100000, 20
) t ON o.order_id = t.order_id;
-- 优化方案2:使用游标分页(推荐)
-- 假设上一页最后一条记录的create_time是'2023-10-01 12:00:00'
SELECT * FROM orders
WHERE create_time < '2023-10-01 12:00:00'
ORDER BY create_time DESC
LIMIT 20;
2.3 JOIN优化
在高并发场景下,JOIN操作容易成为瓶颈:
-- 问题SQL:多表JOIN性能差
SELECT o.*, u.username, p.product_name
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN products p ON o.product_id = p.id
WHERE o.status = 'paid';
-- 优化方案1:减少JOIN表数量,使用冗余字段
-- 在orders表中冗余username和product_name字段
-- 优化方案2:使用EXISTS替代JOIN
SELECT o.*
FROM orders o
WHERE o.status = 'paid'
AND EXISTS (SELECT 1 FROM users u WHERE u.id = o.user_id)
AND EXISTS (SELECT 1 FROM products p WHERE p.id = o.product_id);
-- 优化方案3:使用临时表
CREATE TEMPORARY TABLE temp_orders AS
SELECT * FROM orders WHERE status = 'paid';
SELECT t.*, u.username, p.product_name
FROM temp_orders t
JOIN users u ON t.user_id = u.id
JOIN products p ON t.product_id = p.id;
三、事务与锁优化
3.1 事务隔离级别选择
MySQL支持四种事务隔离级别:
- READ UNCOMMITTED:读未提交,可能出现脏读
- READ COMMITTED:读已提交,MySQL默认级别
- REPEATABLE READ:可重复读,InnoDB默认级别
- SERIALIZABLE:串行化,性能最差
高并发场景建议:
- 一般业务:READ COMMITTED(减少锁竞争)
- 金融业务:REPEATABLE READ(保证数据一致性)
- 避免使用SERIALIZABLE
-- 设置事务隔离级别
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- 查看当前隔离级别
SELECT @@transaction_isolation;
3.2 锁优化策略
3.2.1 减少锁持有时间
-- 问题示例:长时间持有锁
START TRANSACTION;
-- 复杂业务逻辑
UPDATE orders SET status = 'processed' WHERE id = 123;
-- 其他耗时操作
COMMIT;
-- 优化方案:拆分事务
-- 步骤1:快速更新
UPDATE orders SET status = 'processed' WHERE id = 123;
-- 步骤2:异步处理其他业务
-- 使用消息队列或定时任务
3.2.2 避免死锁
-- 死锁示例:两个事务交叉更新
-- 事务A
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;
-- 事务B(同时执行)
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 2;
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
COMMIT;
-- 优化方案:固定更新顺序
-- 事务A和B都按id升序更新
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
3.3 悲观锁与乐观锁
悲观锁:使用SELECT … FOR UPDATE
-- 悲观锁示例:秒杀场景
START TRANSACTION;
-- 锁定记录
SELECT * FROM products WHERE id = 1 FOR UPDATE;
-- 检查库存
IF stock > 0 THEN
UPDATE products SET stock = stock - 1 WHERE id = 1;
INSERT INTO orders (...) VALUES (...);
END IF;
COMMIT;
乐观锁:使用版本号
-- 乐观锁示例
-- 表结构增加version字段
CREATE TABLE products (
id INT PRIMARY KEY,
name VARCHAR(100),
stock INT,
version INT DEFAULT 0
);
-- 更新操作
UPDATE products
SET stock = stock - 1, version = version + 1
WHERE id = 1 AND version = 0;
-- 检查影响行数
-- 如果影响行数为0,说明数据已被修改,需要重试
四、架构升级:从单机到分布式
4.1 读写分离
读写分离是提升MySQL并发能力的基础架构优化。
架构图:
应用层
↓
负载均衡器
↓
主库(写) ← 同步 → 从库(读)
实现方案:
- 使用中间件:如ProxySQL、MyCat
- 应用层实现:使用Spring的AbstractRoutingDataSource
代码示例(Spring Boot + MyBatis):
// 数据源配置
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public DataSource routingDataSource() {
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", masterDataSource());
targetDataSources.put("slave", slaveDataSource());
DynamicDataSource routingDataSource = new DynamicDataSource();
routingDataSource.setTargetDataSources(targetDataSources);
routingDataSource.setDefaultTargetDataSource(masterDataSource());
return routingDataSource;
}
}
// 动态数据源
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDataSourceType();
}
}
// 数据源上下文
public class DataSourceContextHolder {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static void setDataSourceType(String dataSourceType) {
contextHolder.set(dataSourceType);
}
public static String getDataSourceType() {
return contextHolder.get();
}
public static void clearDataSourceType() {
contextHolder.remove();
}
}
// 使用示例
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
// 写操作使用主库
@Transactional
public void createOrder(Order order) {
DataSourceContextHolder.setDataSourceType("master");
orderMapper.insert(order);
DataSourceContextHolder.clearDataSourceType();
}
// 读操作使用从库
public Order getOrder(Long id) {
DataSourceContextHolder.setDataSourceType("slave");
Order order = orderMapper.selectById(id);
DataSourceContextHolder.clearDataSourceType();
return order;
}
}
4.2 分库分表
当单表数据量超过千万级时,需要考虑分库分表。
4.2.1 垂直分表
将大表拆分为多个小表,按业务维度拆分:
-- 原始大表
CREATE TABLE user_info (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
password VARCHAR(100),
email VARCHAR(100),
phone VARCHAR(20),
address VARCHAR(200),
-- 其他字段...
create_time DATETIME
);
-- 垂直拆分后
CREATE TABLE user_base (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
password VARCHAR(100),
email VARCHAR(100),
phone VARCHAR(20)
);
CREATE TABLE user_profile (
id BIGINT PRIMARY KEY,
address VARCHAR(200),
-- 其他扩展字段...
create_time DATETIME
);
4.2.2 水平分表(分片)
按某个维度(如用户ID、时间)将数据分布到多个表中。
分片策略:
- 范围分片:按ID范围分片
- 哈希分片:按用户ID哈希取模
- 一致性哈希:解决扩容问题
代码示例(分片路由):
public class ShardingRouter {
// 分片数量
private static final int SHARD_COUNT = 4;
/**
* 根据用户ID计算分片表名
* @param userId 用户ID
* @return 分片表名,如 orders_0, orders_1, ...
*/
public static String getShardTableName(String tableName, Long userId) {
int shardIndex = (int) (userId % SHARD_COUNT);
return tableName + "_" + shardIndex;
}
/**
* 批量查询时,需要查询所有分片
*/
public static List<String> getAllShardTableNames(String tableName) {
List<String> tableNames = new ArrayList<>();
for (int i = 0; i < SHARD_COUNT; i++) {
tableNames.add(tableName + "_" + i);
}
return tableNames;
}
}
// 使用示例
@Service
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
public void createOrder(Long userId, Order order) {
String tableName = ShardingRouter.getShardTableName("orders", userId);
String sql = String.format(
"INSERT INTO %s (user_id, amount, status) VALUES (?, ?, ?)",
tableName
);
jdbcTemplate.update(sql, userId, order.getAmount(), order.getStatus());
}
public List<Order> getUserOrders(Long userId) {
String tableName = ShardingRouter.getShardTableName("orders", userId);
String sql = String.format("SELECT * FROM %s WHERE user_id = ?", tableName);
return jdbcTemplate.query(sql, new Object[]{userId}, new OrderRowMapper());
}
}
4.3 分布式事务
在分库分表场景下,需要处理分布式事务问题。
4.3.1 两阶段提交(2PC)
// 伪代码示例
public class TwoPhaseCommit {
public boolean executeTransaction(List<DatabaseOperation> operations) {
// 阶段1:准备
List<Boolean> prepareResults = new ArrayList<>();
for (DatabaseOperation op : operations) {
try {
boolean prepared = op.prepare();
prepareResults.add(prepared);
} catch (Exception e) {
// 回滚所有已准备的操作
rollbackAll(operations);
return false;
}
}
// 阶段2:提交或回滚
if (prepareResults.contains(false)) {
rollbackAll(operations);
return false;
} else {
commitAll(operations);
return true;
}
}
}
4.3.2 最终一致性方案
使用消息队列实现最终一致性:
// 伪代码示例
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void createOrder(Order order) {
// 1. 本地事务:创建订单
orderMapper.insert(order);
// 2. 发送消息到消息队列
OrderCreatedEvent event = new OrderCreatedEvent(order.getId());
rabbitTemplate.convertAndSend("order.exchange", "order.created", event);
// 3. 消费者处理其他业务(库存扣减、积分增加等)
}
}
// 消费者
@Component
public class OrderEventListener {
@RabbitListener(queues = "order.queue")
public void handleOrderCreated(OrderCreatedEvent event) {
// 扣减库存
inventoryService.decreaseStock(event.getOrderId());
// 增加积分
pointService.addPoints(event.getOrderId());
// 发送通知
notificationService.sendOrderNotification(event.getOrderId());
}
}
五、缓存策略:减轻数据库压力
5.1 多级缓存架构
应用层缓存(本地缓存)
↓
分布式缓存(Redis)
↓
数据库
5.2 缓存穿透、击穿、雪崩解决方案
5.2.1 缓存穿透
问题:查询不存在的数据,导致请求直接打到数据库。
解决方案:
public class CacheService {
private static final String NULL_VALUE = "NULL";
public String getProductFromCache(Long productId) {
String cacheKey = "product:" + productId;
// 1. 先查缓存
String cachedValue = redisTemplate.opsForValue().get(cacheKey);
if (cachedValue != null) {
return cachedValue.equals(NULL_VALUE) ? null : cachedValue;
}
// 2. 缓存未命中,查数据库
Product product = productMapper.selectById(productId);
// 3. 缓存结果(包括空值)
if (product != null) {
redisTemplate.opsForValue().set(cacheKey, product.toJson(), 3600, TimeUnit.SECONDS);
return product.toJson();
} else {
// 缓存空值,防止穿透
redisTemplate.opsForValue().set(cacheKey, NULL_VALUE, 60, TimeUnit.SECONDS);
return null;
}
}
}
5.2.2 缓存击穿
问题:热点key过期瞬间,大量请求同时打到数据库。
解决方案:
public class CacheService {
public String getHotProduct(Long productId) {
String cacheKey = "product:" + productId;
// 1. 先查缓存
String cachedValue = redisTemplate.opsForValue().get(cacheKey);
if (cachedValue != null) {
return cachedValue;
}
// 2. 使用分布式锁防止缓存击穿
String lockKey = "lock:" + productId;
String lockValue = UUID.randomUUID().toString();
try {
// 尝试获取锁(设置过期时间,防止死锁)
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
// 获取锁成功,查询数据库并更新缓存
Product product = productMapper.selectById(productId);
if (product != null) {
redisTemplate.opsForValue().set(cacheKey, product.toJson(), 3600, TimeUnit.SECONDS);
return product.toJson();
}
} else {
// 未获取锁,等待并重试
Thread.sleep(100);
return getHotProduct(productId); // 递归重试
}
} catch (Exception e) {
// 异常处理
} finally {
// 释放锁(使用Lua脚本保证原子性)
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(lockKey), lockValue);
}
return null;
}
}
5.2.3 缓存雪崩
问题:大量缓存同时过期,导致数据库压力激增。
解决方案:
public class CacheService {
public void setProductCache(Long productId, Product product) {
String cacheKey = "product:" + productId;
// 1. 设置基础过期时间(如3600秒)
int baseExpire = 3600;
// 2. 添加随机值(如±300秒),避免同时过期
int randomExpire = baseExpire + (int)(Math.random() * 600) - 300;
// 3. 设置缓存
redisTemplate.opsForValue().set(cacheKey, product.toJson(), randomExpire, TimeUnit.SECONDS);
}
// 使用布隆过滤器防止缓存雪崩
public void initBloomFilter() {
// 初始化布隆过滤器
BloomFilter<Long> bloomFilter = BloomFilter.create(
Funnels.longFunnel(),
1000000, // 预计插入数量
0.01 // 误判率
);
// 加载所有有效ID到布隆过滤器
List<Long> allProductIds = productMapper.getAllIds();
for (Long id : allProductIds) {
bloomFilter.put(id);
}
// 存储到Redis
redisTemplate.opsForValue().set("bloom:products", bloomFilter);
}
public boolean mightContain(Long productId) {
BloomFilter<Long> bloomFilter = (BloomFilter<Long>) redisTemplate.opsForValue().get("bloom:products");
return bloomFilter != null && bloomFilter.mightContain(productId);
}
}
5.3 缓存与数据库一致性
5.3.1 Cache Aside模式
@Service
public class CacheAsideService {
public Product getProduct(Long id) {
// 1. 先查缓存
String cacheKey = "product:" + id;
String cached = redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return JSON.parseObject(cached, Product.class);
}
// 2. 缓存未命中,查数据库
Product product = productMapper.selectById(id);
// 3. 写入缓存
if (product != null) {
redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(product), 3600, TimeUnit.SECONDS);
}
return product;
}
@Transactional
public void updateProduct(Product product) {
// 1. 更新数据库
productMapper.updateById(product);
// 2. 删除缓存(而不是更新缓存)
String cacheKey = "product:" + product.getId();
redisTemplate.delete(cacheKey);
}
}
5.3.2 延迟双删策略
@Service
public class DelayDoubleDeleteService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Transactional
public void updateProductWithDelay(Product product) {
// 1. 删除缓存
String cacheKey = "product:" + product.getId();
redisTemplate.delete(cacheKey);
// 2. 更新数据库
productMapper.updateById(product);
// 3. 延迟再次删除缓存(防止主从延迟导致脏数据)
taskExecutor.submit(() -> {
try {
Thread.sleep(500); // 延迟500ms
redisTemplate.delete(cacheKey);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
六、连接池优化
6.1 连接池参数调优
HikariCP配置示例(application.yml):
spring:
datasource:
hikari:
# 连接池名称
pool-name: HikariCP-Connection-Pool
# 最小空闲连接数
minimum-idle: 10
# 最大连接数(根据业务调整,一般为CPU核心数*2+1)
maximum-pool-size: 50
# 连接超时时间(毫秒)
connection-timeout: 30000
# 空闲连接最大存活时间(毫秒)
idle-timeout: 600000
# 连接最大生命周期(毫秒)
max-lifetime: 1800000
# 连接测试查询
connection-test-query: SELECT 1
# 连接泄漏检测阈值(毫秒)
leak-detection-threshold: 60000
# 连接初始化SQL
initialization-fail-timeout: 1
# 是否自动提交
auto-commit: true
# 数据库元数据
metadata:
# 数据库名称
database-name: test
# 数据库版本
database-version: 8.0.28
# 驱动版本
driver-version: 8.0.28
6.2 连接池监控
@Component
public class ConnectionPoolMonitor {
@Autowired
private DataSource dataSource;
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void monitorConnectionPool() {
if (dataSource instanceof HikariDataSource) {
HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
HikariPoolMXBean poolMXBean = hikariDataSource.getHikariPoolMXBean();
System.out.println("=== 连接池监控 ===");
System.out.println("活跃连接数: " + poolMXBean.getActiveConnections());
System.out.println("空闲连接数: " + poolMXBean.getIdleConnections());
System.out.println("总连接数: " + poolMXBean.getTotalConnections());
System.out.println("等待连接的线程数: " + poolMXBean.getThreadsAwaitingConnection());
System.out.println("连接获取超时次数: " + poolMXBean.getConnectionTimeoutRate());
System.out.println("=================");
// 告警逻辑
if (poolMXBean.getThreadsAwaitingConnection() > 10) {
// 发送告警
sendAlert("连接池等待线程数过高: " + poolMXBean.getThreadsAwaitingConnection());
}
}
}
private void sendAlert(String message) {
// 实现告警逻辑,如发送邮件、短信等
System.out.println("ALERT: " + message);
}
}
七、监控与告警
7.1 关键指标监控
7.1.1 性能指标
-- 查看MySQL性能指标
SHOW GLOBAL STATUS LIKE 'Threads_%';
SHOW GLOBAL STATUS LIKE 'Connections';
SHOW GLOBAL STATUS LIKE 'Slow_queries';
SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_%';
SHOW GLOBAL STATUS LIKE 'Qps%';
SHOW GLOBAL STATUS LIKE 'Tps%';
7.1.2 慢查询监控
-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 2; -- 超过2秒的查询记录
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
-- 分析慢查询日志
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
7.2 监控工具集成
7.2.1 Prometheus + Grafana
Prometheus配置(prometheus.yml):
scrape_configs:
- job_name: 'mysql'
static_configs:
- targets: ['mysql-exporter:9104']
MySQL Exporter配置:
# 启动MySQL Exporter
docker run -d \
--name mysql-exporter \
-p 9104:9104 \
-e DATA_SOURCE_NAME="user:password@(mysql:3306)/" \
prom/mysqld-exporter
7.2.2 自定义监控脚本
#!/usr/bin/env python3
# mysql_monitor.py
import pymysql
import time
import json
from datetime import datetime
class MySQLMonitor:
def __init__(self, host, user, password, database):
self.connection = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
charset='utf8mb4'
)
def get_connection_stats(self):
"""获取连接统计信息"""
cursor = self.connection.cursor()
cursor.execute("SHOW STATUS LIKE 'Threads_%'")
result = cursor.fetchall()
stats = {row[0]: row[1] for row in result}
cursor.close()
return stats
def get_slow_queries(self):
"""获取慢查询信息"""
cursor = self.connection.cursor()
cursor.execute("""
SELECT
COUNT(*) as slow_query_count,
AVG(query_time) as avg_query_time,
MAX(query_time) as max_query_time
FROM mysql.slow_log
WHERE start_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)
""")
result = cursor.fetchone()
cursor.close()
return {
'slow_query_count': result[0],
'avg_query_time': result[1],
'max_query_time': result[2]
}
def get_buffer_pool_stats(self):
"""获取InnoDB缓冲池统计"""
cursor = self.connection.cursor()
cursor.execute("""
SELECT
variable_name,
variable_value
FROM performance_schema.global_status
WHERE variable_name LIKE 'Innodb_buffer_pool%'
""")
result = cursor.fetchall()
cursor.close()
return {row[0]: row[1] for row in result}
def monitor(self):
"""执行监控并输出JSON格式结果"""
timestamp = datetime.now().isoformat()
metrics = {
'timestamp': timestamp,
'connection_stats': self.get_connection_stats(),
'slow_queries': self.get_slow_queries(),
'buffer_pool_stats': self.get_buffer_pool_stats()
}
# 输出JSON格式
print(json.dumps(metrics, indent=2))
# 可以发送到监控系统
# self.send_to_prometheus(metrics)
def send_to_prometheus(self, metrics):
"""发送指标到Prometheus Pushgateway"""
import requests
pushgateway_url = "http://pushgateway:9091/metrics/job/mysql"
# 转换为Prometheus格式
prometheus_metrics = []
for key, value in metrics['connection_stats'].items():
prometheus_metrics.append(f'mysql_{key} {value}')
for key, value in metrics['slow_queries'].items():
prometheus_metrics.append(f'mysql_{key} {value}')
for key, value in metrics['buffer_pool_stats'].items():
prometheus_metrics.append(f'mysql_{key} {value}')
# 发送数据
requests.post(pushgateway_url, data='\n'.join(prometheus_metrics))
if __name__ == '__main__':
monitor = MySQLMonitor(
host='localhost',
user='monitor',
password='password',
database='mysql'
)
# 每分钟执行一次监控
while True:
monitor.monitor()
time.sleep(60)
八、实战案例:秒杀系统设计
8.1 系统架构设计
用户请求 → Nginx → 应用服务器集群 → 消息队列 → 库存服务 → MySQL
↓
Redis缓存
8.2 核心代码实现
8.2.1 库存预热
@Service
public class StockPreheatService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private ProductMapper productMapper;
/**
* 秒杀开始前,将库存预热到Redis
*/
public void preheatStock(Long productId, int stock) {
String stockKey = "seckill:stock:" + productId;
String stockLockKey = "seckill:lock:" + productId;
// 1. 设置库存到Redis
redisTemplate.opsForValue().set(stockKey, String.valueOf(stock), 2, TimeUnit.HOURS);
// 2. 设置分布式锁(防止并发预热)
redisTemplate.opsForValue().setIfAbsent(stockLockKey, "1", 30, TimeUnit.SECONDS);
// 3. 同步到数据库(异步)
CompletableFuture.runAsync(() -> {
Product product = new Product();
product.setId(productId);
product.setStock(stock);
productMapper.updateById(product);
});
}
}
8.2.2 秒杀下单流程
@Service
public class SeckillOrderService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderMapper orderMapper;
/**
* 秒杀下单(Redis预扣库存)
*/
@Transactional
public SeckillResult seckill(Long productId, Long userId) {
String stockKey = "seckill:stock:" + productId;
String orderKey = "seckill:order:" + productId + ":" + userId;
// 1. 检查是否已下单
if (Boolean.TRUE.equals(redisTemplate.hasKey(orderKey))) {
return SeckillResult.fail("您已参与过秒杀");
}
// 2. 预扣库存(使用Lua脚本保证原子性)
String luaScript =
"local stock = redis.call('get', KEYS[1]) " +
"if not stock or tonumber(stock) <= 0 then " +
" return 0 " +
"end " +
"redis.call('decr', KEYS[1]) " +
"return 1";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(stockKey)
);
if (result == 0) {
return SeckillResult.fail("库存不足");
}
// 3. 记录已下单
redisTemplate.opsForValue().set(orderKey, "1", 2, TimeUnit.HOURS);
// 4. 发送消息到消息队列,异步创建订单
SeckillMessage message = new SeckillMessage(productId, userId);
rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);
return SeckillResult.success("秒杀成功,订单处理中");
}
/**
* 消费消息队列,创建正式订单
*/
@RabbitListener(queues = "seckill.queue")
public void createOrder(SeckillMessage message) {
try {
// 1. 检查库存(数据库)
Product product = productMapper.selectById(message.getProductId());
if (product == null || product.getStock() <= 0) {
// 库存不足,回滚Redis库存
String stockKey = "seckill:stock:" + message.getProductId();
redisTemplate.opsForValue().increment(stockKey);
return;
}
// 2. 扣减数据库库存
int updated = productMapper.decreaseStock(message.getProductId());
if (updated == 0) {
// 库存不足,回滚Redis库存
String stockKey = "seckill:stock:" + message.getProductId();
redisTemplate.opsForValue().increment(stockKey);
return;
}
// 3. 创建订单
Order order = new Order();
order.setProductId(message.getProductId());
order.setUserId(message.getUserId());
order.setStatus("PAID");
order.setCreateTime(new Date());
orderMapper.insert(order);
// 4. 发送订单创建成功事件
OrderCreatedEvent event = new OrderCreatedEvent(order.getId());
rabbitTemplate.convertAndSend("order.exchange", "order.created", event);
} catch (Exception e) {
// 异常处理:回滚Redis库存
String stockKey = "seckill:stock:" + message.getProductId();
redisTemplate.opsForValue().increment(stockKey);
throw e;
}
}
}
8.2.3 Lua脚本优化
-- seckill.lua
-- 秒杀Lua脚本:检查库存并扣减
local stockKey = KEYS[1]
local orderKey = KEYS[2]
local userId = ARGV[1]
local productId = ARGV[2]
-- 检查是否已下单
if redis.call('exists', orderKey) == 1 then
return {0, "您已参与过秒杀"}
end
-- 检查库存
local stock = redis.call('get', stockKey)
if not stock or tonumber(stock) <= 0 then
return {0, "库存不足"}
end
-- 扣减库存
redis.call('decr', stockKey)
-- 记录已下单
redis.call('set', orderKey, "1", "EX", 7200)
return {1, "秒杀成功"}
Java调用Lua脚本:
@Service
public class SeckillService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public SeckillResult seckillWithLua(Long productId, Long userId) {
String stockKey = "seckill:stock:" + productId;
String orderKey = "seckill:order:" + productId + ":" + userId;
// 加载Lua脚本
DefaultRedisScript<List> redisScript = new DefaultRedisScript<>();
redisScript.setLocation(new ClassPathResource("seckill.lua"));
redisScript.setResultType(List.class);
// 执行脚本
List result = redisTemplate.execute(
redisScript,
Arrays.asList(stockKey, orderKey),
String.valueOf(userId),
String.valueOf(productId)
);
Long code = (Long) result.get(0);
String message = (String) result.get(1);
if (code == 1) {
// 发送消息到队列
sendToQueue(productId, userId);
return SeckillResult.success(message);
} else {
return SeckillResult.fail(message);
}
}
}
九、总结与最佳实践
9.1 性能优化检查清单
索引优化
- [ ] 检查慢查询日志,优化全表扫描
- [ ] 确保WHERE、JOIN、ORDER BY字段有索引
- [ ] 避免索引失效的写法
- [ ] 定期清理冗余索引
查询优化
- [ ] 使用EXPLAIN分析查询计划
- [ ] 避免SELECT *
- [ ] 优化分页查询
- [ ] 减少JOIN操作
事务与锁
- [ ] 选择合适的事务隔离级别
- [ ] 缩短事务持有时间
- [ ] 避免长事务
- [ ] 使用乐观锁替代悲观锁
架构优化
- [ ] 实现读写分离
- [ ] 考虑分库分表
- [ ] 使用缓存减轻数据库压力
- [ ] 引入消息队列解耦
监控与告警
- [ ] 监控关键性能指标
- [ ] 设置慢查询告警
- [ ] 监控连接池状态
- [ ] 定期性能测试
9.2 常见误区与注意事项
- 过度索引:索引不是越多越好,每个索引都会增加写操作的开销
- 忽视事务:高并发下事务管理不当会导致死锁和性能问题
- 缓存滥用:缓存不是银弹,需要考虑一致性、穿透、击穿、雪崩等问题
- 盲目分库分表:分库分表会增加系统复杂度,应在单机优化达到极限后再考虑
- 忽视监控:没有监控的系统就像盲人摸象,无法及时发现问题
9.3 未来演进方向
- 云原生数据库:考虑使用云数据库服务(如RDS、Aurora),自动处理高可用和扩展性
- NewSQL数据库:对于极致性能要求,可考虑TiDB、CockroachDB等分布式数据库
- HTAP混合事务分析处理:使用TiDB、OceanBase等支持实时分析的数据库
- AI驱动的优化:利用机器学习自动优化索引和查询
结语
MySQL高并发处理是一个系统工程,需要从多个层面进行优化。本文从索引优化、查询优化、事务锁优化、架构升级、缓存策略、连接池优化、监控告警等多个维度进行了详细阐述,并提供了丰富的实战案例和代码示例。
记住,没有银弹。每个系统的优化策略都需要根据具体的业务场景、数据规模、并发量等因素来定制。建议从最简单的索引优化开始,逐步深入到架构优化,同时建立完善的监控体系,持续迭代优化。
在实际应用中,建议采用以下步骤:
- 基准测试:了解当前系统的性能瓶颈
- 逐步优化:从索引和查询优化开始
- 架构演进:当单机优化达到极限时,考虑架构升级
- 持续监控:建立完善的监控告警体系
- 定期复盘:定期回顾优化效果,调整策略
通过系统性的优化,MySQL完全能够应对高并发场景的挑战,为业务提供稳定、高效的数据库服务。
