In modern Internet applications, the database is often the system bottleneck under high concurrency. MySQL is the most widely used relational database, and tuning it requires work at several layers. This article walks through the full toolbox, from basic index optimization to read-write splitting, to help you tackle MySQL's high-concurrency bottlenecks systematically.
1. Analyzing Database Bottlenecks under High Concurrency
1.1 What Is a Database Bottleneck?
A database bottleneck occurs when the database cannot scale its processing capacity with rising load, so response times climb sharply or throughput drops. Under high concurrency it typically shows up as:
- CPU: large numbers of complex queries saturate the CPU
- I/O: disk reads and writes cannot keep up with the request rate
- Memory: an undersized buffer pool forces constant disk I/O (a quick check follows this list)
- Lock contention: row and table locks cause waits and deadlocks
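The memory item can be checked quickly: the InnoDB buffer pool hit ratio tells you whether reads are being served from memory. A minimal sketch (assuming MySQL 5.7+, where performance_schema.global_status is available; treating a sustained ratio well below 99% as suspect is a rule of thumb, not a hard limit):
-- Hit ratio = 1 - (pages read from disk / logical read requests)
SELECT (1 - disk.v / req.v) * 100 AS buffer_pool_hit_ratio_pct
FROM
(SELECT VARIABLE_VALUE + 0 AS v FROM performance_schema.global_status
 WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads') AS disk,
(SELECT VARIABLE_VALUE + 0 AS v FROM performance_schema.global_status
 WHERE VARIABLE_NAME = 'Innodb_buffer_pool_read_requests') AS req;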
1.2 How to Identify the Bottleneck
Before optimizing anything, pin down where the bottleneck actually is:
-- Current MySQL connections and thread activity
SHOW STATUS LIKE 'Threads_%';
SHOW STATUS LIKE 'Connections';
SHOW STATUS LIKE 'Slow_queries';
-- InnoDB engine status
SHOW ENGINE INNODB STATUS\G
-- Queries currently executing
SHOW PROCESSLIST;
-- Statement digests from the Performance Schema (MySQL 5.6+)
SELECT * FROM performance_schema.events_statements_summary_by_digest
ORDER BY SUM_TIMER_WAIT DESC LIMIT 10;
These commands give you the database's live state and surface slow queries, lock waits, and similar performance problems.
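The same status tables also yield a rough throughput figure. A sketch (again MySQL 5.7+; this is a lifetime average, so for a live QPS sample Questions twice and divide by the interval):
-- Average QPS since server start = total statements / uptime in seconds
SELECT q.v / u.v AS avg_qps
FROM
(SELECT VARIABLE_VALUE + 0 AS v FROM performance_schema.global_status
 WHERE VARIABLE_NAME = 'Questions') AS q,
(SELECT VARIABLE_VALUE + 0 AS v FROM performance_schema.global_status
 WHERE VARIABLE_NAME = 'Uptime') AS u;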
2. Index Optimization Strategies
2.1 Index Fundamentals and Best Practices
Indexes are the single most effective lever for query performance; a well-chosen index can speed a query up by orders of magnitude.
2.1.1 Choosing an index type
-- Primary key index (created automatically)
CREATE TABLE users (
id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL,
email VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Unique index
CREATE UNIQUE INDEX idx_email ON users(email);
-- Ordinary secondary index
CREATE INDEX idx_username ON users(username);
-- Composite index (leftmost-prefix rule applies)
CREATE INDEX idx_username_email ON users(username, email);
-- Full-text index (for text search)
CREATE TABLE articles (
id INT PRIMARY KEY,
title VARCHAR(200),
content TEXT,
FULLTEXT INDEX idx_content (content)
);
2.1.2 Index design principles
Principle 1: index high-selectivity columns first
-- Good index: high selectivity
CREATE INDEX idx_email ON users(email); -- emails are usually unique
-- Poor index: low selectivity
CREATE INDEX idx_gender ON users(gender); -- only a handful of distinct values
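Selectivity can be measured before committing to an index. A quick check against the users table above (values close to 1 mean the column is worth indexing; the gender column is assumed here purely for illustration):
-- Selectivity = distinct values / total rows
SELECT
COUNT(DISTINCT email) / COUNT(*) AS email_selectivity, -- near 1: good candidate
COUNT(DISTINCT gender) / COUNT(*) AS gender_selectivity -- near 0: poor candidate
FROM users;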
Principle 2: use covering indexes to avoid row lookups
-- Original query (needs a lookup back to the clustered index)
SELECT username, email FROM users WHERE username = 'john';
-- Optimization: create a covering index
CREATE INDEX idx_username_email ON users(username, email);
-- The plan will show "Using index": the row lookup is gone
EXPLAIN SELECT username, email FROM users WHERE username = 'john';
Principle 3: use prefix indexes for long text columns
-- Index only a prefix of a long VARCHAR column
CREATE INDEX idx_email_prefix ON users(email(20));
-- Work out a suitable prefix length
SELECT
COUNT(DISTINCT LEFT(email, 10)) / COUNT(*) AS sel_10,
COUNT(DISTINCT LEFT(email, 15)) / COUNT(*) AS sel_15,
COUNT(DISTINCT LEFT(email, 20)) / COUNT(*) AS sel_20,
COUNT(DISTINCT email) / COUNT(*) AS sel_full
FROM users;
2.2 Index Optimization in Practice
Case 1: optimizing e-commerce order queries
Scenario: querying a user's orders from the last 30 days is slow.
-- Original table definition
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
user_id INT NOT NULL,
order_status TINYINT NOT NULL,
order_amount DECIMAL(10,2),
create_time DATETIME NOT NULL,
update_time DATETIME
);
-- Original query
SELECT * FROM orders
WHERE user_id = 12345
AND order_status = 1
AND create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
ORDER BY create_time DESC;
Optimization steps:
- Analyze the execution plan
EXPLAIN SELECT * FROM orders
WHERE user_id = 12345
AND order_status = 1
AND create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
ORDER BY create_time DESC;
- Create a suitable index
-- Option A: composite index (recommended)
CREATE INDEX idx_user_status_time ON orders(user_id, order_status, create_time);
-- Option B: if only a few columns are needed, make it a covering index
CREATE INDEX idx_user_status_time_cover ON orders(user_id, order_status, create_time, order_amount);
- Verify the improvement
-- Inspect the table's indexes
SHOW INDEX FROM orders;
-- Check the execution plan
EXPLAIN SELECT order_id, order_amount FROM orders
WHERE user_id = 12345
AND order_status = 1
AND create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY);
Case 2: multi-condition filtering
Scenario: a product search feature with several optional filters.
-- Product table
CREATE TABLE products (
id INT PRIMARY KEY,
name VARCHAR(100),
category_id INT,
brand_id INT,
price DECIMAL(10,2),
stock INT,
status TINYINT,
created_at DATETIME
);
-- Dynamic query (conditions depend on what the user selects)
SELECT * FROM products
WHERE category_id = ?
AND brand_id = ?
AND price BETWEEN ? AND ?
AND status = 1
ORDER BY created_at DESC;
Optimization strategy:
-- 1. Analyze the query patterns and create indexes for the common combinations
CREATE INDEX idx_category_brand_price ON products(category_id, brand_id, price);
CREATE INDEX idx_category_price ON products(category_id, price);
CREATE INDEX idx_brand_price ON products(brand_id, price);
-- 2. Index hints (force a specific index)
SELECT * FROM products USE INDEX (idx_category_brand_price)
WHERE category_id = 1
AND brand_id = 5
AND price BETWEEN 100 AND 1000;
-- 3. For sorting, make sure the index also covers the ORDER BY column
CREATE INDEX idx_category_status_created ON products(category_id, status, created_at DESC); -- DESC index keys take effect in MySQL 8.0+
2.3 Index Maintenance and Monitoring
Monitoring index usage
-- Index usage statistics (sys schema, bundled since MySQL 5.7)
SELECT * FROM sys.schema_index_statistics WHERE table_schema = 'your_database';
-- Find redundant indexes
SELECT * FROM sys.schema_redundant_indexes;
-- Find unused indexes
SELECT * FROM sys.schema_unused_indexes;
Index maintenance
-- Refresh the table's statistics
ANALYZE TABLE orders;
-- Rebuild the table to defragment it
OPTIMIZE TABLE orders;
-- Drop indexes that are no longer used
DROP INDEX idx_old ON orders;
3. Query Optimization
3.1 Avoid Full Table Scans
3.1.1 WHERE-clause pitfalls
-- Anti-pattern: forces a full table scan
SELECT * FROM users WHERE YEAR(created_at) = 2023;
-- Correct: express the condition as a range on the indexed column
SELECT * FROM users
WHERE created_at >= '2023-01-01'
AND created_at < '2024-01-01';
-- Anti-pattern: wrapping an indexed column in a function
SELECT * FROM users WHERE LOWER(email) = 'user@example.com';
-- Correct: leave the indexed column untouched
SELECT * FROM users WHERE email = 'user@example.com';
3.1.2 LIKE queries
-- Anti-pattern: a leading wildcard disables the index
SELECT * FROM users WHERE username LIKE '%john';
-- Correct: a trailing wildcard can still use the index
SELECT * FROM users WHERE username LIKE 'john%';
-- Full-text search as the alternative for contains-style matching
SELECT * FROM articles
WHERE MATCH(title, content) AGAINST ('database' IN NATURAL LANGUAGE MODE);
3.2 Avoid SELECT *
-- Anti-pattern: fetch every column
SELECT * FROM orders WHERE user_id = 123;
-- Better: fetch only the columns you need
SELECT order_id, order_amount, create_time
FROM orders
WHERE user_id = 123;
-- Best: back the query with a covering index
CREATE INDEX idx_user_cover ON orders(user_id, order_id, order_amount, create_time);
SELECT order_id, order_amount, create_time
FROM orders
WHERE user_id = 123; -- served from the covering index, no row lookup
3.3 Pagination
3.3.1 The trouble with OFFSET pagination
-- Classic pagination (the deep-paging problem)
SELECT * FROM orders
WHERE user_id = 123
ORDER BY create_time DESC
LIMIT 1000000, 20; -- scans 1,000,020 rows just to return 20
3.3.2 Fixes
Option 1: deferred join
-- Fetch just the primary keys first, then join back for the full rows
SELECT o.* FROM orders o
INNER JOIN (
SELECT order_id FROM orders
WHERE user_id = 123
ORDER BY create_time DESC
LIMIT 1000000, 20
) AS tmp ON o.order_id = tmp.order_id;
Option 2: keyset (seek) pagination
-- Remember where the previous page ended
SELECT * FROM orders
WHERE user_id = 123
AND create_time < '2023-12-01 10:00:00' -- create_time of the last row on the previous page
ORDER BY create_time DESC
LIMIT 20;
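One caveat: if create_time is not unique, rows that share the boundary timestamp can be skipped or duplicated between pages. A common fix is to add a tiebreaker column; a sketch reusing order_id (the literal values are illustrative):
-- Keyset pagination with a (create_time, order_id) tiebreaker
SELECT * FROM orders
WHERE user_id = 123
AND (create_time < '2023-12-01 10:00:00'
     OR (create_time = '2023-12-01 10:00:00' AND order_id < 987654))
ORDER BY create_time DESC, order_id DESC
LIMIT 20;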
Option 3: window functions (MySQL 8.0+). Note that this form mostly helps readability; the server still materializes and numbers the preceding rows, so for deep pages keyset pagination remains the faster choice.
-- MySQL 8.0+: number rows with a window function
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (ORDER BY create_time DESC) as row_num
FROM orders WHERE user_id = 123
) AS tmp
WHERE row_num BETWEEN 1000001 AND 1000020;
3.4 JOIN Optimization
3.4.1 Choosing the join type
-- Inner join (the default choice)
SELECT o.*, u.username
FROM orders o
INNER JOIN users u ON o.user_id = u.id
WHERE o.user_id = 123;
-- Left join (only when unmatched rows must be preserved)
SELECT o.*, u.username
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
WHERE o.create_time >= '2023-01-01';
3.4.2 JOIN tips
-- 1. Let the small table drive the big one
SELECT * FROM small_table s
JOIN large_table l ON s.id = l.small_id;
-- 2. Make sure the join columns are indexed
CREATE INDEX idx_user_id ON orders(user_id);
-- users.id is the primary key, so it is already indexed
-- 3. Avoid accidental Cartesian products
SELECT * FROM a, b WHERE a.id = b.id; -- legacy comma syntax: drop the WHERE and you get a Cartesian product
SELECT * FROM a JOIN b ON a.id = b.id; -- explicit JOIN keeps the condition attached to the join
-- 4. STRAIGHT_JOIN forces the written table order
SELECT STRAIGHT_JOIN * FROM small_table s
JOIN large_table l ON s.id = l.small_id;
4. Database Architecture Optimization
4.1 Splitting Databases and Tables
4.1.1 Vertical splitting
-- The original wide table
CREATE TABLE user_info (
id INT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
profile TEXT, -- large text column
settings TEXT, -- large text column
created_at DATETIME
);
-- After the vertical split
CREATE TABLE user_base (
id INT PRIMARY KEY,
username VARCHAR(50),
email VARCHAR(100),
created_at DATETIME
);
CREATE TABLE user_details (
user_id INT PRIMARY KEY,
profile TEXT,
settings TEXT,
FOREIGN KEY (user_id) REFERENCES user_base(id)
);
4.1.2 Horizontal splitting (sharding)
-- Shard by user ID modulo
-- orders_0, orders_1, orders_2, ..., orders_9
-- After sharding, queries must be routed in the application layer
-- Sketch (JavaScript):
function getOrdersByUserId(userId) {
const tableIndex = userId % 10;
const tableName = `orders_${tableIndex}`;
return db.query(`SELECT * FROM ${tableName} WHERE user_id = ?`, [userId]);
}
-- Cross-shard aggregation needs UNION ALL
SELECT * FROM orders_0 WHERE user_id = 123
UNION ALL
SELECT * FROM orders_1 WHERE user_id = 123
-- ... and so on across every shard
4.2 Partitioned Tables
-- Range partitioning by time
CREATE TABLE logs (
id BIGINT NOT NULL AUTO_INCREMENT,
log_time DATETIME NOT NULL,
message TEXT,
PRIMARY KEY (id, log_time)
) PARTITION BY RANGE COLUMNS(log_time) (
PARTITION p202301 VALUES LESS THAN ('2023-02-01'),
PARTITION p202302 VALUES LESS THAN ('2023-03-01'),
PARTITION p202303 VALUES LESS THAN ('2023-04-01'),
PARTITION p202304 VALUES LESS THAN ('2023-05-01'),
PARTITION pmax VALUES LESS THAN (MAXVALUE)
);
-- The optimizer prunes partitions automatically
SELECT * FROM logs
WHERE log_time >= '2023-03-01'
AND log_time < '2023-04-01'; -- only partition p202303 is scanned
5. Read-Write Splitting
5.1 How It Works
The core idea of read-write splitting is to send writes (INSERT/UPDATE/DELETE) to the primary and reads (SELECT) to replicas, spreading the load across servers.
Application layer
↓
Middleware / proxy layer
↓
Primary (writes) ←→ Replica 1 (reads)
↓
Replica 2 (reads)
↓
Replica 3 (reads)
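Before turning to concrete middleware, the routing decision itself fits in a few lines. A minimal sketch (illustrative only; real proxies such as the ones in section 5.3 also handle SQL parsing, transactions, and hint-based overrides):
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;

// Statements that modify data go to the primary; plain SELECTs round-robin across replicas.
public class ReadWriteRouter {
    private final DataSource primary;
    private final List<DataSource> replicas;
    private final AtomicInteger counter = new AtomicInteger();

    public ReadWriteRouter(DataSource primary, List<DataSource> replicas) {
        this.primary = primary;
        this.replicas = replicas;
    }

    public DataSource route(String sql) {
        String head = sql.stripLeading().toLowerCase();
        if (head.startsWith("select")) {
            int i = Math.floorMod(counter.getAndIncrement(), replicas.size());
            return replicas.get(i); // round-robin read
        }
        return primary; // INSERT/UPDATE/DELETE/DDL, and anything ambiguous, goes to the primary
    }
}
In practice, statements inside an open transaction should also be pinned to the primary, which is exactly what the middleware discussed below does for you.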
5.2 Setting Up MySQL Replication
5.2.1 Primary configuration
# my.cnf or my.ini
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7  # superseded by binlog_expire_logs_seconds in MySQL 8.0
max_binlog_size = 100M
# databases to replicate
binlog_do_db = myapp
# databases to exclude
binlog_ignore_db = mysql
binlog_ignore_db = information_schema
Create the replication user:
CREATE USER 'repl'@'%' IDENTIFIED BY 'ReplPassword123!';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;
-- Lock tables and record the current binlog position
FLUSH TABLES WITH READ LOCK;
SHOW MASTER STATUS;
-- Note the File and Position values, e.g. mysql-bin.000001, 154
5.2.2 Replica configuration
# my.cnf
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
log_bin = mysql-bin
read_only = 1 # block stray writes (accounts with SUPER bypass this; see also super_read_only)
Configure replication:
-- Stop the replica threads
STOP SLAVE;
-- Point the replica at the primary
CHANGE MASTER TO
MASTER_HOST='master-ip',
MASTER_USER='repl',
MASTER_PASSWORD='ReplPassword123!',
MASTER_LOG_FILE='mysql-bin.000001', -- from SHOW MASTER STATUS on the primary
MASTER_LOG_POS=154; -- from SHOW MASTER STATUS on the primary
-- Start replication
START SLAVE;
-- Check replication status
SHOW SLAVE STATUS\G
-- Key fields:
-- Slave_IO_Running: Yes
-- Slave_SQL_Running: Yes
-- Seconds_Behind_Master: 0 (replication lag in seconds)
-- (MySQL 8.0.22+ renames these commands and fields to the REPLICA terminology)
5.2.3 Verifying replication
-- Create test data on the primary
CREATE DATABASE test_replication;
USE test_replication;
CREATE TABLE test (id INT PRIMARY KEY, value VARCHAR(50));
INSERT INTO test VALUES (1, 'master');
INSERT INTO test VALUES (2, 'master');
-- Query on the replica (the same rows should appear)
SELECT * FROM test_replication.test;
-- Update on the primary
UPDATE test_replication.test SET value = 'updated' WHERE id = 1;
-- Verify on the replica
SELECT * FROM test_replication.test;
5.3 Read-Write Splitting in the Application Layer
5.3.1 Spring Boot + ShardingSphere
// 1. Add the dependency
// pom.xml
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>5.1.1</version>
</dependency>
// 2. application.yml
spring:
shardingsphere:
datasource:
names: master,slave0,slave1
master:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://master-ip:3306/myapp?useSSL=false
username: root
password: password
slave0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://slave0-ip:3306/myapp?useSSL=false
username: root
password: password
slave1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://slave1-ip:3306/myapp?useSSL=false
username: root
password: password
rules:
readwrite-splitting:
data-sources:
myds:
type: Static
props:
write-data-source-name: master
read-data-source-names: slave0,slave1
load-balance-algorithm-name: round_robin
props:
sql-show: true # log how each statement is routed
// 3. Business code (nothing special required)
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
// writes are routed to the primary automatically
@Transactional
public Order createOrder(Order order) {
return orderRepository.save(order);
}
// reads are routed to a replica automatically
public Order getOrderById(Long id) {
return orderRepository.findById(id).orElse(null);
}
// Reads that must see the latest data can be pinned to the primary;
// a sketch using ShardingSphere's HintManager (5.x API):
public Order getRecentOrderWithLock(Long userId) {
    try (HintManager hintManager = HintManager.getInstance()) {
        hintManager.setWriteRouteOnly(); // force routing to the write data source
        return orderRepository.findRecentByUserId(userId);
    }
}
}
5.3.2 The MyCat middleware
<!-- MyCat schema.xml -->
<schema name="myapp" checkSQLschema="false" sqlMaxLimit="100">
<table name="orders" dataNode="dn1,dn2,dn3" rule="mod-long" />
</schema>
<dataNode name="dn1" dataHost="host1" database="myapp" />
<dataNode name="dn2" dataHost="host1" database="myapp" />
<dataNode name="dn3" dataHost="host1" database="myapp" />
<dataHost name="host1" maxCon="1000" minCon="10" balance="1" writeType="0" dbType="mysql" dbDriver="native">
<heartbeat>select user()</heartbeat>
<!-- write host -->
<writeHost host="hostM1" url="master-ip:3306" user="root" password="password">
<!-- read hosts -->
<readHost host="hostS1" url="slave0-ip:3306" user="root" password="password" />
<readHost host="hostS2" url="slave1-ip:3306" user="root" password="password" />
</writeHost>
</dataHost>
5.3.3 ShardingSphere-Proxy
# config-sharding.yaml
schemaName: myapp
dataSources:
ds_0:
url: jdbc:mysql://master-ip:3306/myapp?serverTimezone=UTC&useSSL=false
username: root
password: password
connectionTimeoutMilliseconds: 3000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
ds_1:
url: jdbc:mysql://slave0-ip:3306/myapp?serverTimezone=UTC&useSSL=false
username: root
password: password
connectionTimeoutMilliseconds: 3000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
ds_2:
url: jdbc:mysql://slave1-ip:3306/myapp?serverTimezone=UTC&useSSL=false
username: root
password: password
connectionTimeoutMilliseconds: 3000
idleTimeoutMilliseconds: 60000
maxLifetimeMilliseconds: 1800000
maxPoolSize: 50
rules:
- !READWRITE_SPLITTING
dataSources:
myds:
type: Static
props:
write-data-source-name: ds_0
read-data-source-names: ds_1,ds_2
load-balance-algorithm-name: round_robin
- !SHARDING
tables:
orders:
actualDataNodes: myds.orders_$->{0..9}
tableStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: mod
shardingAlgorithms:
mod:
type: MOD
props:
sharding-count: 10
5.4 Things to Watch Out For
5.4.1 Replication lag
// Option 1: force critical reads onto the primary
public Order getOrderAfterCreate(Long orderId) {
// a read issued right after the write must hit the primary, or it may see stale data
return orderRepository.findFromMaster(orderId);
}
// Option 2: a short-lived cache flag
public void createOrder(Order order) {
orderRepository.save(order);
// flag the user as having just written; route their reads to the primary for 5 seconds
redisTemplate.opsForValue().set("order:recent:" + order.getUserId(), "1", 5, TimeUnit.SECONDS);
}
public Order getOrder(Long orderId, Long userId) {
// check the recent-write flag
if (redisTemplate.hasKey("order:recent:" + userId)) {
return orderRepository.findFromMaster(orderId);
}
return orderRepository.findFromSlave(orderId);
}
5.4.2 Monitoring replication
-- Watch replication lag
SHOW SLAVE STATUS\G
-- Applier status (performance_schema, MySQL 5.7+)
SELECT * FROM performance_schema.replication_applier_status;
-- Per-worker applier status (replication errors surface here)
SELECT * FROM performance_schema.replication_applier_status_by_worker;
5.4.3 Failover
// A bare-bones health check with failover hooks
@Component
public class DataSourceHealthChecker {
@Autowired
private DataSource masterDataSource;
@Autowired
private List<DataSource> slaveDataSources;
@Scheduled(fixedRate = 5000)
public void checkHealth() {
// check the primary
if (!isHealthy(masterDataSource)) {
// raise an alert and start failover
alertMasterDown();
}
// check each replica
for (int i = 0; i < slaveDataSources.size(); i++) {
if (!isHealthy(slaveDataSources.get(i))) {
// take the replica out of rotation
markSlaveUnavailable(i);
}
}
}
private boolean isHealthy(DataSource dataSource) {
try (Connection conn = dataSource.getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1")) {
return rs.next();
} catch (Exception e) {
return false;
}
}
}
6. Advanced Strategies
6.1 Caching
6.1.1 A multi-level cache
// Sketch: a multi-level cache
public class MultiLevelCacheService {
// L1: in-process cache (Caffeine)
private final Cache<String, Object> localCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(10, TimeUnit.SECONDS)
.build();
// L2: Redis
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// L3: the database
@Autowired
private OrderRepository orderRepository;
public Order getOrder(Long orderId) {
String key = "order:" + orderId;
// L1: local cache
Order order = (Order) localCache.getIfPresent(key);
if (order != null) {
return order;
}
// L2: Redis
order = (Order) redisTemplate.opsForValue().get(key);
if (order != null) {
// backfill L1
localCache.put(key, order);
return order;
}
// L3: database
order = orderRepository.findById(orderId).orElse(null);
if (order != null) {
// backfill both cache levels
redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
localCache.put(key, order);
}
return order;
}
public void updateOrder(Order order) {
// write the database first
orderRepository.save(order);
// then invalidate the caches (cache-aside pattern)
String key = "order:" + order.getId();
redisTemplate.delete(key);
localCache.invalidate(key);
}
}
6.1.2 Guarding against cache penetration, breakdown, and avalanche
// Cache null results to stop penetration
public Order getOrderWithNullCache(Long orderId) {
String key = "order:" + orderId;
String nullKey = "order:null:" + orderId;
// consult the null marker first
if (Boolean.TRUE.equals(redisTemplate.hasKey(nullKey))) {
return null;
}
Order order = (Order) redisTemplate.opsForValue().get(key);
if (order == null) {
order = orderRepository.findById(orderId).orElse(null);
if (order == null) {
// cache the miss so repeated lookups stop reaching the database
redisTemplate.opsForValue().set(nullKey, "", 5, TimeUnit.MINUTES);
return null;
}
redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
}
return order;
}
// A distributed lock to stop breakdown (stampedes) on hot keys
public Order getOrderWithLock(Long orderId) {
String key = "order:" + orderId;
String lockKey = "lock:order:" + orderId;
Order order = (Order) redisTemplate.opsForValue().get(key);
if (order != null) {
return order;
}
// try to take the lock
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
try {
// double-check once we hold the lock
order = (Order) redisTemplate.opsForValue().get(key);
if (order == null) {
order = orderRepository.findById(orderId).orElse(null);
if (order != null) {
redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
}
}
} finally {
redisTemplate.delete(lockKey);
}
} else {
// back off briefly, then retry
try {
Thread.sleep(50);
return getOrderWithLock(orderId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
return order;
}
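The avalanche case, where many keys expire at the same moment, is not covered above; the usual guard is to add random jitter to every TTL so expirations spread out. A minimal sketch against the same redisTemplate (the base TTL and the 5-minute jitter range are illustrative choices; ThreadLocalRandom is java.util.concurrent):
// Cache with a jittered TTL: base TTL plus up to 5 minutes of random slack,
// so a burst of writes does not turn into a burst of simultaneous expirations.
public void cacheWithJitter(String key, Order order, long baseMinutes) {
    long ttlSeconds = baseMinutes * 60 + ThreadLocalRandom.current().nextInt(300);
    redisTemplate.opsForValue().set(key, order, ttlSeconds, TimeUnit.SECONDS);
}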
6.2 Connection Pool Tuning
6.2.1 HikariCP
// Spring Boot configuration
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties("spring.datasource.hikari")
public HikariDataSource dataSource() {
HikariDataSource dataSource = new HikariDataSource();
// basic settings
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/myapp");
dataSource.setUsername("root");
dataSource.setPassword("password");
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
// pool sizing (tune to your workload; bigger is not automatically better)
dataSource.setMaximumPoolSize(50); // maximum connections
dataSource.setMinimumIdle(10); // minimum idle connections
dataSource.setConnectionTimeout(3000); // up to 3 s to obtain a connection
dataSource.setIdleTimeout(600000); // reclaim connections idle for 10 min
dataSource.setMaxLifetime(1800000); // retire connections after 30 min
// driver-level performance tweaks
dataSource.setPoolName("MyAppHikariCP");
dataSource.addDataSourceProperty("cachePrepStmts", "true");
dataSource.addDataSourceProperty("prepStmtCacheSize", "250");
dataSource.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
dataSource.addDataSourceProperty("useServerPrepStmts", "true");
dataSource.addDataSourceProperty("useLocalSessionState", "true");
dataSource.addDataSourceProperty("rewriteBatchedStatements", "true");
dataSource.addDataSourceProperty("cacheResultSetMetadata", "true");
dataSource.addDataSourceProperty("cacheServerConfiguration", "true");
dataSource.addDataSourceProperty("elideSetAutoCommits", "true");
dataSource.addDataSourceProperty("maintainTimeStats", "false");
return dataSource;
}
}
6.2.2 Monitoring the pool
// Watch pool health
@Component
public class ConnectionPoolMonitor {
@Autowired
private HikariDataSource dataSource;
@Scheduled(fixedRate = 60000)
public void monitorPool() {
HikariPoolMXBean poolMXBean = dataSource.getHikariPoolMXBean();
log.info("连接池状态:");
log.info(" 活跃连接数: {}", poolMXBean.getActiveConnections());
log.info(" 空闲连接数: {}", poolMXBean.getIdleConnections());
log.info(" 总连接数: {}", poolMXBean.getTotalConnections());
log.info(" 等待连接数: {}", poolMXBean.getThreadsAwaitingConnection());
// alert threshold
if (poolMXBean.getThreadsAwaitingConnection() > 10) {
log.warn("连接池等待数过高: {}", poolMXBean.getThreadsAwaitingConnection());
}
}
}
6.3 Asynchronous Processing
6.3.1 Asynchronous writes
// Push slow work onto a message queue
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderRepository orderRepository;
// Synchronous entry point that returns quickly
public CompletableFuture<Long> createOrderAsync(Order order) {
// 1. persist the order in PENDING state
order.setStatus("PENDING");
Order saved = orderRepository.save(order);
// 2. publish a message to the queue
rabbitTemplate.convertAndSend("order.exchange", "order.create", saved.getId());
// 3. return the order ID immediately
return CompletableFuture.completedFuture(saved.getId());
}
// The MQ consumer does the real work
@RabbitListener(queues = "order.queue")
public void processOrder(Long orderId) {
try {
// full business processing
Order order = orderRepository.findById(orderId).orElseThrow();
processPayment(order);
updateInventory(order);
order.setStatus("COMPLETED");
orderRepository.save(order);
} catch (Exception e) {
// retry, or at least record the failure
log.error("Order processing failed: {}", orderId, e);
}
}
}
6.3.2 Batch processing
// Optimized batch insert
public void batchInsertOrders(List<Order> orders) {
// JDBC batching
jdbcTemplate.batchUpdate(
"INSERT INTO orders (user_id, amount, status) VALUES (?, ?, ?)",
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Order order = orders.get(i);
ps.setLong(1, order.getUserId());
ps.setBigDecimal(2, order.getAmount());
ps.setInt(3, order.getStatus());
}
@Override
public int getBatchSize() {
return orders.size();
}
}
);
}
// LOAD DATA LOCAL INFILE for very large volumes
public void bulkLoadOrders(String filePath) {
    // requires local_infile enabled on the server and allowLoadLocalInfile=true on the JDBC URL;
    // filePath must come from trusted configuration, never from user input
    String sql = "LOAD DATA LOCAL INFILE '" + filePath + "' " +
"INTO TABLE orders " +
"FIELDS TERMINATED BY ',' " +
"LINES TERMINATED BY '\\n' " +
"(user_id, amount, status)";
jdbcTemplate.execute(sql);
}
6.4 Data Archiving
6.4.1 Archiving historical data
-- Create the archive table (same structure as the original)
CREATE TABLE orders_archive LIKE orders;
-- Periodic archiving (stored procedure)
DELIMITER $$
CREATE PROCEDURE archive_old_orders()
BEGIN
-- evaluate the cutoff once so the INSERT and DELETE agree on the same boundary
DECLARE cutoff DATETIME DEFAULT DATE_SUB(NOW(), INTERVAL 1 YEAR);
START TRANSACTION;
-- copy historical rows into the archive table
INSERT INTO orders_archive
SELECT * FROM orders
WHERE create_time < cutoff;
-- remove them from the live table
DELETE FROM orders
WHERE create_time < cutoff;
COMMIT;
END$$
DELIMITER ;
-- Schedule it with the Event Scheduler
SET GLOBAL event_scheduler = ON;
CREATE EVENT archive_orders_event
ON SCHEDULE EVERY 1 WEEK
DO
CALL archive_old_orders();
6.4.2 Archiving via partitions
-- A partitioned table makes archiving cheap
CREATE TABLE orders (
id BIGINT,
user_id INT,
amount DECIMAL(10,2),
create_time DATETIME,
PRIMARY KEY (id, create_time)
) PARTITION BY RANGE COLUMNS(create_time) (
PARTITION p202301 VALUES LESS THAN ('2023-02-01'),
PARTITION p202302 VALUES LESS THAN ('2023-03-01'),
PARTITION p202303 VALUES LESS THAN ('2023-04-01'),
PARTITION pcurrent VALUES LESS THAN (MAXVALUE)
);
-- Archive by simply dropping the partition (near-instant)
ALTER TABLE orders DROP PARTITION p202301;
-- Or swap the partition out into an archive table
-- (orders_archive must be non-partitioned with an identical structure)
ALTER TABLE orders EXCHANGE PARTITION p202301 WITH TABLE orders_archive;
7. Monitoring and Alerting
7.1 Performance Metrics
7.1.1 Collecting the key metrics
-- 1. Statement performance
SELECT
SCHEMA_NAME,
DIGEST_TEXT,
COUNT_STAR,
AVG_TIMER_WAIT/1000000000000 AS avg_time_sec,
SUM_ROWS_EXAMINED,
SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;
-- 2. Table I/O
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
COUNT_READ,
COUNT_WRITE,
SUM_NUMBER_OF_BYTES_READ,
SUM_NUMBER_OF_BYTES_WRITE
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;
-- 3. Index usage
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
INDEX_NAME,
COUNT_FETCH,
COUNT_INSERT,
COUNT_UPDATE,
COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;
-- 4. Table lock waits
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
COUNT_STAR,
SUM_TIMER_WAIT/1000000000000 AS wait_time_sec
FROM performance_schema.table_lock_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC;
7.1.2 Analyzing the slow query log
# Enable the slow query log in my.cnf
[mysqld]
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1 # log queries that run longer than 1 second
log_queries_not_using_indexes = 1
min_examined_row_limit = 1000
Analyze the log:
# with mysqldumpslow
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
# with pt-query-digest (Percona Toolkit)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
7.2 The Monitoring and Alerting Stack
7.2.1 Prometheus + Grafana
# docker-compose.yml
version: '3'
services:
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
mysqld_exporter:
image: prom/mysqld-exporter
environment:
- DATA_SOURCE_NAME=exporter:password@(mysql:3306)/
ports:
- "9104:9104"
# prometheus.yml
scrape_configs:
- job_name: 'mysql'
static_configs:
- targets: ['mysqld_exporter:9104']
7.2.2 Example alert rules
# alert-rules.yml
groups:
- name: mysql
rules:
- alert: MySQLDown
expr: up{job="mysql"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "MySQL instance {{ $labels.instance }} is down"
- alert: MySQLSlowQueries
expr: rate(mysql_global_status_slow_queries[5m]) > 5
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL slow queries rate is high"
- alert: MySQLHighConnections
expr: mysql_global_status_threads_connected / mysql_global_variables_max_connections > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL connection usage is high"
7.3 Performance Analysis Tools
7.3.1 Analyzing queries with EXPLAIN
-- Plain EXPLAIN
EXPLAIN SELECT * FROM orders WHERE user_id = 123;
-- EXPLAIN ANALYZE: actual execution statistics (MySQL 8.0.18+)
EXPLAIN ANALYZE SELECT * FROM orders WHERE user_id = 123;
-- JSON output
EXPLAIN FORMAT=JSON SELECT * FROM orders WHERE user_id = 123;
-- A join example
EXPLAIN
SELECT o.*, u.username
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.create_time >= '2023-01-01';
How to read the output:
| Column | Meaning | What to aim for |
|---|---|---|
| type | access type | improve from ALL (worst) toward range → ref → eq_ref → const (best) |
| key | index actually used | should not be NULL |
| rows | estimated rows examined | the fewer the better |
| Extra | additional notes | avoid Using filesort and Using temporary |
7.3.2 The sys schema
-- The costliest statements
SELECT * FROM sys.statement_analysis
ORDER BY avg_latency DESC
LIMIT 10;
-- Unused indexes
SELECT * FROM sys.schema_unused_indexes;
-- Redundant indexes
SELECT * FROM sys.schema_redundant_indexes;
-- Table I/O
SELECT * FROM sys.schema_table_statistics
ORDER BY total_latency DESC
LIMIT 10;
-- Lock waits
SELECT * FROM sys.innodb_lock_waits;
8. Case Study: Optimizing an E-commerce System
8.1 Architecture
User requests
↓
Nginx (load balancing)
↓
Application server cluster (Spring Boot)
↓
ShardingSphere (read-write splitting + sharding)
↓
Primary (writes) + Replica 1 (reads) + Replica 2 (reads)
↓
Redis cluster (cache)
↓
Message queue (async processing)
8.2 Problems Before Optimization
-- Problem 1: a slow query
SELECT * FROM orders
WHERE user_id = 123
AND status = 1
AND create_time >= '2023-01-01'
ORDER BY create_time DESC;
-- Problem 2: a full scan over a huge table
SELECT COUNT(*) FROM orders WHERE status = 1;
-- Problem 3: deep pagination
SELECT * FROM orders ORDER BY create_time DESC LIMIT 1000000, 20;
8.3 Rolling Out the Fixes
8.3.1 Index optimization
-- 1. Composite index
CREATE INDEX idx_user_status_time ON orders(user_id, status, create_time DESC);
-- 2. Covering index
CREATE INDEX idx_status_time_cover ON orders(status, create_time, id, user_id, amount);
-- 3. Speed up the COUNT
CREATE INDEX idx_status ON orders(status);
-- or settle for an approximation
SHOW TABLE STATUS LIKE 'orders'; -- the Rows column is an estimate
8.3.2 Sharding
-- Split into 10 tables by user ID
-- orders_0 ~ orders_9
-- Routing logic in the application layer
public class OrderShardingRouter {
private static final int SHARD_COUNT = 10;
public String getTableName(Long userId) {
int index = (int) (userId % SHARD_COUNT);
return "orders_" + index;
}
public List<String> getAllTableNames() {
return IntStream.range(0, SHARD_COUNT)
.mapToObj(i -> "orders_" + i)
.collect(Collectors.toList());
}
}
8.3.3 Read-write splitting
# ShardingSphere configuration
rules:
- !READWRITE_SPLITTING
dataSources:
myds:
type: Static
props:
write-data-source-name: master
read-data-source-names: slave0,slave1
load-balance-algorithm-name: round_robin
- !SHARDING
tables:
orders:
actualDataNodes: myds.orders_$->{0..9}
tableStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: mod
shardingAlgorithms:
mod:
type: MOD
props:
sharding-count: 10
8.3.4 Caching
// Cache hot rows
@Cacheable(value = "orders", key = "#orderId", unless = "#result == null")
public Order getOrder(Long orderId) {
return orderRepository.findById(orderId).orElse(null);
}
// Cache a user's order list (with paging)
@Cacheable(value = "userOrders", key = "#userId + ':' + #page + ':' + #size")
public List<Order> getUserOrders(Long userId, int page, int size) {
String tableName = shardingRouter.getTableName(userId);
return orderRepository.findByUserIdInTable(tableName, userId, page * size, size);
}
// Evict on update (allEntries = true is coarse but simple)
@CacheEvict(value = {"orders", "userOrders"}, allEntries = true)
public void updateOrder(Order order) {
orderRepository.save(order);
}
8.3.5 Async processing
// The order creation flow
public CompletableFuture<Long> createOrder(Order order) {
// 1. check stock (synchronous)
if (!inventoryService.checkStock(order.getProductId(), order.getQuantity())) {
throw new BusinessException("Insufficient stock");
}
// 2. create the order (synchronous)
order.setStatus("PENDING");
Order saved = orderRepository.save(order);
// 3. publish to the MQ (asynchronous)
rabbitTemplate.convertAndSend("order.exchange", "order.create", saved.getId());
// 4. return immediately
return CompletableFuture.completedFuture(saved.getId());
}
// The MQ consumer
@RabbitListener(queues = "order.queue")
public void processOrder(Long orderId) {
try {
// payment, stock deduction, and notification all happen asynchronously
Order order = orderRepository.findById(orderId).orElseThrow();
// payment
paymentService.process(order);
// stock deduction
inventoryService.deduct(order.getProductId(), order.getQuantity());
// status update
order.setStatus("COMPLETED");
orderRepository.save(order);
// notification
notificationService.sendOrderCompleted(order);
} catch (Exception e) {
log.error("订单处理失败: {}", orderId, e);
// 重试或补偿
retryService.scheduleRetry(orderId);
}
}
8.4 Results: Before and After
| Metric | Before | After | Improvement |
|---|---|---|---|
| Avg response time | 850 ms | 45 ms | 94.7% |
| QPS | 500 | 5000 | 10x |
| Slow queries | 120/min | 2/min | 98% |
| CPU usage | 85% | 35% | 59% |
| Disk I/O | 95% | 25% | 74% |
9. Summary and Best Practices
9.1 Where to Optimize First
Index optimization (cheapest, most visible payoff)
- Make sure WHERE, JOIN, and ORDER BY columns are indexed
- Use covering indexes to avoid row lookups
- Periodically drop unused indexes
Query optimization (essentially free)
- Avoid SELECT *
- Never wrap indexed columns in functions
- Optimize pagination
Caching (moderate cost)
- A multi-level cache architecture
- Guard against penetration, breakdown, and avalanche
- Sensible expiration policies
Architectural changes (expensive)
- Read-write splitting
- Sharding
- Data archiving
9.2 A Daily Maintenance Checklist
#!/bin/bash
# MySQL daily maintenance script
# 1. slow query log
echo "=== Slow query summary ==="
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
# 2. unused indexes
mysql -e "SELECT * FROM sys.schema_unused_indexes;"
# 3. replication lag
mysql -e "SHOW SLAVE STATUS\G" | grep Seconds_Behind_Master
# 4. connection counts
mysql -e "SHOW STATUS LIKE 'Threads_connected';"
mysql -e "SHOW STATUS LIKE 'Max_used_connections';"
# 5. lock waits
mysql -e "SELECT * FROM sys.innodb_lock_waits;"
# 6. performance report
mysql -e "SELECT * FROM sys.statement_analysis ORDER BY avg_latency DESC LIMIT 10;" > /tmp/perf_report.txt
9.3 Common Misconceptions
❌ Myth 1: the more indexes, the better
- ✅ Reality: every index slows down writes; create only the ones you need
❌ Myth 2: every query must use an index
- ✅ Reality: a full scan of a small table can be faster
❌ Myth 3: read-write splitting solves every performance problem
- ✅ Reality: it has to be combined with caching, async processing, and the rest
❌ Myth 4: sharding is a silver bullet
- ✅ Reality: it brings distributed transactions, cross-shard queries, and other complexity
9.4 Keep Optimizing
- Establish a baseline: record the key metrics regularly so you can spot drift
- Roll out gradually: validate each change on a small slice of traffic first
- A/B test: compare performance data before and after
- Write it down: document the motivation, approach, and outcome of every optimization
- Train the team: grow the team's database tuning skills
Applied systematically, these strategies will clear MySQL's high-concurrency bottlenecks and give you a stable, efficient database architecture. Remember that optimization is a continuous process: keep adjusting as the business and the technology evolve.
