在现代互联网应用中,高并发场景下数据库往往成为系统瓶颈。MySQL作为最流行的关系型数据库,其性能优化需要从多个层面综合考虑。本文将深入探讨从基础的索引优化到高级的读写分离策略,帮助您系统性地解决MySQL高并发瓶颈问题。

一、高并发场景下的数据库瓶颈分析

1.1 什么是数据库瓶颈

数据库瓶颈是指在系统负载增加时,数据库无法线性扩展处理能力,导致响应时间急剧增加或吞吐量下降的现象。在高并发场景下,主要表现为:

  • CPU瓶颈:大量复杂查询导致CPU饱和
  • I/O瓶颈:磁盘读写速度跟不上请求速度
  • 内存瓶颈:缓冲池不足导致频繁磁盘I/O
  • 锁竞争:行锁、表锁导致的等待和死锁

1.2 识别瓶颈的方法

在优化之前,我们需要准确识别瓶颈所在:

-- 查看MySQL当前连接和运行状态
SHOW STATUS LIKE 'Threads_%';
SHOW STATUS LIKE 'Connections';
SHOW STATUS LIKE 'Slow_queries';

-- 查看InnoDB引擎状态
SHOW ENGINE INNODB STATUS\G

-- 查看当前正在执行的查询
SHOW PROCESSLIST;

-- 查看性能模式数据(MySQL 5.6+)
SELECT * FROM performance_schema.events_statements_summary_by_digest 
ORDER BY SUM_TIMER_WAIT DESC LIMIT 10;

通过这些命令,我们可以获取数据库的实时状态,识别慢查询、锁等待等性能问题。

二、索引优化策略

2.1 索引基础与最佳实践

索引是提升查询性能最有效的手段。合理使用索引可以将查询性能提升几个数量级。

2.1.1 索引类型选择

-- 主键索引(自动创建)
CREATE TABLE users (
    id INT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 唯一索引
CREATE UNIQUE INDEX idx_email ON users(email);

-- 普通索引
CREATE INDEX idx_username ON users(username);

-- 联合索引(最左前缀原则)
CREATE INDEX idx_username_email ON users(username, email);

-- 全文索引(用于文本搜索)
CREATE TABLE articles (
    id INT PRIMARY KEY,
    title VARCHAR(200),
    content TEXT,
    FULLTEXT INDEX idx_content (content)
);

2.1.2 索引设计原则

原则1:选择性高的列优先建立索引

-- 好的索引:选择性高
CREATE INDEX idx_email ON users(email);  -- 通常每个邮箱唯一

-- 差的索引:选择性低
CREATE INDEX idx_gender ON users(gender);  -- 只有几种值

原则2:覆盖索引减少回表

-- 原始查询(需要回表)
SELECT username, email FROM users WHERE username = 'john';

-- 优化:创建覆盖索引
CREATE INDEX idx_username_email ON users(username, email);

-- 查询计划将显示"Using index",避免回表操作
EXPLAIN SELECT username, email FROM users WHERE username = 'john';

原则3:前缀索引处理长文本

-- 对长VARCHAR列使用前缀索引
CREATE INDEX idx_email_prefix ON users(email(20));

-- 计算合适的前缀长度
SELECT 
    COUNT(DISTINCT LEFT(email, 10)) / COUNT(*) AS sel_10,
    COUNT(DISTINCT LEFT(email, 15)) / COUNT(*) AS sel_15,
    COUNT(DISTINCT LEFT(email, 20)) / COUNT(*) AS sel_20,
    COUNT(DISTINCT email) / COUNT(*) AS sel_full
FROM users;

2.2 索引优化实战案例

案例1:电商订单查询优化

问题场景:查询用户最近30天的订单,性能缓慢。

-- 原始表结构
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    user_id INT NOT NULL,
    order_status TINYINT NOT NULL,
    order_amount DECIMAL(10,2),
    create_time DATETIME NOT NULL,
    update_time DATETIME
);

-- 原始查询
SELECT * FROM orders 
WHERE user_id = 12345 
  AND order_status = 1 
  AND create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
ORDER BY create_time DESC;

优化步骤

  1. 分析查询计划
EXPLAIN SELECT * FROM orders 
WHERE user_id = 12345 
  AND order_status = 1 
  AND create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY)
ORDER BY create_time DESC;
  1. 创建合适的索引
-- 方案A:联合索引(推荐)
CREATE INDEX idx_user_status_time ON orders(user_id, order_status, create_time);

-- 方案B:如果只需要部分字段,使用覆盖索引
CREATE INDEX idx_user_status_time_cover ON orders(user_id, order_status, create_time, order_amount);
  1. 验证优化效果
-- 查看索引使用情况
SHOW INDEX FROM orders;

-- 查看执行计划
EXPLAIN SELECT order_id, order_amount FROM orders 
WHERE user_id = 12345 
  AND order_status = 1 
  AND create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY);

案例2:多条件组合查询

问题场景:商品搜索功能,支持多条件筛选。

-- 商品表
CREATE TABLE products (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    category_id INT,
    brand_id INT,
    price DECIMAL(10,2),
    stock INT,
    status TINYINT,
    created_at DATETIME
);

-- 动态查询(根据用户选择的条件)
SELECT * FROM products 
WHERE category_id = ? 
  AND brand_id = ? 
  AND price BETWEEN ? AND ?
  AND status = 1
ORDER BY created_at DESC;

优化策略

-- 1. 分析查询模式,创建多个索引应对不同场景
CREATE INDEX idx_category_brand_price ON products(category_id, brand_id, price);
CREATE INDEX idx_category_price ON products(category_id, price);
CREATE INDEX idx_brand_price ON products(brand_id, price);

-- 2. 使用索引提示(强制使用特定索引)
SELECT * FROM products USE INDEX (idx_category_brand_price)
WHERE category_id = 1 
  AND brand_id = 5 
  AND price BETWEEN 100 AND 1000;

-- 3. 对于排序,确保索引包含排序列
CREATE INDEX idx_category_status_created ON products(category_id, status, created_at DESC);

2.3 索引维护与监控

监控索引使用情况

-- 查看索引使用统计(MySQL 5.6+)
SELECT * FROM sys.schema_index_statistics WHERE table_schema = 'your_database';

-- 查看冗余索引
SELECT * FROM sys.schema_redundant_indexes;

-- 查看未使用的索引
SELECT * FROM sys.schema_unused_indexes;

索引维护操作

-- 分析表更新统计信息
ANALYZE TABLE orders;

-- 优化表(整理碎片)
OPTIMIZE TABLE orders;

-- 删除不再使用的索引
DROP INDEX idx_old ON orders;

三、查询语句优化

3.1 避免全表扫描

3.1.1 WHERE条件优化

-- 反例:导致全表扫描
SELECT * FROM users WHERE YEAR(created_at) = 2023;

-- 正例:使用索引列
SELECT * FROM users 
WHERE created_at >= '2023-01-01' 
  AND created_at < '2024-01-01';

-- 反例:函数操作索引列
SELECT * FROM orders WHERE LOWER(email) = 'user@example.com';

-- 正例:保持索引列原样
SELECT * FROM orders WHERE email = 'user@example.com';

3.1.2 模糊查询优化

-- 反例:前缀通配符导致索引失效
SELECT * FROM users WHERE username LIKE '%john';

-- 正例:后缀通配符可以使用索引
SELECT * FROM users WHERE username LIKE 'john%';

-- 全文搜索替代方案
SELECT * FROM articles 
WHERE MATCH(title, content) AGAINST ('database' IN NATURAL LANGUAGE MODE);

3.2 避免SELECT *

-- 反例:查询所有列
SELECT * FROM orders WHERE user_id = 123;

-- 正例:只查询需要的列
SELECT order_id, order_amount, create_time 
FROM orders 
WHERE user_id = 123;

-- 更好的做法:使用覆盖索引
CREATE INDEX idx_user_cover ON orders(user_id, order_id, order_amount, create_time);

SELECT order_id, order_amount, create_time 
FROM orders 
WHERE user_id = 123;  -- 使用覆盖索引,无需回表

3.3 分页查询优化

3.3.1 传统分页的问题

-- 传统分页(深度分页问题)
SELECT * FROM orders 
WHERE user_id = 123 
ORDER BY create_time DESC 
LIMIT 1000000, 20;  -- 扫描1000020条记录,返回20条

3.3.2 优化方案

方案1:延迟关联

-- 先获取主键,再关联查询
SELECT o.* FROM orders o
INNER JOIN (
    SELECT order_id FROM orders 
    WHERE user_id = 123 
    ORDER BY create_time DESC 
    LIMIT 1000000, 20
) AS tmp ON o.order_id = tmp.order_id;

方案2:位置记录法

-- 记录上一页最后一条记录的位置
SELECT * FROM orders 
WHERE user_id = 123 
  AND create_time < '2023-12-01 10:00:00'  -- 上一页最后一条的时间
ORDER BY create_time DESC 
LIMIT 20;

方案3:使用子查询

-- MySQL 8.0+ 使用窗口函数
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (ORDER BY create_time DESC) as row_num
    FROM orders WHERE user_id = 123
) AS tmp
WHERE row_num BETWEEN 1000001 AND 1000020;

3.4 JOIN优化

3.4.1 JOIN类型选择

-- 内连接(推荐)
SELECT o.*, u.username 
FROM orders o
INNER JOIN users u ON o.user_id = u.id
WHERE o.user_id = 123;

-- 左连接(需要时使用)
SELECT o.*, u.username 
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
WHERE o.create_time >= '2023-01-01';

3.4.2 JOIN优化技巧

-- 1. 小表驱动大表
SELECT * FROM small_table s
JOIN large_table l ON s.id = l.small_id;

-- 2. 确保JOIN字段有索引
CREATE INDEX idx_user_id ON orders(user_id);
CREATE INDEX idx_id ON users(id);

-- 3. 避免笛卡尔积
SELECT * FROM a, b WHERE a.id = b.id;  -- 错误
SELECT * FROM a JOIN b ON a.id = b.id; -- 正确

-- 4. 使用STRAIGHT_JOIN强制表顺序
SELECT STRAIGHT_JOIN * FROM small_table s
JOIN large_table l ON s.id = l.small_id;

四、数据库架构优化

4.1 分库分表

4.1.1 垂直分表

-- 原始大表
CREATE TABLE user_info (
    id INT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    profile TEXT,  -- 大文本字段
    settings TEXT, -- 大文本字段
    created_at DATETIME
);

-- 垂直拆分
CREATE TABLE user_base (
    id INT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    created_at DATETIME
);

CREATE TABLE user_details (
    user_id INT PRIMARY KEY,
    profile TEXT,
    settings TEXT,
    FOREIGN KEY (user_id) REFERENCES user_base(id)
);

4.1.2 水平分表(分片)

-- 按用户ID取模分表
-- orders_0, orders_1, orders_2, ..., orders_9

-- 分表后的查询需要应用层路由
-- 伪代码示例:
function getOrdersByUserId(userId) {
    const tableIndex = userId % 10;
    const tableName = `orders_${tableIndex}`;
    return db.query(`SELECT * FROM ${tableName} WHERE user_id = ?`, [userId]);
}

-- 分表后的聚合查询需要UNION ALL
SELECT * FROM orders_0 WHERE user_id = 123
UNION ALL
SELECT * FROM orders_1 WHERE user_id = 123
-- ... 遍历所有分表

4.2 分区表

-- 按时间范围分区
CREATE TABLE logs (
    id BIGINT NOT NULL AUTO_INCREMENT,
    log_time DATETIME NOT NULL,
    message TEXT,
    PRIMARY KEY (id, log_time)
) PARTITION BY RANGE COLUMNS(log_time) (
    PARTITION p202301 VALUES LESS THAN ('2023-02-01'),
    PARTITION p202302 VALUES LESS THAN ('2023-03-01'),
    PARTITION p202303 VALUES LESS THAN ('2023-04-01'),
    PARTITION p202304 VALUES LESS THAN ('2023-05-01'),
    PARTITION pmax VALUES LESS THAN (MAXVALUE)
);

-- 查询时自动分区裁剪
SELECT * FROM logs 
WHERE log_time >= '2023-03-01' 
  AND log_time < '2023-04-01';  -- 只扫描p202303分区

五、读写分离架构

5.1 读写分离原理

读写分离的核心思想是将写操作(INSERT/UPDATE/DELETE)发送到主库,读操作(SELECT)发送到从库,从而分散数据库压力。

应用层
  ↓
中间件/代理层
  ↓
主库(写) ←→ 从库1(读)
  ↓
从库2(读)
  ↓
从库3(读)

5.2 MySQL主从复制配置

5.2.1 主库配置(Master)

# my.cnf 或 my.ini
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7
max_binlog_size = 100M

# 需要同步的数据库
binlog_do_db = myapp
# 不需要同步的数据库
binlog_ignore_db = mysql
binlog_ignore_db = information_schema

创建复制用户:

CREATE USER 'repl'@'%' IDENTIFIED BY 'ReplPassword123!';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;

-- 锁表并记录位置
FLUSH TABLES WITH READ LOCK;
SHOW MASTER STATUS;
-- 记录 File 和 Position 值,例如:mysql-bin.000001, 154

5.2.2 从库配置(Slave)

# my.cnf
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
log_bin = mysql-bin
read_only = 1  # 只读模式,防止误写

配置复制:

-- 停止从库复制
STOP SLAVE;

-- 配置主库信息
CHANGE MASTER TO
MASTER_HOST='主库IP',
MASTER_USER='repl',
MASTER_PASSWORD='ReplPassword123!',
MASTER_LOG_FILE='mysql-bin.000001',  -- 从主库SHOW MASTER STATUS获取
MASTER_LOG_POS=154;                  -- 从主库SHOW MASTER STATUS获取

-- 启动复制
START SLAVE;

-- 查看复制状态
SHOW SLAVE STATUS\G
-- 关键指标:
-- Slave_IO_Running: Yes
-- Slave_SQL_Running: Yes
-- Seconds_Behind_Master: 0(延迟秒数)

5.2.3 验证主从同步

-- 在主库创建测试数据
CREATE DATABASE test_replication;
USE test_replication;
CREATE TABLE test (id INT PRIMARY KEY, value VARCHAR(50));
INSERT INTO test VALUES (1, 'master');
INSERT INTO test VALUES (2, 'master');

-- 在从库查询(应该能看到相同数据)
SELECT * FROM test_replication.test;

-- 在主库更新
UPDATE test_replication.test SET value = 'updated' WHERE id = 1;

-- 在从库验证
SELECT * FROM test_replication.test;

5.3 应用层实现读写分离

5.3.1 使用Spring Boot + ShardingSphere

// 1. 添加依赖
// pom.xml
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>5.1.1</version>
</dependency>

// 2. 配置文件 application.yml
spring:
  shardingsphere:
    datasource:
      names: master,slave0,slave1
      master:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://master-ip:3306/myapp?useSSL=false
        username: root
        password: password
      slave0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave0-ip:3306/myapp?useSSL=false
        username: root
        password: password
      slave1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave1-ip:3306/myapp?useSSL=false
        username: root
        password: password
    
    rules:
      readwrite-splitting:
        data-sources:
          myds:
            type: Static
            props:
              write-data-source-name: master
              read-data-source-names: slave0,slave1
            load-balance-algorithm-name: round_robin
    
    props:
      sql-show: true  # 打印SQL路由信息

// 3. 业务代码(无需特殊处理)
@Service
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    // 写操作自动路由到主库
    @Transactional
    public Order createOrder(Order order) {
        return orderRepository.save(order);
    }
    
    // 读操作自动路由到从库
    public Order getOrderById(Long id) {
        return orderRepository.findById(id).orElse(null);
    }
    
    // 复杂查询可能需要强制走主库
    @ReadwriteSplittingHint(type = "write")
    public Order getRecentOrderWithLock(Long userId) {
        // 使用主库查询,确保数据一致性
        return orderRepository.findRecentByUserId(userId);
    }
}

5.3.2 使用MyCat中间件

<!-- MyCat schema.xml 配置 -->
<schema name="myapp" checkSQLschema="false" sqlMaxLimit="100">
    <table name="orders" dataNode="dn1,dn2,dn3" rule="mod-long" />
</schema>

<dataNode name="dn1" dataHost="host1" database="myapp" />
<dataNode name="dn2" dataHost="host1" database="myapp" />
<dataNode name="dn3" dataHost="host1" database="myapp" />

<dataHost name="host1" maxCon="1000" minCon="10" balance="1" writeType="0" dbType="mysql" dbDriver="native">
    <heartbeat>select user()</heartbeat>
    
    <!-- 写节点 -->
    <writeHost host="hostM1" url="master-ip:3306" user="root" password="password">
        <!-- 读节点 -->
        <readHost host="hostS1" url="slave0-ip:3306" user="root" password="password" />
        <readHost host="hostS2" url="slave1-ip:3306" user="root" password="password" />
    </writeHost>
</dataHost>

5.3.3 使用ShardingSphere-Proxy

# config-sharding.yaml
schemaName: myapp

dataSources:
  ds_0:
    url: jdbc:mysql://master-ip:3306/myapp?serverTimezone=UTC&useSSL=false
    username: root
    password: password
    connectionTimeoutMilliseconds: 3000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
  ds_1:
    url: jdbc:mysql://slave0-ip:3306/myapp?serverTimezone=UTC&useSSL=false
    username: root
    password: password
    connectionTimeoutMilliseconds: 3000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50
  ds_2:
    url: jdbc:mysql://slave1-ip:3306/myapp?serverTimezone=UTC&useSSL=false
    username: root
    password: password
    connectionTimeoutMilliseconds: 3000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 50

rules:
- !READWRITE_SPLITTING
  dataSources:
    myds:
      type: Static
      props:
        write-data-source-name: ds_0
        read-data-source-names: ds_1,ds_2
      load-balance-algorithm-name: round_robin

- !SHARDING
  tables:
    orders:
      actualDataNodes: myds.orders_$->{0..9}
      tableStrategy:
        standard:
          shardingColumn: user_id
          shardingAlgorithmName: mod
  shardingAlgorithms:
    mod:
      type: MOD
      props:
        sharding-count: 10

5.4 读写分离的注意事项

5.4.1 数据延迟问题

// 方案1:关键业务强制走主库
public Order getOrderAfterCreate(Long orderId) {
    // 创建订单后立即查询,强制走主库避免读到延迟数据
    return orderRepository.findFromMaster(orderId);
}

// 方案2:使用缓存标记
public void createOrder(Order order) {
    orderRepository.save(order);
    // 设置缓存标记,5秒内走主库
    redisTemplate.opsForValue().set("order:recent:" + order.getUserId(), "1", 5, TimeUnit.SECONDS);
}

public Order getOrder(Long orderId, Long userId) {
    // 检查是否有最近创建标记
    if (redisTemplate.hasKey("order:recent:" + userId)) {
        return orderRepository.findFromMaster(orderId);
    }
    return orderRepository.findFromSlave(orderId);
}

5.4.2 主从复制监控

-- 监控复制延迟
SHOW SLAVE STATUS\G

-- 查看主从同步状态
SELECT * FROM performance_schema.replication_applier_status;

-- 监控复制错误
SELECT * FROM performance_schema.replication_applier_status_by_worker;

5.4.3 故障切换

// 简单的故障检测和切换
@Component
public class DataSourceHealthChecker {
    
    @Autowired
    private DataSource masterDataSource;
    
    @Autowired
    private List<DataSource> slaveDataSources;
    
    @Scheduled(fixedRate = 5000)
    public void checkHealth() {
        // 检查主库
        if (!isHealthy(masterDataSource)) {
            // 触发告警和故障转移
            alertMasterDown();
        }
        
        // 检查从库
        for (int i = 0; i < slaveDataSources.size(); i++) {
            if (!isHealthy(slaveDataSources.get(i))) {
                // 标记从库为不可用
                markSlaveUnavailable(i);
            }
        }
    }
    
    private boolean isHealthy(DataSource dataSource) {
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 1")) {
            return rs.next();
        } catch (Exception e) {
            return false;
        }
    }
}

六、高级优化策略

6.1 缓存策略

6.1.1 多级缓存架构

// 伪代码:多级缓存实现
public class MultiLevelCacheService {
    
    // L1: 本地缓存(Caffeine)
    private final Cache<String, Object> localCache = Caffeine.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .build();
    
    // L2: Redis缓存
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // L3: 数据库
    @Autowired
    private OrderRepository orderRepository;
    
    public Order getOrder(Long orderId) {
        String key = "order:" + orderId;
        
        // L1: 本地缓存
        Order order = (Order) localCache.getIfPresent(key);
        if (order != null) {
            return order;
        }
        
        // L2: Redis缓存
        order = (Order) redisTemplate.opsForValue().get(key);
        if (order != null) {
            // 回填本地缓存
            localCache.put(key, order);
            return order;
        }
        
        // L3: 数据库
        order = orderRepository.findById(orderId).orElse(null);
        if (order != null) {
            // 回填缓存
            redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
            localCache.put(key, order);
        }
        
        return order;
    }
    
    public void updateOrder(Order order) {
        // 更新数据库
        orderRepository.save(order);
        
        // 删除缓存(Cache Aside模式)
        String key = "order:" + order.getId();
        redisTemplate.delete(key);
        localCache.invalidate(key);
    }
}

6.1.2 缓存穿透、击穿、雪崩防护

// 缓存空值防止穿透
public Order getOrderWithNullCache(Long orderId) {
    String key = "order:" + orderId;
    String nullKey = "order:null:" + orderId;
    
    // 检查空值缓存
    if (Boolean.TRUE.equals(redisTemplate.hasKey(nullKey))) {
        return null;
    }
    
    Order order = (Order) redisTemplate.opsForValue().get(key);
    if (order == null) {
        order = orderRepository.findById(orderId).orElse(null);
        if (order == null) {
            // 缓存空值,防止穿透
            redisTemplate.opsForValue().set(nullKey, "", 5, TimeUnit.MINUTES);
            return null;
        }
        redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
    }
    return order;
}

// 分布式锁防止缓存击穿
public Order getOrderWithLock(Long orderId) {
    String key = "order:" + orderId;
    String lockKey = "lock:order:" + orderId;
    
    Order order = (Order) redisTemplate.opsForValue().get(key);
    if (order != null) {
        return order;
    }
    
    // 尝试获取分布式锁
    Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
    if (Boolean.TRUE.equals(locked)) {
        try {
            // 双重检查
            order = (Order) redisTemplate.opsForValue().get(key);
            if (order == null) {
                order = orderRepository.findById(orderId).orElse(null);
                if (order != null) {
                    redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
                }
            }
        } finally {
            redisTemplate.delete(lockKey);
        }
    } else {
        // 等待并重试
        try {
            Thread.sleep(50);
            return getOrderWithLock(orderId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
    
    return order;
}

6.2 连接池优化

6.2.1 HikariCP配置

// Spring Boot配置
@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties("spring.datasource.hikari")
    public HikariDataSource dataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        
        // 基础配置
        dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/myapp");
        dataSource.setUsername("root");
        dataSource.setPassword("password");
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        
        // 连接池大小(根据业务调整)
        dataSource.setMaximumPoolSize(50);      // 最大连接数
        dataSource.setMinimumIdle(10);          // 最小空闲连接
        dataSource.setConnectionTimeout(3000);  // 连接超时3秒
        dataSource.setIdleTimeout(600000);      // 空闲10分钟回收
        dataSource.setMaxLifetime(1800000);     // 连接最大存活30分钟
        
        // 性能优化
        dataSource.setPoolName("MyAppHikariCP");
        dataSource.addDataSourceProperty("cachePrepStmts", "true");
        dataSource.addDataSourceProperty("prepStmtCacheSize", "250");
        dataSource.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        dataSource.addDataSourceProperty("useServerPrepStmts", "true");
        dataSource.addDataSourceProperty("useLocalSessionState", "true");
        dataSource.addDataSourceProperty("rewriteBatchedStatements", "true");
        dataSource.addDataSourceProperty("cacheResultSetMetadata", "true");
        dataSource.addDataSourceProperty("cacheServerConfiguration", "true");
        dataSource.addDataSourceProperty("elideSetAutoCommits", "true");
        dataSource.addDataSourceProperty("maintainTimeStats", "false");
        
        return dataSource;
    }
}

6.2.2 连接池监控

// 监控连接池状态
@Component
public class ConnectionPoolMonitor {
    
    @Autowired
    private HikariDataSource dataSource;
    
    @Scheduled(fixedRate = 60000)
    public void monitorPool() {
        HikariPoolMXBean poolMXBean = dataSource.getHikariPoolMXBean();
        
        log.info("连接池状态:");
        log.info("  活跃连接数: {}", poolMXBean.getActiveConnections());
        log.info("  空闲连接数: {}", poolMXBean.getIdleConnections());
        log.info("  总连接数: {}", poolMXBean.getTotalConnections());
        log.info("  等待连接数: {}", poolMXBean.getThreadsAwaitingConnection());
        
        // 告警阈值
        if (poolMXBean.getThreadsAwaitingConnection() > 10) {
            log.warn("连接池等待数过高: {}", poolMXBean.getThreadsAwaitingConnection());
        }
    }
}

6.3 异步处理

6.3.1 异步写入

// 使用消息队列异步处理
@Service
public class OrderService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    // 同步接口,快速返回
    public CompletableFuture<Long> createOrderAsync(Order order) {
        // 1. 生成订单ID并持久化到临时表
        order.setStatus("PENDING");
        Order saved = orderRepository.save(order);
        
        // 2. 发送消息到MQ
        rabbitTemplate.convertAndSend("order.exchange", "order.create", saved.getId());
        
        // 3. 立即返回订单ID
        return CompletableFuture.completedFuture(saved.getId());
    }
    
    // MQ消费者异步处理
    @RabbitListener(queues = "order.queue")
    public void processOrder(Long orderId) {
        try {
            // 完整的业务处理
            Order order = orderRepository.findById(orderId).orElseThrow();
            processPayment(order);
            updateInventory(order);
            order.setStatus("COMPLETED");
            orderRepository.save(order);
        } catch (Exception e) {
            // 失败重试或记录日志
            log.error("处理订单失败: {}", orderId, e);
        }
    }
}

6.3.2 批量处理

// 批量插入优化
public void batchInsertOrders(List<Order> orders) {
    // 使用JDBC批量处理
    jdbcTemplate.batchUpdate(
        "INSERT INTO orders (user_id, amount, status) VALUES (?, ?, ?)",
        new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                Order order = orders.get(i);
                ps.setLong(1, order.getUserId());
                ps.setBigDecimal(2, order.getAmount());
                ps.setInt(3, order.getStatus());
            }
            
            @Override
            public int getBatchSize() {
                return orders.size();
            }
        }
    );
}

// 使用LOAD DATA INFILE(超大数据量)
public void bulkLoadOrders(String filePath) {
    String sql = "LOAD DATA LOCAL INFILE '" + filePath + "' " +
                 "INTO TABLE orders " +
                 "FIELDS TERMINATED BY ',' " +
                 "LINES TERMINATED BY '\\n' " +
                 "(user_id, amount, status)";
    jdbcTemplate.execute(sql);
}

6.4 数据归档

6.4.1 历史数据归档策略

-- 创建归档表(结构与原表相同)
CREATE TABLE orders_archive LIKE orders;

-- 定期归档(存储过程)
DELIMITER $$
CREATE PROCEDURE archive_old_orders()
BEGIN
    -- 开始事务
    START TRANSACTION;
    
    -- 插入历史数据到归档表
    INSERT INTO orders_archive 
    SELECT * FROM orders 
    WHERE create_time < DATE_SUB(NOW(), INTERVAL 1 YEAR);
    
    -- 删除原表历史数据
    DELETE FROM orders 
    WHERE create_time < DATE_SUB(NOW(), INTERVAL 1 YEAR);
    
    -- 提交事务
    COMMIT;
END$$
DELIMITER ;

-- 设置定时任务(Event Scheduler)
SET GLOBAL event_scheduler = ON;

CREATE EVENT archive_orders_event
ON SCHEDULE EVERY 1 WEEK
DO
    CALL archive_old_orders();

6.4.2 分区表归档

-- 使用分区表便于归档
CREATE TABLE orders (
    id BIGINT,
    user_id INT,
    amount DECIMAL(10,2),
    create_time DATETIME,
    PRIMARY KEY (id, create_time)
) PARTITION BY RANGE COLUMNS(create_time) (
    PARTITION p202301 VALUES LESS THAN ('2023-02-01'),
    PARTITION p202302 VALUES LESS THAN ('2023-03-01'),
    PARTITION p202303 VALUES LESS THAN ('2023-04-01'),
    PARTITION pcurrent VALUES LESS THAN (MAXVALUE)
);

-- 归档时直接删除分区(瞬间完成)
ALTER TABLE orders DROP PARTITION p202301;

-- 或者将分区移动到归档表
ALTER TABLE orders EXCHANGE PARTITION p202301 WITH TABLE orders_archive;

七、监控与告警

7.1 性能监控指标

7.1.1 关键指标采集

-- 1. 查询性能指标
SELECT 
    SCHEMA_NAME,
    DIGEST_TEXT,
    COUNT_STAR,
    AVG_TIMER_WAIT/1000000000000 AS avg_time_sec,
    SUM_ROWS_EXAMINED,
    SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;

-- 2. 表I/O统计
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    COUNT_READ,
    COUNT_WRITE,
    SUM_NUMBER_OF_BYTES_READ,
    SUM_NUMBER_OF_BYTES_WRITE
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;

-- 3. 索引使用统计
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_FETCH,
    COUNT_INSERT,
    COUNT_UPDATE,
    COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;

-- 4. 锁等待统计
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    LOCK_TYPE,
    LOCK_MODE,
    COUNT_WAIT,
    SUM_TIMER_WAIT/1000000000000 AS wait_time_sec
FROM performance_schema.data_lock_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC;

7.1.2 慢查询日志分析

# my.cnf 配置慢查询日志
[mysqld]
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1  # 记录超过1秒的查询
log_queries_not_using_indexes = 1
min_examined_row_limit = 1000

分析慢查询日志:

# 使用mysqldumpslow分析
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

# 使用pt-query-digest分析(Percona Toolkit)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

7.2 监控告警系统

7.2.1 Prometheus + Grafana监控

# docker-compose.yml
version: '3'
services:
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
  
  mysqld_exporter:
    image: prom/mysqld-exporter
    environment:
      - DATA_SOURCE_NAME=exporter:password@(mysql:3306)/
    ports:
      - "9104:9104"
# prometheus.yml
scrape_configs:
  - job_name: 'mysql'
    static_configs:
      - targets: ['mysqld_exporter:9104']

7.2.2 告警规则示例

# alert-rules.yml
groups:
- name: mysql
  rules:
  - alert: MySQLDown
    expr: up{job="mysql"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "MySQL instance {{ $labels.instance }} is down"
  
  - alert: MySQLSlowQueries
    expr: rate(mysql_global_status_slow_queries[5m]) > 5
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MySQL slow queries rate is high"
  
  - alert: MySQLHighConnections
    expr: mysql_global_status_threads_connected / mysql_global_variables_max_connections > 0.8
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MySQL connection usage is high"

7.3 性能分析工具

7.3.1 使用EXPLAIN分析查询

-- 基本EXPLAIN
EXPLAIN SELECT * FROM orders WHERE user_id = 123;

-- 详细EXPLAIN(包含执行时间)
EXPLAIN ANALYZE SELECT * FROM orders WHERE user_id = 123;

-- JSON格式EXPLAIN
EXPLAIN FORMAT=JSON SELECT * FROM orders WHERE user_id = 123;

-- 查看执行计划可视化
EXPLAIN 
SELECT o.*, u.username 
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.create_time >= '2023-01-01';

EXPLAIN输出解读

字段 说明 优化目标
type 连接类型 ALL → range → ref → eq_ref → const
key 实际使用的索引 应该有值,不应是NULL
rows 预计扫描行数 越少越好
Extra 额外信息 避免Using filesort, Using temporary

7.3.2 使用sys schema

-- 查看最耗时的查询
SELECT * FROM sys.statement_analysis 
ORDER BY avg_latency DESC 
LIMIT 10;

-- 查看未使用的索引
SELECT * FROM sys.schema_unused_indexes;

-- 查看冗余索引
SELECT * FROM sys.schema_redundant_indexes;

-- 查看表的I/O情况
SELECT * FROM sys.schema_table_statistics 
ORDER BY total_latency DESC 
LIMIT 10;

-- 查看锁等待
SELECT * FROM sys.innodb_lock_waits;

八、实战案例:电商系统优化

8.1 系统架构

用户请求
  ↓
Nginx(负载均衡)
  ↓
应用服务器集群(Spring Boot)
  ↓
ShardingSphere(读写分离 + 分库分表)
  ↓
主库(写) + 从库1(读) + 从库2(读)
  ↓
Redis集群(缓存)
  ↓
消息队列(异步处理)

8.2 优化前的问题

-- 问题1:慢查询
SELECT * FROM orders 
WHERE user_id = 123 
  AND status = 1 
  AND create_time >= '2023-01-01'
ORDER BY create_time DESC;

-- 问题2:大表全表扫描
SELECT COUNT(*) FROM orders WHERE status = 1;

-- 问题3:深度分页
SELECT * FROM orders ORDER BY create_time DESC LIMIT 1000000, 20;

8.3 优化方案实施

8.3.1 索引优化

-- 1. 创建联合索引
CREATE INDEX idx_user_status_time ON orders(user_id, status, create_time DESC);

-- 2. 创建覆盖索引
CREATE INDEX idx_status_time_cover ON orders(status, create_time, id, user_id, amount);

-- 3. 优化COUNT查询
CREATE INDEX idx_status ON orders(status);
-- 或使用近似值
SHOW TABLE STATUS LIKE 'orders';  -- 查看Rows值

8.3.2 分库分表

-- 按用户ID分10张表
-- orders_0 ~ orders_9

-- 应用层路由逻辑
public class OrderShardingRouter {
    private static final int SHARD_COUNT = 10;
    
    public String getTableName(Long userId) {
        int index = (int) (userId % SHARD_COUNT);
        return "orders_" + index;
    }
    
    public List<String> getAllTableNames() {
        return IntStream.range(0, SHARD_COUNT)
            .mapToObj(i -> "orders_" + i)
            .collect(Collectors.toList());
    }
}

8.3.3 读写分离配置

# ShardingSphere配置
rules:
- !READWRITE_SPLITTING
  dataSources:
    myds:
      type: Static
      props:
        write-data-source-name: master
        read-data-source-names: slave0,slave1
      load-balance-algorithm-name: round_robin

- !SHARDING
  tables:
    orders:
      actualDataNodes: myds.orders_$->{0..9}
      tableStrategy:
        standard:
          shardingColumn: user_id
          shardingAlgorithmName: mod
  shardingAlgorithms:
    mod:
      type: MOD
      props:
        sharding-count: 10

8.3.4 缓存策略

// 缓存热点数据
@Cacheable(value = "orders", key = "#orderId", unless = "#result == null")
public Order getOrder(Long orderId) {
    return orderRepository.findById(orderId).orElse(null);
}

// 缓存用户订单列表(带分页)
@Cacheable(value = "userOrders", key = "#userId + ':' + #page + ':' + #size")
public List<Order> getUserOrders(Long userId, int page, int size) {
    String tableName = shardingRouter.getTableName(userId);
    return orderRepository.findByUserIdInTable(tableName, userId, page * size, size);
}

// 更新时清除缓存
@CacheEvict(value = {"orders", "userOrders"}, allEntries = true)
public void updateOrder(Order order) {
    orderRepository.save(order);
}

8.3.5 异步处理

// 订单创建流程
public CompletableFuture<Long> createOrder(Order order) {
    // 1. 校验库存(同步)
    if (!inventoryService.checkStock(order.getProductId(), order.getQuantity())) {
        throw new BusinessException("库存不足");
    }
    
    // 2. 创建订单(同步)
    order.setStatus("PENDING");
    Order saved = orderRepository.save(order);
    
    // 3. 发送MQ消息(异步)
    rabbitTemplate.convertAndSend("order.exchange", "order.create", saved.getId());
    
    // 4. 立即返回
    return CompletableFuture.completedFuture(saved.getId());
}

// MQ消费者
@RabbitListener(queues = "order.queue")
public void processOrder(Long orderId) {
    try {
        // 异步处理支付、库存扣减、通知等
        Order order = orderRepository.findById(orderId).orElseThrow();
        
        // 支付处理
        paymentService.process(order);
        
        // 库存扣减
        inventoryService.deduct(order.getProductId(), order.getQuantity());
        
        // 更新状态
        order.setStatus("COMPLETED");
        orderRepository.save(order);
        
        // 发送通知
        notificationService.sendOrderCompleted(order);
        
    } catch (Exception e) {
        log.error("订单处理失败: {}", orderId, e);
        // 重试或补偿
        retryService.scheduleRetry(orderId);
    }
}

8.4 优化效果对比

指标 优化前 优化后 提升
平均响应时间 850ms 45ms 94.7%
QPS 500 5000 10倍
慢查询数 120/分钟 2/分钟 98%
CPU使用率 85% 35% 59%
磁盘I/O 95% 25% 74%

九、总结与最佳实践

9.1 优化优先级

  1. 索引优化(成本最低,效果最明显)

    • 确保WHERE、JOIN、ORDER BY字段有索引
    • 使用覆盖索引减少回表
    • 定期清理无用索引
  2. 查询优化(零成本)

    • 避免SELECT *
    • 避免函数操作索引列
    • 优化分页查询
  3. 缓存策略(中等成本)

    • 多级缓存架构
    • 防止缓存穿透、击穿、雪崩
    • 合理的缓存过期策略
  4. 架构优化(高成本)

    • 读写分离
    • 分库分表
    • 数据归档

9.2 日常维护 checklist

#!/bin/bash
# MySQL日常维护脚本

# 1. 检查慢查询日志
echo "=== 慢查询统计 ==="
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

# 2. 检查表索引使用情况
mysql -e "SELECT * FROM sys.schema_unused_indexes;"

# 3. 检查主从延迟
mysql -e "SHOW SLAVE STATUS\G" | grep Seconds_Behind_Master

# 4. 检查连接数
mysql -e "SHOW STATUS LIKE 'Threads_connected';"
mysql -e "SHOW STATUS LIKE 'Max_used_connections';"

# 5. 检查锁等待
mysql -e "SELECT * FROM sys.innodb_lock_waits;"

# 6. 生成性能报告
mysql -e "SELECT * FROM sys.statement_analysis ORDER BY avg_latency DESC LIMIT 10;" > /tmp/perf_report.txt

9.3 常见误区

误区1:索引越多越好

  • 正解:索引会降低写入性能,只创建必要的索引

误区2:所有查询都要走索引

  • 正解:小表全表扫描可能更快

误区3:读写分离能解决所有性能问题

  • 正解:需要配合缓存、异步等其他手段

误区4:分库分表是银弹

  • 正解:会带来分布式事务、跨库查询等复杂问题

9.4 持续优化建议

  1. 建立性能基线:定期记录关键指标,建立性能基线
  2. 灰度发布:优化方案先在小范围验证
  3. A/B测试:对比优化前后的性能数据
  4. 文档沉淀:记录每次优化的原因、方案和效果
  5. 团队培训:提升团队数据库优化能力

通过系统性地应用这些策略,您可以有效解决MySQL高并发场景下的性能瓶颈,构建稳定、高效的数据库架构。记住,优化是一个持续的过程,需要根据业务发展和技术演进不断调整和完善。