引言:理解高并发场景下的MySQL挑战

在现代互联网应用中,高并发场景已经成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易处理,MySQL数据库都面临着前所未有的压力。高并发环境下,数据库性能往往成为整个系统的瓶颈,直接影响用户体验和业务稳定性。

高并发对MySQL的主要挑战包括:

  • 连接数激增:大量并发连接导致资源竞争
  • 锁竞争:行锁、表锁、元数据锁的等待和死锁
  • I/O瓶颈:磁盘读写速度跟不上内存处理速度
  • CPU负载:复杂查询和排序消耗大量CPU资源
  • 内存压力:缓冲池、排序区等内存结构竞争

本文将从索引优化查询优化架构优化三个层面,系统性地介绍MySQL高并发处理策略,并提供实战代码示例。

一、索引优化:高并发的基石

1.1 索引设计的核心原则

索引是MySQL性能优化的第一道防线。在高并发场景下,合理的索引设计可以将查询性能提升几个数量级。

1.1.1 最左前缀原则与复合索引

复合索引遵循最左前缀原则,索引(a,b,c)等价于(a), (a,b), (a,b,c)三种索引的组合。

实战示例:电商订单查询

-- 订单表结构
CREATE TABLE orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    status TINYINT NOT NULL,
    create_time DATETIME NOT NULL,
    amount DECIMAL(10,2),
    INDEX idx_user_status_time (user_id, status, create_time)
);

-- 以下查询都能有效使用索引
SELECT * FROM orders WHERE user_id = 1001; -- 使用idx_user_status_time的第一部分
SELECT * FROM orders WHERE user_id = 1001 AND status = 1; -- 使用前两部分
SELECT * FROM orders WHERE user_id = 1001 AND status = 1 AND create_time > '2024-01-01'; -- 使用全部

-- 以下查询无法使用该索引(违反最左前缀)
SELECT * FROM orders WHERE status = 1; -- 无法使用
SELECT * FROM orders WHERE create_time > '2024-01-01'; -- 无法使用
SELECT * FROM orders WHERE status = 1 AND create_time > '2024-01-01'; -- 无法使用

1.1.2 覆盖索引优化

覆盖索引是指查询所需的所有列都包含在索引中,避免回表操作。

实战示例:用户统计查询

-- 用户表结构
CREATE TABLE users (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    status TINYINT NOT NULL,
    create_time DATETIME NOT NULL,
    INDEX idx_status_create (status, create_time)
);

-- 低效查询:需要回表获取username
SELECT username FROM users WHERE status = 1 AND create_time > '2024-01-01';

-- 高效查询:使用覆盖索引
-- 创建覆盖索引
ALTER TABLE users ADD INDEX idx_status_create_username (status, create_time, username);

-- 现在查询只需要扫描索引,无需回表
EXPLAIN SELECT username FROM users WHERE status = 1 AND create_time > '2024-01-01';
-- Extra字段会显示"Using index",表示使用了覆盖索引

1.1.3 索引下推(ICP)优化

MySQL 5.6+ 支持索引下推,可以在索引遍历过程中就对不符合条件的记录进行过滤。

-- 用户表,复合索引 (status, create_time)
-- 开启ICP(默认开启)
SET optimizer_switch = 'index_condition_pushdown=on';

-- 查询:status=1 且 create_time > '2024-01-01'
-- ICP会在存储引擎层过滤create_time,减少回表次数
EXPLAIN SELECT * FROM users WHERE status = 1 AND create_time > '2024-01-01';
-- 查看Extra字段,如果有"Using index condition"表示ICP生效

1.2 索引维护与监控

1.2.1 索引使用情况监控

-- 查看索引使用统计(需要开启统计信息收集)
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_FETCH,
    COUNT_INSERT,
    COUNT_UPDATE,
    COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE OBJECT_SCHEMA = 'your_database'
ORDER BY COUNT_FETCH DESC;

-- 查找未使用的索引(可能浪费写性能)
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
  AND COUNT_FETCH = 0
  AND COUNT_INSERT = 0
  AND COUNT_UPDATE = 0
  AND COUNT_DELETE = 0;

1.2.2 索引碎片整理

-- 查看表碎片率
SELECT 
    TABLE_NAME,
    ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2) AS total_mb,
    ROUND(DATA_FREE / 1024 / 1024, 2) AS free_mb,
    ROUND(DATA_FREE / (DATA_LENGTH + INDEX_LENGTH) * 100, 2) AS fragment_ratio
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'your_database'
  AND DATA_FREE > 0
ORDER BY fragment_ratio DESC;

-- 整理索引碎片(在线DDL,MySQL 5.6+)
ALTER TABLE your_table ENGINE=InnoDB;

-- 或者使用pt-online-schema-change工具(Percona Toolkit)
-- 避免锁表,适合大表
pt-online-schema-change --alter "ENGINE=InnoDB" D=your_database,t=your_table --execute

二、查询优化:让SQL飞起来

2.1 执行计划分析(EXPLAIN)

EXPLAIN是SQL优化的必备工具,能展示MySQL如何执行查询。

-- 基本用法
EXPLAIN SELECT * FROM orders WHERE user_id = 1001;

-- 扩展用法(显示分区信息)
EXPLAIN PARTITIONS SELECT * FROM orders WHERE user_id = 1001;

-- 格式化输出(更易读)
EXPLAIN FORMAT=JSON SELECT * FROM orders WHERE user_id = 1001;

EXPLAIN关键字段解读:

字段 说明 优化建议
type 访问类型(ALL/index/range/ref/const) 至少达到range,最好ref/const
key 实际使用的索引 确保使用了合适的索引
rows 预估扫描行数 越小越好
Extra 额外信息 避免”Using filesort”、”Using temporary”

2.2 常见SQL问题与优化

2.2.1 隐式类型转换导致索引失效

-- 表结构:user_id是VARCHAR类型
CREATE TABLE users (
    user_id VARCHAR(20) PRIMARY KEY,
    username VARCHAR(50)
);
CREATE INDEX idx_user_id ON users(user_id);

-- 错误:user_id是字符串,传入数字会导致索引失效
EXPLAIN SELECT * FROM users WHERE user_id = 1001;
-- type=ALL,扫描全表

-- 正确:使用正确类型
EXPLAIN SELECT * FROM users WHERE user_id = '1001';
-- type=const,使用索引

2.2.2 OR条件优化

-- 低效:OR条件可能导致索引失效
SELECT * FROM orders WHERE user_id = 1001 OR amount > 1000;

-- 高效方案1:使用UNION ALL
SELECT * FROM orders WHERE user_id = 1001
UNION ALL
SELECT * FROM orders WHERE amount > 1000 AND user_id != 1001;

-- 高效方案2:确保OR两边都有索引
ALTER TABLE orders ADD INDEX idx_user_id (user_id);
ALTER TABLE orders ADD INDEX idx_amount (amount);
-- 然后MySQL会使用index_merge

2.2.3 分页优化

-- 传统分页(深度分页问题)
SELECT * FROM orders ORDER BY id LIMIT 1000000, 20;
-- 扫描1000020行,性能极差

-- 优化方案1:延迟关联
SELECT t1.* FROM orders t1
INNER JOIN (SELECT id FROM orders ORDER BY id LIMIT 1000000, 20) t2
ON t1.id = t2.id;

-- 优化方案2:记录上次ID(适合滚动加载)
SELECT * FROM orders WHERE id > 1000000 ORDER BY id LIMIT 20;

-- 优化方案3:使用子查询
SELECT * FROM orders 
WHERE id >= (SELECT id FROM orders ORDER BY id LIMIT 1000000, 1)
ORDER BY id LIMIT 20;

2.2.4 避免SELECT *

-- 低效:SELECT * 会查询所有列,包括大文本字段
SELECT * FROM articles WHERE id = 1;

-- 高效:只查询需要的列
SELECT id, title, summary, publish_time FROM articles WHERE id = 1;

-- 更高效:使用覆盖索引
ALTER TABLE articles ADD INDEX idx_cover (id, title, summary, publish_time);
-- 查询完全在索引中完成

2.3 批量操作优化

2.3.1 批量插入优化

-- 低效:逐条插入
INSERT INTO orders (user_id, amount, status) VALUES (1, 100, 1);
INSERT INTO orders (user_id, amount, status) VALUES (2, 200, 1);
INSERT INTO orders (user_id, amount, status) VALUES (3, 300, 1);
-- 每次都要网络往返、事务提交

-- 高效:批量插入
INSERT INTO orders (user_id, amount, status) VALUES 
(1, 100, 1),
(2, 200, 1),
(3, 300, 1);
-- 一次网络往返,一个事务

-- 更高效:批量插入 + 预处理语句
-- Java示例
PreparedStatement ps = conn.prepareStatement(
    "INSERT INTO orders (user_id, amount, status) VALUES (?, ?, ?)"
);
for (Order order : orderList) {
    ps.setLong(1, order.getUserId());
    ps.setBigDecimal(2, order.getAmount());
    ps.setInt(3, order.getStatus());
    ps.addBatch();
}
ps.executeBatch();

2.3.2 批量更新优化

-- 低效:逐条更新
UPDATE orders SET status = 2 WHERE id = 1;
UPDATE orders SET status = 2 WHERE id = 2;
UPDATE orders SET status = 2 WHERE id = 3;

-- 高效:CASE WHEN批量更新
UPDATE orders 
SET status = CASE 
    WHEN id = 1 THEN 2
    WHEN id = 2 THEN 2
    WHEN id = 3 THEN 2
END
WHERE id IN (1, 2, 3);

-- 或者使用INSERT ON DUPLICATE KEY UPDATE
INSERT INTO orders (id, user_id, amount, status) VALUES 
(1, 1001, 100, 2),
(2, 1002, 200, 2),
(3, 1003, 300, 2)
ON DUPLICATE KEY UPDATE status = VALUES(status);

2.4 事务与锁优化

2.4.1 事务隔离级别选择

-- 查看当前隔离级别
SELECT @@transaction_isolation;

-- 设置隔离级别
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 不同级别对性能的影响:
-- READ UNCOMMITTED:最低一致性,最高性能(不推荐)
-- READ COMMITTED:平衡选择,大多数场景推荐
-- REPEATABLE READ:默认级别,一致性高,并发性能中等
-- SERIALIZABLE:最高一致性,最低性能(极少使用)

2.4.2 减少锁持有时间

-- 低效:在事务中执行耗时操作
START TRANSACTION;
-- 查询用户信息(持有锁)
SELECT * FROM users WHERE id = 1001 FOR UPDATE;
-- 调用外部API(长时间持有锁,阻塞其他事务)
callExternalAPI();
-- 更新状态
UPDATE users SET status = 2 WHERE id = 1001;
COMMIT;

-- 高效:将耗时操作移到事务外
-- 1. 先查询(不锁定)
SELECT * FROM users WHERE id = 1001;
-- 2. 执行外部API
callExternalAPI();
-- 3. 在事务中快速完成数据库操作
START TRANSACTION;
SELECT * FROM users WHERE id = 1001 FOR UPDATE;
UPDATE users SET status = 2 WHERE id = 1001;
COMMIT;

2.4.3 死锁预防

-- 死锁场景:两个事务以不同顺序更新行
-- 事务A:UPDATE orders SET status=2 WHERE id=1; UPDATE orders SET status=2 WHERE id=2;
-- 事务B:UPDATE orders SET status=2 WHERE id=2; UPDATE orders SET status=2 WHERE id=1;

-- 预防方案1:固定加锁顺序
-- 所有事务按ID顺序更新
UPDATE orders SET status=2 WHERE id IN (1,2) ORDER BY id;

-- 预防方案2:使用SELECT ... FOR UPDATE NOWAIT
-- MySQL 8.0+ 支持
START TRANSACTION;
SELECT * FROM orders WHERE id = 1 FOR UPDATE NOWAIT;
-- 如果获取锁失败,立即报错,不会等待
COMMIT;

-- 预防方案3:乐观锁
ALTER TABLE orders ADD COLUMN version INT DEFAULT 0;

UPDATE orders 
SET status = 2, version = version + 1 
WHERE id = 1 AND version = 0;

-- 检查影响行数,如果为0说明被其他事务修改

三、架构优化:从单机到分布式

3.1 读写分离架构

读写分离是提升MySQL并发能力的核心架构方案,通过将读请求分发到从库,写请求发送到主库,大幅提升系统吞吐量。

3.1.1 主从复制原理

MySQL主从复制基于binlog(二进制日志)实现:

  1. 主库:将变更写入binlog
  2. 从库I/O线程:拉取binlog并写入relay log
  3. 从库SQL线程:执行relay log中的SQL

3.1.2 主从复制配置实战

主库配置(Master):

# my.cnf 或 my.ini
[mysqld]
server-id = 1
log_bin = /var/lib/mysql/mysql-bin
binlog_format = ROW  # 推荐ROW格式
expire_logs_days = 7
max_binlog_size = 100M

从库配置(Slave):

[mysqld]
server-id = 2
relay_log = /var/lib/mysql/mysql-relay-bin
log_bin = /var/lib/mysql/mysql-bin
read_only = 1  # 防止误写入从库

配置步骤:

-- 1. 在主库创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'Repl@123456';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;

-- 2. 在主库锁定表(获取一致位置)
FLUSH TABLES WITH READ LOCK;

-- 3. 查看主库状态
SHOW MASTER STATUS;
-- 记录 File 和 Position 值,例如:mysql-bin.000001, 154

-- 4. 在从库配置主库信息
CHANGE MASTER TO
MASTER_HOST='主库IP',
MASTER_USER='repl',
MASTER_PASSWORD='Repl@123456',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=154;

-- 5. 启动从库复制
START SLAVE;

-- 6. 检查复制状态
SHOW SLAVE STATUS\G
-- 确保以下两项为Yes:
-- Slave_IO_Running: Yes
-- Slave_SQL_Running: Yes
-- Seconds_Behind_Master: 0(表示无延迟)

-- 7. 在主库解锁表
UNLOCK TABLES;

3.1.3 读写分离实现方案

方案1:应用层实现(推荐)

// Spring Boot + ShardingSphere 配置示例
@Configuration
public class DataSourceConfig {
    
    @Bean
    public DataSource masterDataSource() {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl("jdbc:mysql://master:3306/mydb");
        ds.setUsername("root");
        ds.setPassword("password");
        return ds;
    }
    
    @Bean
    public DataSource slaveDataSource() {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl("jdbc:mysql://slave:3306/mydb");
        ds.setUsername("root");
        ds.setPassword("password");
        return ds;
    }
    
    @Bean
    public DataSource routingDataSource() {
        ReplicationDataSourceRouter router = new ReplicationDataSourceRouter();
        Map<String, DataSource> dataSourceMap = new HashMap<>();
        dataSourceMap.put("master", masterDataSource());
        dataSourceMap.put("slave", slaveDataSource());
        router.setTargetDataSources(dataSourceMap);
        router.setDefaultTargetDataSource(masterDataSource());
        return router;
    }
}

方案2:使用Proxy中间件(如MySQL Router)

# MySQL Router 配置
[DEFAULT]
logging_folder = /var/log/mysqlrouter
plugin_folder = /usr/lib/mysqlrouter/lib
runtime_folder = /var/run/mysqlrouter
config_folder = /etc/mysqlrouter

[logger]
level = INFO

[routing:primary]
bind_address = 0.0.0.0
bind_port = 3306
destinations = master:3306
routing_strategy = first_available

[routing:replicas]
bind_address = 0.0.0.0
bind_port = 3307
destinations = slave1:3306,slave2:3306
routing_strategy = round_robin

# 启动
mysqlrouter --config /etc/mysqlrouter/mysqlrouter.conf

方案3:使用ShardingSphere(功能最强大)

# sharding.yaml
dataSources:
  master_ds: !!org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory
    jdbcUrl: jdbc:mysql://master:3306/mydb
    username: root
    password: password
  slave_ds: !!org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory
    jdbcUrl: jdbc:mysql://slave:3306/mydb
    username: root
    password: password

rules:
  - !REPLICATION
    dataSources:
      - name: pr_ds
        primaryDataSourceName: master_ds
        replicaDataSourceNames:
          - slave_ds
    loadBalancers:
      name: ROUND_ROBIN
        type: ROUND_ROBIN
        props:
          workload: 1

3.1.4 读写分离的注意事项

1. 主从延迟问题

-- 在业务代码中处理延迟
public Order getOrder(Long id) {
    // 先从从库查询
    Order order = slaveOrderMapper.selectById(id);
    if (order == null) {
        // 从库查不到,说明可能延迟,从主库查
        order = masterOrderMapper.selectById(id);
    }
    return order;
}

-- 或者强制走主库的场景
-- 1. 刚写入的数据立即查询
-- 2. 需要强一致性的业务(如支付后查询余额)
-- 3. 使用事务的查询

2. 主从数据一致性校验

# 使用pt-table-checksum校验主从一致性
pt-table-checksum h=master,u=root,p=password --databases=mydb

# 输出示例:
# TS ERRORS  DIFFS     ROWS  CHUNKS SKIPPED    TIME TABLE
# 2024-01-01T10:00:00      0      1     1000       2       0   0.501 mydb.orders

# DIFFS=1 表示该表主从数据不一致

3. 监控主从延迟

-- 方法1:查看Seconds_Behind_Master
SHOW SLAVE STATUS\G
-- Seconds_Behind_Master: 0  -- 延迟秒数

-- 方法2:使用percona工具
pt-heartbeat --update -h master -u root -p password --database mydb
pt-heartbeat --monitor -h slave -u root -p password --database mydb

-- 方法3:在业务表中记录时间戳
ALTER TABLE orders ADD COLUMN write_time DATETIME;
INSERT INTO orders (id, write_time) VALUES (1, NOW());
-- 在从库查询该时间戳,计算延迟

3.2 分库分表(Sharding)

当单表数据量超过5000万或单库连接数超过500时,需要考虑分库分表。

3.2.1 垂直分库

-- 原始单库
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE users (...);
CREATE TABLE orders (...);
CREATE TABLE products (...);

-- 垂直分库后
-- 用户库
CREATE DATABASE user_db;
CREATE TABLE user_db.users (...);

-- 订单库
CREATE DATABASE order_db;
CREATE TABLE order_db.orders (...);

-- 商品库
CREATE DATABASE product_db;
CREATE TABLE product_db.products (...);

3.2.2 水平分表

-- 按用户ID取模分表(订单表)
-- 订单表拆分为16张表:orders_0 到 orders_15

-- 创建分表结构
CREATE TABLE orders_0 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT NOT NULL,
    amount DECIMAL(10,2),
    create_time DATETIME,
    INDEX idx_user_id (user_id)
) ENGINE=InnoDB;

-- 复制结构创建orders_1到orders_15
-- ...

-- 分表路由逻辑(Java示例)
public class OrderTableRouter {
    private static final int TABLE_COUNT = 16;
    
    public String getTableName(Long userId) {
        int index = (int) (userId % TABLE_COUNT);
        return "orders_" + index;
    }
    
    public void insertOrder(Order order) {
        String tableName = getTableName(order.getUserId());
        String sql = "INSERT INTO " + tableName + " (user_id, amount) VALUES (?, ?)";
        // 执行SQL
    }
}

3.2.3 使用ShardingSphere实现分库分表

# sharding.yaml
dataSources:
  ds_0: !!org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory
    jdbcUrl: jdbc:mysql://192.168.1.10:3306/db_0
    username: root
    password: password
  ds_1: !!org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory
    jdbcUrl: jdbc:mysql://192.168.1.11:3306/db_1
    username: root
    password: password

rules:
  - !SHARDING
    tables:
      orders:
        actualDataNodes: ds_${0..1}.orders_${0..15}
        tableStrategy:
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: mod
        databaseStrategy:
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: db_mod
    shardingAlgorithms:
      mod:
        type: MOD
        props:
          sharding-count: 16
      db_mod:
        type: MOD
        props:
          sharding-count: 2

3.3 连接池优化

3.3.1 HikariCP配置(推荐)

HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://master:3306/mydb");
config.setUsername("root");
config.setPassword("password");

// 核心配置
config.setMaximumPoolSize(50);           // 最大连接数
config.setMinimumIdle(10);               // 最小空闲连接
config.setConnectionTimeout(30000);      // 连接超时(毫秒)
config.setIdleTimeout(600000);           // 空闲超时(毫秒)
config.setMaxLifetime(1800000);          // 连接最大存活时间(毫秒)
config.setLeakDetectionThreshold(60000); // 泄漏检测阈值

// 性能优化
config.setReadOnly(false);               // 默认读写
config.setAutoCommit(true);              // 自动提交
config.setPoolName("MasterPool");        // 连接池名称

// MySQL特定优化
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
config.addDataSourceProperty("useLocalSessionState", "true");
config.addDataSourceProperty("rewriteBatchedStatements", "true");
config.addDataSourceProperty("cacheResultSetMetadata", "true");
config.addDataSourceProperty("cacheServerConfiguration", "true");
config.addDataSourceProperty("maintainTimeStats", "false");
config.addDataSourceProperty("elideSetAutoCommits", "true");

HikariDataSource ds = new HikariDataSource(config);

3.3.2 连接池监控

-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Max_used_connections';

-- 查看连接线程缓存
SHOW STATUS LIKE 'Threads_cache%';

-- 查看连接拒绝数(重要)
SHOW STATUS LIKE 'Connection_errors_max_connection';
SHOW STATUS LIKE 'Connection_errors_internal';
// HikariCP监控指标
HikariPoolMXBean poolMXBean = hikariDataSource.getHikariPoolMXBean();
System.out.println("活跃连接数: " + poolMXBean.getActiveConnections());
System.out.println("空闲连接数: " + poolMXBean.getIdleConnections());
System.out.println("总连接数: " + poolMXBean.getTotalConnections());
System.out.println("等待连接数: " + poolMXBean.getThreadsAwaitingConnection());

3.4 缓存策略

3.4.1 多级缓存架构

用户请求
    ↓
本地缓存(Caffeine)
    ↓
分布式缓存(Redis)
    ↓
MySQL

3.4.2 Redis缓存实战

// 缓存穿透保护
public Order getOrder(Long id) {
    String key = "order:" + id;
    
    // 1. 先查缓存
    String cached = redisTemplate.opsForValue().get(key);
    if (cached != null) {
        if ("null".equals(cached)) {
            return null; // 缓存空值,防止穿透
        }
        return JSON.parseObject(cached, Order.class);
    }
    
    // 2. 缓存未命中,查数据库
    Order order = orderMapper.selectById(id);
    
    // 3. 写入缓存
    if (order == null) {
        // 缓存空值,设置较短过期时间
        redisTemplate.opsForValue().set(key, "null", 5, TimeUnit.MINUTES);
    } else {
        redisTemplate.opsForValue().set(key, JSON.toJSONString(order), 30, TimeUnit.MINUTES);
    }
    
    return order;
}

// 缓存更新策略(先更新数据库,再删除缓存)
@Transactional
public void updateOrder(Order order) {
    // 1. 更新数据库
    orderMapper.updateById(order);
    
    // 2. 删除缓存(让下次查询重新加载)
    String key = "order:" + order.getId();
    redisTemplate.delete(key);
}

3.4.3 缓存与数据库一致性

// 使用分布式锁保证一致性
public void updateOrderWithLock(Order order) {
    String lockKey = "lock:order:" + order.getId();
    String lockValue = UUID.randomUUID().toString();
    
    try {
        // 1. 获取分布式锁(Redis SETNX)
        Boolean locked = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
        
        if (!locked) {
            throw new RuntimeException("系统繁忙,请稍后重试");
        }
        
        // 2. 更新数据库
        orderMapper.updateById(order);
        
        // 3. 删除缓存
        redisTemplate.delete("order:" + order.getId());
        
    } finally {
        // 4. 释放锁(使用Lua脚本保证原子性)
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                          "return redis.call('del', KEYS[1]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class), 
                             Collections.singletonList(lockKey), lockValue);
    }
}

3.5 异步处理与消息队列

3.5.1 异步写入数据库

// 使用消息队列削峰
public void createOrderAsync(Order order) {
    // 1. 写入消息队列(立即返回)
    rabbitTemplate.convertAndSend("order.create", order);
    
    // 2. 返回给用户(快速响应)
    return "订单已提交,正在处理";
}

// 消费者异步写入数据库
@RabbitListener(queues = "order.create")
public void processOrderCreate(Order order) {
    try {
        // 批量插入数据库
        orderMapper.insert(order);
        
        // 发送成功事件
        eventPublisher.publishEvent(new OrderCreatedEvent(order));
        
    } catch (Exception e) {
        // 失败重试或死信队列
        log.error("订单创建失败", e);
        throw new AmqpRejectAndDontRequeueException("重试次数过多");
    }
}

3.5.2 批量处理优化

// 批量插入优化
public void batchInsertOrders(List<Order> orders) {
    // 每1000条一批
    int batchSize = 1000;
    for (int i = 0; i < orders.size(); i += batchSize) {
        int end = Math.min(i + batchSize, orders.size());
        List<Order> batch = orders.subList(i, end);
        
        // 使用MyBatis批量插入
        orderMapper.batchInsert(batch);
        
        // 每批提交后休息一下,避免数据库压力过大
        if (i + batchSize < orders.size()) {
            Thread.sleep(10);
        }
    }
}

// MyBatis Mapper
@Mapper
public interface OrderMapper {
    @InsertProvider(type = OrderSqlProvider.class, method = "batchInsert")
    void batchInsert(@Param("orders") List<Order> orders);
}

// SQL提供者
public class OrderSqlProvider {
    public String batchInsert(Map<String, Object> param) {
        List<Order> orders = (List<Order>) param.get("orders");
        StringBuilder sql = new StringBuilder();
        sql.append("INSERT INTO orders (user_id, amount, status, create_time) VALUES ");
        for (int i = 0; i < orders.size(); i++) {
            Order order = orders.get(i);
            if (i > 0) sql.append(",");
            sql.append(String.format("(%d, %f, %d, '%s')",
                order.getUserId(),
                order.getAmount(),
                order.getStatus(),
                order.getCreateTime()));
        }
        return sql.toString();
    }
}

四、MySQL配置优化

4.1 核心参数调优

4.1.1 InnoDB引擎参数

# my.cnf 核心配置
[mysqld]

# 连接相关
max_connections = 1000          # 最大连接数,根据业务调整
max_user_connections = 800      # 单用户最大连接数
thread_cache_size = 100         # 线程缓存,减少线程创建开销

# InnoDB缓冲池(最重要)
innodb_buffer_pool_size = 16G   # 物理内存的50-70%
innodb_buffer_pool_instances = 8 # 缓冲池实例数,减少竞争
innodb_buffer_pool_dump_at_shutdown = 1 # 关闭时保存热数据
innodb_buffer_pool_load_at_startup = 1  # 启动时加载热数据

# 日志文件
innodb_log_file_size = 2G       # 日志文件大小,建议1-2G
innodb_log_buffer_size = 64M    # 日志缓冲区
innodb_flush_log_at_trx_commit = 1 # 1:每次提交刷盘(安全)2:每秒刷盘(性能)

# I/O相关
innodb_flush_method = O_DIRECT  # 绕过OS缓存
innodb_io_capacity = 2000       # IOPS,SSD可设更高
innodb_io_capacity_max = 4000   # 最大IOPS

# 并发相关
innodb_thread_concurrency = 0   # 0表示不限制
innodb_read_io_threads = 8      # 读线程数
innodb_write_io_threads = 8     # 写线程数

# 事务相关
innodb_lock_wait_timeout = 50   # 锁等待超时(秒)
innodb_rollback_on_timeout = 0  # 超时是否回滚整个事务

# 查询缓存(MySQL 8.0已移除,5.7及之前)
# query_cache_type = 0          # 建议关闭
# query_cache_size = 0

# 临时表
tmp_table_size = 256M           # 临时表大小
max_heap_table_size = 256M      # 内存表最大大小

# 排序
sort_buffer_size = 4M           # 每个线程的排序缓冲区
read_buffer_size = 4M           # 顺序读缓冲区
read_rnd_buffer_size = 8M       # 随机读缓冲区

# 连接超时
wait_timeout = 600              # 非交互连接超时(秒)
interactive_timeout = 600       # 交互连接超时(秒)

# 日志
slow_query_log = 1              # 开启慢查询日志
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2             # 慢查询阈值(秒)
log_queries_not_using_indexes = 1 # 记录未使用索引的查询

# 复制相关(主从)
server_id = 1                   # 唯一ID
log_bin = /var/lib/mysql/mysql-bin
binlog_format = ROW             # ROW格式
sync_binlog = 1                 # 每次提交同步binlog(安全)
expire_logs_days = 7            # binlog保留天数

# 性能监控
performance_schema = ON         # 性能模式
innodb_monitor_enable = all     # 启用InnoDB监控

4.1.2 配置查看与调整

-- 查看当前配置
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'max_connections';
SHOW VARIABLES LIKE 'innodb_flush_log_at_trx_commit';

-- 动态调整(部分参数支持)
SET GLOBAL innodb_buffer_pool_size = 17179869184; -- 16G
SET GLOBAL max_connections = 1000;
SET GLOBAL slow_query_log = 1;

-- 查看运行状态
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Innodb_buffer_pool_pages_dirty';
SHOW STATUS LIKE 'Innodb_row_lock_waits';

4.2 慢查询分析

4.2.1 开启慢查询日志

-- 开启慢查询日志
SET GLOBAL slow_query_log = 1;
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 2;  -- 2秒以上的查询记录
SET GLOBAL log_queries_not_using_indexes = 1;

-- 查看配置
SHOW VARIABLES LIKE 'slow_query%';
SHOW VARIABLES LIKE 'long_query_time';

4.2.2 使用pt-query-digest分析

# 安装Percona Toolkit
sudo apt-get install percona-toolkit

# 分析慢查询日志
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# 输出示例:
# # 120s time, 10M bytes, 10000 queries
# 
# # Rank Query ID           Response time    Calls R/Call V/M   Item
# # ==== ================== ================ ===== ====== ===== ====
# #    1 0xF9A57DD5A4C...  52000.0000 52.0%  5000 10.4000  0.12 SELECT orders
# #    2 0xB423D4A5C21...  18000.0000 18.0%  2000  9.0000  0.08 SELECT users

4.2.3 使用Performance Schema分析

-- 查看最耗时的SQL
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    AVG_TIMER_WAIT/1000000000000 AS avg_time_sec,
    SUM_ROWS_EXAMINED,
    SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;

-- 查看全表扫描的SQL
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    SUM_ROWS_EXAMINED,
    SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
WHERE SUM_ROWS_EXAMINED > 10000
  AND SUM_ROWS_SENT / SUM_ROWS_EXAMINED < 0.01
ORDER BY SUM_ROWS_EXAMINED DESC;

4.3 锁监控与优化

4.3.1 查看锁等待

-- 查看当前锁等待(MySQL 5.7+)
SELECT 
    r.trx_id waiting_trx_id,
    r.trx_mysql_thread_id waiting_thread,
    r.trx_query waiting_query,
    b.trx_id blocking_trx_id,
    b.trx_mysql_thread_id blocking_thread,
    b.trx_query blocking_query
FROM information_schema.innodb_lock_waits w
INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;

-- 查看锁信息
SHOW ENGINE INNODB STATUS\G
-- 查看TRANSACTIONS部分

4.3.2 查看元数据锁

-- 查看MDL锁等待
SELECT 
    ps.id AS processlist_id,
    ps.user,
    ps.host,
    ps.db,
    ps.command,
    ps.time,
    ps.state,
    ps.info,
    mdl.lock_type,
    mdl.lock_status
FROM performance_schema.metadata_locks mdl
INNER JOIN performance_schema.threads t ON mdl.object_instance_begin = t.thread_id
INNER JOIN information_schema.processlist ps ON t.processlist_id = ps.id
WHERE mdl.object_schema = 'your_database';

五、实战案例:秒杀系统优化

5.1 秒杀场景特点

  • 瞬时高并发:QPS可达10万+
  • 库存竞争:大量请求同时扣减库存
  • 数据一致性:超卖问题
  • 用户体验:快速响应

5.2 秒杀系统架构设计

用户请求
    ↓
Nginx(限流)
    ↓
Redis(预扣库存)
    ↓
消息队列(削峰)
    ↓
MySQL(异步落单)

5.3 核心代码实现

5.3.1 Redis预扣库存

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    private static final String STOCK_KEY = "seckill:stock:";
    private static final String ORDER_KEY = "seckill:order:";
    
    /**
     * 秒杀下单
     */
    public Result seckill(Long userId, Long itemId) {
        String stockKey = STOCK_KEY + itemId;
        String orderKey = ORDER_KEY + itemId + ":" + userId;
        
        // 1. 防止重复下单
        if (redisTemplate.hasKey(orderKey)) {
            return Result.fail("您已参与过秒杀");
        }
        
        // 2. 原子扣减库存(Lua脚本)
        String luaScript = 
            "if redis.call('get', KEYS[1]) >= '1' then " +
            "   redis.call('decr', KEYS[1]); " +
            "   return 1; " +
            "else " +
            "   return 0; " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(luaScript, Long.class),
            Collections.singletonList(stockKey)
        );
        
        if (result == 0) {
            return Result.fail("库存不足");
        }
        
        // 3. 标记已下单(5分钟过期)
        redisTemplate.opsForValue().set(orderKey, "1", 5, TimeUnit.MINUTES);
        
        // 4. 发送消息到队列(异步创建订单)
        SeckillMessage message = new SeckillMessage(userId, itemId);
        rabbitTemplate.convertAndSend("seckill.order", message);
        
        return Result.success("秒杀成功,订单处理中");
    }
    
    /**
     * 初始化库存
     */
    public void initStock(Long itemId, Integer stock) {
        redisTemplate.opsForValue().set(STOCK_KEY + itemId, stock.toString());
    }
}

5.3.2 消息队列异步下单

@Component
public class SeckillOrderConsumer {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @RabbitListener(queues = "seckill.order")
    public void processSeckillOrder(SeckillMessage message) {
        Long userId = message.getUserId();
        Long itemId = message.getItemId();
        
        try {
            // 1. 检查库存(双重检查)
            Integer stock = Integer.parseInt(
                redisTemplate.opsForValue().get(STOCK_KEY + itemId)
            );
            if (stock <= 0) {
                // 库存不足,回滚Redis
                redisTemplate.opsForValue().increment(STOCK_KEY + itemId);
                return;
            }
            
            // 2. 创建订单(批量插入优化)
            Order order = new Order();
            order.setUserId(userId);
            order.setItemId(itemId);
            order.setStatus(1);
            order.setCreateTime(new Date());
            
            // 使用批量插入,减少数据库压力
            orderMapper.insert(order);
            
            // 3. 发送订单成功事件
            eventPublisher.publishEvent(new SeckillSuccessEvent(order));
            
        } catch (Exception e) {
            // 4. 异常处理:回滚库存
            redisTemplate.opsForValue().increment(STOCK_KEY + itemId);
            log.error("秒杀订单处理失败", e);
        }
    }
}

5.3.3 MySQL表结构优化

-- 秒杀订单表(简化版)
CREATE TABLE seckill_orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    item_id BIGINT NOT NULL,
    status TINYINT NOT NULL DEFAULT 1,
    create_time DATETIME NOT NULL,
    UNIQUE KEY uk_user_item (user_id, item_id)  -- 防止重复下单
) ENGINE=InnoDB;

-- 按时间分表(每月一张表)
-- orders_202401, orders_202402, ...
-- 使用存储过程创建分表
DELIMITER $$
CREATE PROCEDURE create_monthly_table(IN month_str VARCHAR(6))
BEGIN
    SET @table_name = CONCAT('seckill_orders_', month_str);
    SET @sql = CONCAT('CREATE TABLE IF NOT EXISTS ', @table_name, ' (
        id BIGINT PRIMARY KEY AUTO_INCREMENT,
        user_id BIGINT NOT NULL,
        item_id BIGINT NOT NULL,
        status TINYINT NOT NULL DEFAULT 1,
        create_time DATETIME NOT NULL,
        UNIQUE KEY uk_user_item (user_id, item_id)
    ) ENGINE=InnoDB');
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END$$
DELIMITER ;

5.3.4 Nginx限流配置

# 限制单个IP每秒最多10个请求
limit_req_zone $binary_remote_addr zone=seckill:10m rate=10r/s;

server {
    location /seckill {
        limit_req zone=seckill burst=20 nodelay;
        limit_req_status 429;
        
        proxy_pass http://backend;
        proxy_set_header X-Real-IP $remote_addr;
    }
}

5.4 监控与告警

// 埋点监控
@Component
public class SeckillMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    public void recordSeckill(Long itemId, boolean success) {
        // QPS监控
        meterRegistry.counter("seckill.request", "item", itemId.toString()).increment();
        
        // 成功率
        meterRegistry.counter("seckill.success", "item", itemId.toString())
            .increment(success ? 1 : 0);
        
        // 响应时间
        Timer timer = Timer.builder("seckill.duration")
            .tag("item", itemId.toString())
            .register(meterRegistry);
        
        // 使用示例
        timer.record(() -> {
            // 执行秒杀逻辑
        });
    }
}

六、监控与告警体系

6.1 性能监控指标

6.1.1 关键指标采集

-- 1. QPS(每秒查询数)
SHOW GLOBAL STATUS LIKE 'Queries';
-- 计算:(当前值 - 上次值) / 时间间隔

-- 2. TPS(每秒事务数)
SHOW GLOBAL STATUS LIKE 'Com_commit';
SHOW GLOBAL STATUS LIKE 'Com_rollback';
-- TPS = (Com_commit + Com_rollback) / 时间间隔

-- 3. 连接数
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Max_used_connections';

-- 4. 缓冲池命中率
SHOW STATUS LIKE 'Innodb_buffer_pool_read_requests'; -- 逻辑读
SHOW STATUS LIKE 'Innodb_buffer_pool_reads';          -- 物理读
-- 命中率 = (1 - 物理读 / 逻辑读) * 100%,应 > 99%

-- 5. 锁等待
SHOW STATUS LIKE 'Innodb_row_lock_waits';
SHOW STATUS LIKE 'Innodb_row_lock_time_avg';

-- 6. 慢查询数
SHOW STATUS LIKE 'Slow_queries';

6.1.2 使用Prometheus + Grafana监控

# mysqld_exporter配置
# docker-compose.yml
version: '3'
services:
  mysqld_exporter:
    image: prom/mysqld-exporter
    environment:
      DATA_SOURCE_NAME: "user:password@(mysql:3306)/"
    ports:
      - "9104:9104"
    depends_on:
      - mysql

  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
# prometheus.yml
scrape_configs:
  - job_name: 'mysql'
    static_configs:
      - targets: ['mysqld_exporter:9104']

6.2 告警规则

# alert_rules.yml
groups:
- name: mysql_alerts
  rules:
  - alert: MySQLDown
    expr: up{job="mysql"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "MySQL实例 {{ $labels.instance }} 宕机"
      
  - alert: MySQLHighConnections
    expr: mysql_global_status_threads_connected / mysql_global_variables_max_connections > 0.8
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MySQL连接数过高"
      
  - alert: MySQLSlowQueries
    expr: rate(mysql_global_status_slow_queries[5m]) > 5
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MySQL慢查询过多"
      
  - alert: MySQLReplicationLag
    expr: mysql_slave_lag_seconds > 30
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MySQL主从延迟过高"

6.3 自动化运维脚本

#!/bin/bash
# mysql_health_check.sh

MYSQL_HOST="localhost"
MYSQL_USER="monitor"
MYSQL_PASS="password"

# 检查MySQL是否存活
check_mysql_up() {
    mysqladmin ping -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS > /dev/null 2>&1
    if [ $? -ne 0 ]; then
        echo "CRITICAL: MySQL is down"
        exit 2
    fi
}

# 检查连接数
check_connections() {
    CONN=$(mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW STATUS LIKE 'Threads_connected'" | grep Threads_connected | awk '{print $2}')
    MAX_CONN=$(mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'max_connections'" | grep max_connections | awk '{print $2}')
    
    PERCENT=$((CONN * 100 / MAX_CONN))
    
    if [ $PERCENT -gt 90 ]; then
        echo "CRITICAL: Connections ${PERCENT}% (${CONN}/${MAX_CONN})"
        exit 2
    elif [ $PERCENT -gt 70 ]; then
        echo "WARNING: Connections ${PERCENT}% (${CONN}/${MAX_CONN})"
        exit 1
    fi
}

# 检查主从延迟
check_replication_lag() {
    LAG=$(mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW SLAVE STATUS\G" | grep Seconds_Behind_Master | awk '{print $2}')
    
    if [ "$LAG" != "0" ] && [ "$LAG" -gt 30 ]; then
        echo "WARNING: Replication lag ${LAG}s"
        exit 1
    fi
}

# 检查慢查询
check_slow_queries() {
    SLOW=$(mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW GLOBAL STATUS LIKE 'Slow_queries'" | grep Slow_queries | awk '{print $2}')
    
    # 记录到文件,计算增量
    LAST_SLOW=0
    if [ -f /tmp/last_slow ]; then
        LAST_SLOW=$(cat /tmp/last_slow)
    fi
    
    DELTA=$((SLOW - LAST_SLOW))
    echo $SLOW > /tmp/last_slow
    
    if [ $DELTA -gt 10 ]; then
        echo "WARNING: ${DELTA} slow queries in last check"
        exit 1
    fi
}

# 主函数
main() {
    check_mysql_up
    check_connections
    check_replication_lag
    check_slow_queries
    echo "OK: MySQL is healthy"
    exit 0
}

main

七、总结与最佳实践

7.1 高并发优化 checklist

索引优化

  • [ ] 所有查询都有合适的索引
  • [ ] 复合索引遵循最左前缀原则
  • [ ] 避免索引失效的场景(隐式转换、OR等)
  • [ ] 定期检查未使用索引并清理
  • [ ] 监控索引碎片率,定期整理

查询优化

  • [ ] 使用EXPLAIN分析慢查询
  • [ ] 避免SELECT *,只查询需要的列
  • [ ] 优化分页查询(延迟关联、记录上次ID)
  • [ ] 批量操作代替单条操作
  • [ ] 减少事务持有时间

架构优化

  • [ ] 实现读写分离
  • [ ] 使用Redis缓存热点数据
  • [ ] 异步处理耗时操作(消息队列)
  • [ ] 连接池参数调优
  • [ ] 监控主从延迟

配置优化

  • [ ] innodb_buffer_pool_size设置为物理内存50-70%
  • [ ] 开启慢查询日志
  • [ ] 调整innodb_flush_log_at_trx_commit(根据业务权衡)
  • [ ] 优化线程缓存(thread_cache_size)
  • [ ] 设置合理的超时时间

7.2 常见误区

  1. 盲目增加索引:索引会降低写性能,需要平衡
  2. 过度分库分表:增加复杂度,应在单表性能优化后考虑
  3. 忽视监控:没有监控就无法持续优化
  4. 配置不当:默认配置不适合高并发场景
  5. 事务过大:事务中包含耗时操作,阻塞其他请求

7.3 持续优化建议

  1. 建立性能基线:记录正常业务指标,便于对比
  2. 定期压测:模拟高并发场景,发现瓶颈
  3. 代码审查:SQL必须经过审查才能上线
  4. 灰度发布:新功能先小流量验证
  5. 故障演练:定期演练主从切换、宕机恢复

7.4 推荐工具与资源

  • Percona Toolkit:MySQL管理工具集
  • pt-query-digest:慢查询分析
  • pt-online-schema-change:在线DDL
  • sys schema:MySQL内置性能视图
  • Prometheus + Grafana:监控告警
  • Arthas:Java应用诊断
  • Wireshark:网络抓包分析

通过系统性地应用这些策略,MySQL可以在高并发场景下保持稳定和高性能。记住,优化是一个持续的过程,需要根据业务变化和监控数据不断调整。