引言:理解高并发对MySQL的挑战
在现代互联网应用中,高并发场景已经成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易高峰,都会给数据库带来巨大的压力。MySQL作为最流行的关系型数据库,在高并发环境下容易出现性能瓶颈、锁竞争、连接耗尽甚至崩溃宕机的问题。
高并发对MySQL的挑战主要体现在以下几个方面:
- 连接数激增:大量并发请求会快速消耗数据库连接资源
- CPU负载过高:频繁的查询解析、执行计划生成和结果返回会占用大量CPU资源
- I/O瓶颈:高并发读写会导致磁盘I/O成为系统瓶颈
- 锁竞争:行锁、表锁、元数据锁等在高并发写操作下会产生严重竞争
- 内存压力:缓冲池、连接缓存、排序缓存等内存结构面临巨大压力
本文将从多个维度深入探讨MySQL高并发处理策略,帮助您构建稳定、高性能的数据库系统。
一、连接层优化策略
1.1 合理配置连接参数
MySQL的连接处理是高并发优化的第一道关卡。不当的连接配置会导致连接拒绝或资源耗尽。
关键参数配置:
-- 查看当前连接配置
SHOW VARIABLES LIKE 'max_connections';
SHOW VARIABLES LIKE 'wait_timeout';
SHOW VARIABLES LIKE 'interactive_timeout';
-- 推荐配置(根据服务器内存调整)
SET GLOBAL max_connections = 2000; -- 最大连接数
SET GLOBAL wait_timeout = 300; -- 非交互连接超时时间(秒)
SET GLOBAL interactive_timeout = 600; -- 交互连接超时时间(秒)
SET GLOBAL max_connect_errors = 100000; -- 最大连接错误数
配置说明:
max_connections:建议设置为CPU核心数的2-4倍,但不超过服务器内存承载能力(每个连接约占用10MB内存)wait_timeout:设置过长会占用连接资源,过短会导致频繁重连,300秒是较为平衡的值max_connect_errors:防止恶意连接攻击,适当调高避免正常请求被拒绝
1.2 连接池技术应用
连接池是解决高并发连接问题的核心技术,通过复用连接减少建立/关闭连接的开销。
Java JDBC连接池配置示例(HikariCP):
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class MySQLConnectionPool {
private static HikariDataSource dataSource;
static {
HikariConfig config = new HikariConfig();
// 数据库连接信息
config.setJdbcUrl("jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true");
config.setUsername("username");
config.setPassword("password");
// 连接池核心配置
config.setMaximumPoolSize(50); // 最大连接数
config.setMinimumIdle(10); // 最小空闲连接数
config.setConnectionTimeout(30000); // 连接超时时间(毫秒)
config.setIdleTimeout(600000); // 空闲超时时间(毫秒)
config.setMaxLifetime(1800000); // 连接最大存活时间(毫秒)
config.setLeakDetectionThreshold(60000); // 泄漏检测阈值(毫秒)
// 性能优化配置
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
config.addDataSourceProperty("useLocalSessionState", "true");
config.addDataSourceProperty("rewriteBatchedStatements", "true");
config.addDataSourceProperty("cacheResultSetMetadata", "true");
config.addDataSourceProperty("cacheServerConfiguration", "true");
config.addDataSourceProperty("elideSetAutoCommits", "true");
config.addDataSourceProperty("maintainTimeStats", "false");
dataSource = new HikariDataSource(config);
}
// 获取数据源
public static DataSource getDataSource() {
return dataSource;
}
// 示例:使用连接池执行查询
public void executeQuery() {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement("SELECT * FROM users WHERE id = ?");
ResultSet rs = stmt.executeQuery()) {
stmt.setInt(1, 123);
while (rs.next()) {
System.out.println("User: " + rs.getString("username"));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
连接池配置要点:
- 最大连接数:根据业务峰值设置,通常为CPU核心数的2-4倍
- 最小空闲连接:保持一定数量的备用连接,避免突发请求等待
- 连接超时:防止连接获取等待过久
- 预编译SQL缓存:显著减少SQL解析开销
1.3 读写分离架构
对于读多写少的场景,读写分离是提升并发能力的有效手段。
MySQL主从复制配置:
-- 主库配置(my.cnf)
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7
-- 从库配置(my.cnf)
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
read_only = 1
-- 主库创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
-- 从库启动复制
CHANGE MASTER TO
MASTER_HOST='master_ip',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=4;
START SLAVE;
Java读写分离实现:
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.stereotype.Component;
@Component
public class DynamicDataSource extends AbstractRoutingDataSource {
private static final ThreadLocal<DataSourceType> currentDataSource =
ThreadLocal.withInitial(() -> DataSourceType.MASTER);
@Override
protected Object determineCurrentLookupKey() {
return currentDataSource.get();
}
public static void setDataSource(DataSourceType type) {
currentDataSource.set(type);
}
public static void clearDataSource() {
currentDataSource.remove();
}
public enum DataSourceType {
MASTER, SLAVE
}
}
// 使用AOP实现读写分离
@Aspect
@Component
public class DataSourceAspect {
@Around("@annotation(readOnly)")
public Object setReadOnlyDataSource(ProceedingJoinPoint pjp, ReadOnly readOnly) throws Throwable {
try {
DynamicDataSource.setDataSource(DynamicDataSource.DataSourceType.SLAVE);
return pjp.proceed();
} finally {
DynamicDataSource.clearDataSource();
}
}
}
// 注解使用
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
// 写操作使用主库
public void createUser(User user) {
userMapper.insert(user);
}
// 读操作使用从库
@ReadOnly
public User getUserById(Long id) {
return userMapper.selectById(id);
}
}
二、SQL语句优化策略
2.1 索引优化与覆盖索引
索引是提升查询性能的最有效手段,但不当的索引会成为性能杀手。
索引优化原则:
- 最左前缀原则:复合索引必须从左开始匹配
- 区分度原则:选择性高的列放在索引前面
- 覆盖索引:查询列全部包含在索引中,避免回表
示例:优化用户查询表
-- 创建用户表
CREATE TABLE users (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL,
email VARCHAR(100),
age INT,
status TINYINT DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_username (username),
INDEX idx_age_status (age, status),
INDEX idx_email (email)
);
-- 问题SQL:全表扫描
EXPLAIN SELECT * FROM users WHERE age > 25 AND status = 1;
-- 优化方案1:创建合适索引
CREATE INDEX idx_age_status ON users(age, status);
-- 优化方案2:使用覆盖索引
CREATE INDEX idx_cover ON users(age, status, username, email);
-- 验证索引效果
EXPLAIN SELECT username, email FROM users WHERE age > 25 AND status = 1;
-- 输出应显示:type=range, key=idx_cover, Extra=Using index condition
-- 问题SQL:索引失效
EXPLAIN SELECT * FROM users WHERE username LIKE '%john%';
-- 优化方案:前缀索引或全文索引
CREATE INDEX idx_username_prefix ON users(username(10));
EXPLAIN SELECT * FROM users WHERE username LIKE 'john%';
-- 或者使用Elasticsearch等外部搜索引擎处理模糊查询
索引失效的常见场景:
-- 1. 隐式类型转换
SELECT * FROM users WHERE phone = 13800138000; -- phone是varchar,会失效
SELECT * FROM users WHERE phone = '13800138000'; -- 正确
-- 2. 函数操作
SELECT * FROM users WHERE DATE(created_at) = '2024-01-01'; -- 失效
SELECT * FROM users WHERE created_at >= '2024-01-01' AND created_at < '2024-01-02'; -- 正确
-- 3. OR条件(部分情况)
SELECT * FROM users WHERE age = 25 OR age = 30; -- 可能失效
SELECT * FROM users WHERE age IN (25, 30); -- 更好
-- 4. 负向查询
SELECT * FROM users WHERE status != 0; -- 可能失效
SELECT * FROM users WHERE status = 1; -- 更好
2.2 批量操作与分页优化
高并发场景下,批量操作和分页查询是常见需求,需要特殊优化。
批量插入优化:
-- 问题:逐条插入(慢)
INSERT INTO orders (user_id, product_id, amount) VALUES (1, 100, 99.9);
INSERT INTO orders (user_id, product_id, amount) VALUES (2, 101, 199.9);
INSERT INTO orders (user_id, product_id, amount) VALUES (3, 102, 299.9);
-- 优化:批量插入(快10倍以上)
INSERT INTO orders (user_id, product_id, amount) VALUES
(1, 100, 99.9),
(2, 101, 199.9),
(3, 102, 299.9),
(4, 103, 399.9),
(5, 104, 499.9);
-- Java批量操作示例
public void batchInsertOrders(List<Order> orders) {
String sql = "INSERT INTO orders (user_id, product_id, amount) VALUES (?, ?, ?)";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
conn.setAutoCommit(false); // 关闭自动提交
for (int i = 0; i < orders.size(); i++) {
Order order = orders.get(i);
stmt.setLong(1, order.getUserId());
stmt.setLong(2, order.getProductId());
stmt.setBigDecimal(3, order.getAmount());
stmt.addBatch();
// 每1000条提交一次
if (i % 1000 == 0 || i == orders.size() - 1) {
stmt.executeBatch();
conn.commit();
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
深分页问题优化:
-- 问题:深分页查询慢(扫描大量数据)
SELECT * FROM orders WHERE status = 1 ORDER BY id DESC LIMIT 1000000, 20;
-- 优化方案1:延迟关联(覆盖索引)
SELECT o.* FROM orders o
INNER JOIN (
SELECT id FROM orders WHERE status = 1 ORDER BY id DESC LIMIT 1000000, 20
) AS tmp ON o.id = tmp.id;
-- 优化方案2:书签记录法(业务层优化)
-- 第一次查询
SELECT * FROM orders WHERE status = 1 AND id < ? ORDER BY id DESC LIMIT 20;
-- 后续查询使用上次结果的最小id作为条件
-- 优化方案3:使用Elasticsearch处理深分页
2.3 事务与锁优化
高并发下事务和锁是性能瓶颈的关键点。
事务优化原则:
- 短事务:事务尽可能短,减少锁持有时间
- 小事务:单个事务操作数据量不宜过大
- 避免长事务:监控长事务并优化
锁优化示例:
-- 问题:长事务导致锁竞争
START TRANSACTION;
UPDATE products SET stock = stock - 1 WHERE id = 100;
-- 复杂业务逻辑,耗时10秒
UPDATE orders SET status = 'paid' WHERE user_id = 1;
COMMIT; -- 长时间持有锁
-- 优化:拆分事务,减少锁时间
-- 方案1:先执行非关键操作
-- 执行复杂业务逻辑(不持有锁)
START TRANSACTION;
UPDATE products SET stock = stock - 1 WHERE id = 100;
COMMIT; -- 立即提交,释放锁
-- 方案2:使用乐观锁
ALTER TABLE products ADD COLUMN version INT DEFAULT 0;
-- 更新时检查版本
UPDATE products
SET stock = stock - 1, version = version + 1
WHERE id = 100 AND version = ?;
-- Java实现乐观锁
public boolean decreaseStock(Long productId, Integer quantity) {
String sql = "UPDATE products SET stock = stock - ?, version = version + 1 " +
"WHERE id = ? AND version = ?";
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, quantity);
stmt.setLong(2, productId);
stmt.setInt(3, currentVersion);
int affected = stmt.executeUpdate();
return affected > 0; // 成功更新返回true
} catch (SQLException e) {
return false;
}
}
-- 方案3:使用SELECT FOR UPDATE(悲观锁)
START TRANSACTION;
-- 先查询锁定
SELECT stock FROM products WHERE id = 100 FOR UPDATE;
-- 再执行更新
UPDATE products SET stock = stock - 1 WHERE id = 100;
COMMIT;
三、数据库架构优化
3.1 分库分表策略
当单表数据量超过千万级或并发量极高时,需要考虑分库分表。
分表策略示例:
-- 按用户ID取模分表(水平分表)
-- 订单表分16张表
CREATE TABLE orders_0 (
id BIGINT PRIMARY KEY,
user_id BIGINT,
amount DECIMAL(10,2),
created_at TIMESTAMP
);
-- orders_1, orders_2, ... orders_15
-- 分表路由函数
public class TableSharding {
private static final int TABLE_COUNT = 16;
public static String getTableName(Long userId) {
int index = (int) (userId % TABLE_COUNT);
return "orders_" + index;
}
public static void main(String[] args) {
// 示例:用户ID为12345的订单表
String tableName = getTableName(12345L);
System.out.println("表名: " + tableName); // orders_9
}
}
-- 分表查询示例
public List<Order> getOrdersByUserId(Long userId) {
String tableName = TableSharding.getTableName(userId);
String sql = "SELECT * FROM " + tableName + " WHERE user_id = ?";
// 执行查询...
return orders;
}
分库策略:
-- 按业务垂直分库
-- 用户库(user_db)
CREATE TABLE users (id BIGINT PRIMARY KEY, username VARCHAR(50));
CREATE TABLE user_profiles (user_id BIGINT, avatar VARCHAR(255));
-- 订单库(order_db)
CREATE TABLE orders (id BIGINT PRIMARY KEY, user_id BIGINT, amount DECIMAL(10,2));
CREATE TABLE order_items (order_id BIGINT, product_id BIGINT, quantity INT);
-- 支付库(payment_db)
CREATE TABLE payments (id BIGINT PRIMARY KEY, order_id BIGINT, amount DECIMAL(10,2));
3.2 缓存策略
缓存是提升高并发性能的最有效手段之一。
Redis缓存示例:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
@Component
public class CacheService {
private JedisPool jedisPool;
@PostConstruct
public void init() {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(100); // 最大连接数
config.setMaxIdle(20); // 最大空闲连接
config.setMinIdle(5); // 最小空闲连接
config.setTestOnBorrow(true); // 获取连接时测试
jedisPool = new JedisPool(config, "localhost", 6379, 2000, "password");
}
// 缓存用户信息
public User getUserWithCache(Long userId) {
String cacheKey = "user:" + userId;
try (Jedis jedis = jedisPool.getResource()) {
// 1. 先从缓存获取
String cached = jedis.get(cacheKey);
if (cached != null) {
return JSON.parseObject(cached, User.class);
}
// 2. 缓存未命中,查询数据库
User user = userMapper.selectById(userId);
if (user != null) {
// 3. 写入缓存,设置过期时间
jedis.setex(cacheKey, 3600, JSON.toJSONString(user));
}
return user;
}
}
// 缓存穿透保护(布隆过滤器)
public User getUserWithBloomFilter(Long userId) {
String cacheKey = "user:" + userId;
String bloomKey = "bloom:user";
try (Jedis jedis = jedisPool.getResource()) {
// 检查布隆过滤器
if (!jedis.sismember(bloomKey, String.valueOf(userId))) {
// 不存在于过滤器,直接返回null
return null;
}
// 正常缓存逻辑
return getUserWithCache(userId);
}
}
// 缓存击穿保护(互斥锁)
public User getUserWithMutex(Long userId) {
String cacheKey = "user:" + userId;
String lockKey = "lock:" + userId;
try (Jedis jedis = jedisPool.getResource()) {
// 1. 尝试获取缓存
String cached = jedis.get(cacheKey);
if (cached != null) {
return JSON.parseObject(cached, User.class);
}
// 2. 获取分布式锁
String lockValue = String.valueOf(System.currentTimeMillis());
Boolean locked = jedis.set(lockKey, lockValue, "NX", "EX", 10);
if (locked) {
try {
// 3. 再次检查缓存(防止重复加载)
cached = jedis.get(cacheKey);
if (cached != null) {
return JSON.parseObject(cached, User.class);
}
// 4. 查询数据库
User user = userMapper.selectById(userId);
if (user != null) {
jedis.setex(cacheKey, 3600, JSON.toJSONString(user));
}
return user;
} finally {
// 5. 释放锁(使用Lua脚本保证原子性)
String lua = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(lua, 1, lockKey, lockValue);
}
} else {
// 6. 未获取锁,短暂等待后重试
Thread.sleep(50);
return getUserWithMutex(userId);
}
} catch (Exception e) {
return userMapper.selectById(userId);
}
}
}
3.3 消息队列削峰填谷
对于瞬时高并发写入,使用消息队列进行削峰。
RabbitMQ削峰示例:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
@Component
public class OrderMessageQueue {
private final String EXCHANGE_NAME = "order_exchange";
private final String QUEUE_NAME = "order_queue";
private final String ROUTING_KEY = "order.create";
// 生产者:接收高并发请求
public void createOrderRequest(OrderRequest request) {
// 立即返回订单创建中
String orderId = generateOrderId();
// 发送消息到队列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 消息属性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化
.priority(5)
.build();
// 发送消息
String message = JSON.toJSONString(request);
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props, message.getBytes());
// 返回订单ID,用户可查询状态
return orderId;
} catch (Exception e) {
throw new RuntimeException("订单创建失败", e);
}
}
// 消费者:异步处理订单
@PostConstruct
public void startConsumer() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicQos(10); // 每次只预取10条消息,防止内存溢出
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
String message = new String(delivery.getBody(), "UTF-8");
OrderRequest request = JSON.parseObject(message, OrderRequest.class);
// 实际创建订单(在可控的并发下执行)
processOrder(request);
// 确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息并重新入队
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
} catch (Exception e) {
e.printStackTrace();
}
}
private void processOrder(OrderRequest request) {
// 实际的订单处理逻辑
// 在这里可以控制处理速度,避免数据库压力
}
}
四、服务器配置优化
4.1 InnoDB缓冲池优化
InnoDB缓冲池是MySQL性能的核心,缓存数据和索引在内存中。
-- 查看当前配置
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'innodb_buffer_pool_instances';
-- 推荐配置(my.cnf)
[mysqld]
# 缓冲池大小:设置为物理内存的50%-70%
innodb_buffer_pool_size = 8G
# 缓冲池实例数:1GB对应1个实例,最多64个
innodb_buffer_pool_instances = 8
# 缓冲池预热:启动时加载热数据
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_at_shutdown = ON
# 刷新策略
innodb_flush_log_at_trx_commit = 1 # ACID保证(最安全)
innodb_flush_log_at_trx_commit = 2 # 性能优化(折中)
innodb_flush_log_at_trx_commit = 0 # 最高性能(有数据丢失风险)
# 日志文件大小
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M
# IO线程
innodb_read_io_threads = 8
innodb_write_io_threads = 8
# 其他优化
innodb_lru_scan_depth = 1024
innodb_page_cleaners = 4
4.2 查询缓存与线程池
-- 查询缓存(MySQL 8.0已移除,5.7及之前可配置)
[mysqld]
query_cache_type = 1
query_cache_size = 128M
query_cache_limit = 2M
-- 线程池(企业版或Percona版)
[mysqld]
thread_pool_size = 16
thread_pool_algorithm = 2
thread_pool_high_prio_mode = transactions
thread_pool_high_prio_burst_limit = 32
-- 连接数优化
max_connections = 2000
thread_cache_size = 100
table_open_cache = 2000
table_definition_cache = 1000
4.3 慢查询日志监控
-- 开启慢查询日志
[mysqld]
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1 -- 记录超过1秒的查询
log_queries_not_using_indexes = 1
log_slow_admin_statements = 1
log_slow_slave_statements = 1
-- 分析慢查询日志
-- 使用mysqldumpslow
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
-- 使用pt-query-digest(Percona Toolkit)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
-- 使用Performance Schema监控
SELECT * FROM performance_schema.events_statements_summary_by_digest
WHERE avg_timer_wait > 1000000000
ORDER BY avg_timer_wait DESC LIMIT 10;
五、监控与预警体系
5.1 关键指标监控
MySQL性能监控脚本:
#!/bin/bash
# MySQL监控脚本
MYSQL_USER="monitor"
MYSQL_PASS="password"
MYSQL_HOST="localhost"
# 获取MySQL状态
mysql_status() {
mysql -u$MYSQL_USER -p$MYSQL_PASS -h$MYSQL_HOST -e "SHOW GLOBAL STATUS LIKE '$1';" | tail -1 | awk '{print $2}'
}
# 获取MySQL变量
mysql_var() {
mysql -u$MYSQL_USER -p$MYSQL_PASS -h$MYSQL_HOST -e "SHOW VARIABLES LIKE '$1';" | tail -1 | awk '{print $2}'
}
# 关键指标
QPS=$(mysql_status "Queries")
TPS=$(mysql_status "Com_commit")
Connections=$(mysql_status "Threads_connected")
SlowQueries=$(mysql_status "Slow_queries")
InnodbBufferPoolHitRate=$(mysql_status "Innodb_buffer_pool_read_hit_rate")
# 计算QPS(需要间隔1秒)
sleep 1
QPS2=$(mysql_status "Queries")
QPS_RATE=$(( (QPS2 - QPS) ))
echo "=== MySQL监控指标 $(date) ==="
echo "QPS: $QPS_RATE"
echo "TPS: $TPS"
echo "连接数: $Connections / $(mysql_var "max_connections")"
echo "慢查询: $SlowQueries"
echo "Buffer Pool命中率: $InnodbBufferPoolHitRate%"
# 告警阈值
MAX_CONNECTIONS=$(mysql_var "max_connections")
if [ $Connections -gt $((MAX_CONNECTIONS * 80 / 100)) ]; then
echo "警告:连接数超过80%!"
fi
if [ $InnodbBufferPoolHitRate -lt 95 ]; then
echo "警告:Buffer Pool命中率低于95%!"
fi
5.2 性能模式监控
-- 查看最耗时的SQL
SELECT
DIGEST_TEXT,
COUNT_STAR,
AVG_TIMER_WAIT/1000000000000 as avg_time_sec,
SUM_ROWS_EXAMINED,
SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;
-- 查看表IO情况
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
COUNT_READ,
COUNT_WRITE,
SUM_NUMBER_OF_BYTES_READ,
SUM_NUMBER_OF_BYTES_WRITE
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_NUMBER_OF_BYTES_READ + SUM_NUMBER_OF_BYTES_WRITE DESC
LIMIT 10;
-- 查看索引使用情况
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
INDEX_NAME,
COUNT_FETCH,
COUNT_INSERT,
COUNT_UPDATE,
COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
ORDER BY COUNT_FETCH DESC
LIMIT 10;
六、高并发场景实战案例
6.1 秒杀系统设计
秒杀场景的核心问题:
- 瞬时高并发读
- 瞬时高并发写
- 库存超卖问题
完整解决方案:
@Service
public class SeckillService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private JdbcTemplate jdbcTemplate;
private static final String STOCK_KEY = "seckill:stock:";
private static final String ORDER_KEY = "seckill:order:";
private static final String USER_KEY = "seckill:user:";
/**
* 秒杀流程
*/
public SeckillResult seckill(Long seckillId, Long userId) {
String lockKey = "seckill:lock:" + seckillId;
try {
// 1. 预减库存(Redis原子操作)
Long stock = redisTemplate.opsForValue().decrement(STOCK_KEY + seckillId);
if (stock < 0) {
// 库存不足,恢复库存
redisTemplate.opsForValue().increment(STOCK_KEY + seckillId);
return SeckillResult.fail("库存不足");
}
// 2. 判断是否已秒杀成功
Boolean hasOrdered = redisTemplate.hasKey(ORDER_KEY + seckillId + ":" + userId);
if (hasOrdered) {
return SeckillResult.fail("您已秒杀成功,请勿重复下单");
}
// 3. 发送消息到队列(异步创建订单)
sendSeckillMessage(seckillId, userId);
// 4. 标记已下单
redisTemplate.opsForValue().set(ORDER_KEY + seckillId + ":" + userId, "1", 1, TimeUnit.HOURS);
return SeckillResult.success("秒杀成功,订单处理中");
} catch (Exception e) {
// 异常时恢复库存
redisTemplate.opsForValue().increment(STOCK_KEY + seckillId);
return SeckillResult.error("系统异常");
}
}
/**
* 异步处理订单(消费者)
*/
@RabbitListener(queues = "seckill_queue")
public void processSeckillMessage(SeckillMessage message) {
Long seckillId = message.getSeckillId();
Long userId = message.getUserId();
try {
// 1. 数据库扣减库存(带乐观锁)
String sql = "UPDATE seckill_goods SET stock = stock - 1 " +
"WHERE id = ? AND stock > 0";
int affected = jdbcTemplate.update(sql, seckillId);
if (affected == 0) {
// 数据库库存不足,回滚Redis
redisTemplate.opsForValue().increment(STOCK_KEY + seckillId);
return;
}
// 2. 创建订单
createOrder(seckillId, userId);
} catch (Exception e) {
// 记录失败日志,人工补偿
log.error("秒杀订单处理失败: seckillId={}, userId={}", seckillId, userId, e);
}
}
/**
* 初始化库存到Redis
*/
@PostConstruct
public void initStock() {
// 从数据库加载库存到Redis
List<SeckillGoods> goodsList = seckillGoodsMapper.selectAll();
for (SeckillGoods goods : goodsList) {
redisTemplate.opsForValue().set(STOCK_KEY + goods.getId(),
String.valueOf(goods.getStock()),
2, TimeUnit.HOURS);
}
}
}
数据库表设计:
-- 秒杀商品表
CREATE TABLE seckill_goods (
id BIGINT PRIMARY KEY,
name VARCHAR(100),
original_price DECIMAL(10,2),
seckill_price DECIMAL(10,2),
stock INT,
start_time DATETIME,
end_time DATETIME,
version INT DEFAULT 0, -- 乐观锁版本
INDEX idx_time (start_time, end_time)
);
-- 秒杀订单表
CREATE TABLE seckill_orders (
id BIGINT PRIMARY KEY,
seckill_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
status TINYINT,
created_at TIMESTAMP,
INDEX idx_user (user_id),
INDEX idx_seckill (seckill_id)
);
-- 防止重复索引
CREATE UNIQUE INDEX uk_user_seckill ON seckill_orders(user_id, seckill_id);
6.2 社交媒体热点事件
热点事件数据库优化:
-- 1. 热点数据分离
CREATE TABLE posts_hot (
id BIGINT PRIMARY KEY,
content TEXT,
like_count INT,
comment_count INT,
created_at TIMESTAMP,
INDEX idx_hot (like_count DESC, created_at DESC)
) ENGINE=InnoDB;
CREATE TABLE posts_normal (
id BIGINT PRIMARY KEY,
content TEXT,
like_count INT,
comment_count INT,
created_at TIMESTAMP
) ENGINE=InnoDB;
-- 2. 读写分离+缓存
-- 热点数据写入时同时写入Redis
-- 读取时优先读Redis,其次读posts_hot,最后读posts_normal
-- 3. 分区表(按时间)
CREATE TABLE posts (
id BIGINT,
content TEXT,
created_at TIMESTAMP
) PARTITION BY RANGE (YEAR(created_at)) (
PARTITION p2023 VALUES LESS THAN (2024),
PARTITION p2024 VALUES LESS THAN (2025),
PARTITION p_future VALUES LESS THAN MAXVALUE
);
七、总结与最佳实践
7.1 高并发优化检查清单
连接层:
- [ ] 配置合适的max_connections(不超过内存限制)
- [ ] 使用连接池(HikariCP/Druid)
- [ ] 实现读写分离架构
SQL层:
- [ ] 关键查询都有合适索引
- [ ] 避免SELECT *,使用覆盖索引
- [ ] 批量操作代替单条操作
- [ ] 优化深分页查询
架构层:
- [ ] 实施缓存策略(Redis)
- [ ] 考虑分库分表
- [ ] 使用消息队列削峰
配置层:
- [ ] 优化InnoDB缓冲池
- [ ] 配置慢查询监控
- [ ] 合理设置事务隔离级别
7.2 性能优化原则
- 先监控后优化:没有数据支撑的优化是盲目的
- 二八法则:80%的性能问题由20%的SQL导致
- 空间换时间:缓存、索引都是用空间换时间
- 异步化:能异步处理的不要同步阻塞
- 分层防御:多层缓存、多级降级
7.3 持续优化建议
- 定期审查:每月审查慢查询日志
- 压力测试:定期进行全链路压测
- 容量规划:根据业务增长预测资源需求
- 故障演练:定期进行故障注入测试
- 知识沉淀:建立性能优化知识库
通过以上策略的综合应用,可以有效提升MySQL在高并发场景下的性能和稳定性,避免数据库崩溃,确保系统平稳运行。记住,数据库优化是一个持续的过程,需要根据业务发展和技术演进不断调整和优化。
