Introduction: Understanding MySQL's Challenges Under High Concurrency

High-concurrency scenarios are now the norm for Internet applications. Flash sales on e-commerce platforms, trending events on social media, and trading peaks in financial systems all put enormous pressure on the database. As one of the most popular relational databases, MySQL faces two core challenges in high-concurrency environments: traffic spikes and lock contention.

A traffic spike is a flood of requests arriving in a short window, exhausting the connection pool, straining CPU and memory, and in the worst case making the service unavailable. Lock contention is the waiting and blocking that occurs when multiple transactions access the same resources concurrently; in severe cases it leads to deadlocks and a sharp drop in throughput. Handled poorly, these problems degrade the user experience and can even cause data inconsistency or system outages.

This article walks through MySQL's strategies for handling high concurrency across several dimensions: index optimization, query optimization, transaction and lock optimization, and architecture upgrades, with concrete cases and code examples throughout.

1. Index Optimization: The Foundation of High-Concurrency Performance

1.1 How Indexes Work and Why They Matter

Indexes are among the most effective ways to improve query performance in MySQL. Under high concurrency, well-designed indexes can speed up queries by orders of magnitude while also narrowing the scope and shortening the duration of lock contention.

An index works much like a book's table of contents: to find a particular topic you jump straight to the right page instead of reading every one. InnoDB stores its indexes as B+ trees, which support efficient range scans as well as equality lookups.
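The two access patterns an index serves can be illustrated with Java's sorted-map API. This is an analogy only: TreeMap is a red-black tree, not a B+ tree, but it exposes the same point-lookup and range-scan operations that make an index useful.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Analogy only: a sorted map standing in for an index on user_id.
public class IndexAnalogy {
    // "Index": indexed column value -> row pointer
    static final NavigableMap<Integer, String> INDEX = new TreeMap<>();
    static {
        INDEX.put(1001, "row#1");
        INDEX.put(1005, "row#2");
        INDEX.put(1009, "row#3");
    }

    // Equality lookup, like WHERE user_id = ?
    static String pointLookup(int key) {
        return INDEX.get(key);
    }

    // Range scan, like WHERE user_id BETWEEN lo AND hi: the sorted
    // structure finds the bounds without touching every entry
    static int rangeCount(int lo, int hi) {
        return INDEX.subMap(lo, true, hi, true).size();
    }
}
```

Both operations avoid a "full scan" of the map, which is exactly the benefit a B+ tree index provides over reading the whole table.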

1.2 Concrete Index Optimization Strategies

1.2.1 Covering Indexes

A covering index contains every column a query needs, so the lookup back to the clustered index (the "table lookup") is avoided entirely, which can improve query performance substantially.

Example scenario: a user-order table that is frequently queried for a user's order status.

-- Create the orders table
CREATE TABLE orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    order_no VARCHAR(64) NOT NULL,
    status TINYINT NOT NULL,
    amount DECIMAL(10,2),
    create_time DATETIME,
    INDEX idx_user_status (user_id, status)
);

-- Query before optimization (requires a lookup back to the table)
SELECT order_no, amount FROM orders WHERE user_id = 1001 AND status = 1;

-- A covering index for this query
ALTER TABLE orders ADD INDEX idx_user_status_amount (user_id, status, order_no, amount);

-- The query can now be answered entirely from the index
SELECT order_no, amount FROM orders WHERE user_id = 1001 AND status = 1;

EXPLAIN confirms it: with the covering index in place, the Extra column shows "Using index", meaning no table lookup was needed. (The original idx_user_status is now a redundant prefix of the new index and can be dropped.)

1.2.2 The Leftmost-Prefix Rule and Index Condition Pushdown

Composite indexes in MySQL follow the leftmost-prefix rule. Given an index on (a, b, c), the following queries can all use it effectively:

  • WHERE a = ?
  • WHERE a = ? AND b = ?
  • WHERE a = ? AND b = ? AND c = ?

But these cannot make full use of it (MySQL 8.0's skip scan is a narrow exception):

  • WHERE b = ?
  • WHERE b = ? AND c = ?
  • WHERE c = ?
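The rule above can be captured as a small function: given the index's column order and the columns with equality predicates in the WHERE clause, it returns how many leading index columns are usable. This is a simplified model for intuition only; the real planner also handles range predicates, ICP, and skip scan.

```java
import java.util.List;
import java.util.Set;

public class LeftmostPrefix {
    // How many leading columns of a composite index a query can use,
    // given the set of equality-predicate columns in its WHERE clause.
    static int usablePrefixLength(List<String> indexColumns, Set<String> whereColumns) {
        int usable = 0;
        for (String col : indexColumns) {
            if (!whereColumns.contains(col)) {
                break; // a gap in the prefix stops index usage
            }
            usable++;
        }
        return usable;
    }
}
```

For the index (a, b, c): predicates on {a, b} use two columns, while predicates on {b, c} use none, because the leading column a is missing.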

Index Condition Pushdown (ICP), introduced in MySQL 5.6, filters out non-matching records during the index scan itself instead of after fetching the rows.

-- Create a composite index
ALTER TABLE orders ADD INDEX idx_user_time_status (user_id, create_time, status);

-- A query that benefits from ICP: the range on create_time ends index
-- access, but the status condition can still be checked inside the index
SELECT * FROM orders 
WHERE user_id = 1001 
  AND create_time >= '2024-01-01' 
  AND status = 1;

-- EXPLAIN shows "Using index condition" in the Extra column, meaning
-- MySQL filtered out non-matching records at the index level

1.2.3 Index Selectivity

Selectivity is the ratio of distinct index values to total rows. The higher the selectivity, the more effective the index; a common rule of thumb is that a column is worth indexing on its own only when its selectivity exceeds roughly 0.3.

-- Compute a column's selectivity
SELECT 
    COUNT(DISTINCT status) / COUNT(*) AS selectivity
FROM orders;

-- For low-selectivity columns (gender, status, ...), consider:
-- 1. Not indexing them at all
-- 2. Combining them with a high-selectivity column in a composite index
-- 3. A prefix index -- for string columns only (a prefix index cannot be
--    built on a numeric column such as the TINYINT status)
ALTER TABLE orders ADD INDEX idx_order_no_prefix (order_no(8));
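Choosing a prefix length for a string prefix index is a trade-off: long enough that the prefix's selectivity approaches the full column's, short enough to keep the index small. In SQL you would compare COUNT(DISTINCT LEFT(col, n)) / COUNT(*) for several n; the same calculation as a Java sketch (a hypothetical helper, not part of the original article):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PrefixIndexSizer {
    // Selectivity of the first n characters: distinct prefixes / total rows,
    // the equivalent of COUNT(DISTINCT LEFT(col, n)) / COUNT(*).
    static double prefixSelectivity(List<String> values, int n) {
        Set<String> distinct = new HashSet<>();
        for (String v : values) {
            distinct.add(v.substring(0, Math.min(n, v.length())));
        }
        return (double) distinct.size() / values.size();
    }

    // Smallest prefix length whose selectivity reaches the target,
    // or -1 if no prefix up to maxLen reaches it.
    static int shortestPrefix(List<String> values, double target, int maxLen) {
        for (int n = 1; n <= maxLen; n++) {
            if (prefixSelectivity(values, n) >= target) {
                return n;
            }
        }
        return -1;
    }
}
```

In practice you would run the SQL version against a sample of the real column data and pick the shortest n whose selectivity is close to the full column's.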

1.3 Index Maintenance and Monitoring

1.3.1 Monitoring Index Usage

-- Index usage statistics
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_READ,
    COUNT_WRITE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE OBJECT_SCHEMA = 'your_database'
ORDER BY COUNT_READ DESC;

-- Find unused indexes (they still slow down every write)
SELECT 
    t.TABLE_SCHEMA,
    t.TABLE_NAME,
    t.INDEX_NAME
FROM information_schema.STATISTICS t
LEFT JOIN performance_schema.table_io_waits_summary_by_index_usage u
    ON t.TABLE_SCHEMA = u.OBJECT_SCHEMA 
    AND t.TABLE_NAME = u.OBJECT_NAME 
    AND t.INDEX_NAME = u.INDEX_NAME
WHERE u.COUNT_READ IS NULL 
  AND t.INDEX_NAME != 'PRIMARY';

1.3.2 Index Defragmentation

-- Check table fragmentation
SELECT 
    TABLE_NAME,
    ROUND(DATA_LENGTH / 1024 / 1024, 2) AS data_size_mb,
    ROUND(DATA_FREE / 1024 / 1024, 2) AS free_size_mb
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'your_database';

-- Rebuild the table to reclaim fragmented space
OPTIMIZE TABLE orders;

-- Or rebuild the table (and all of its indexes) via ALTER TABLE
ALTER TABLE orders ENGINE = InnoDB;

2. Query Optimization: Making SQL Fast

2.1 Avoid Full Table Scans

Full table scans are a performance killer under high concurrency; sensible indexes and query conditions keep them out of the hot path.

2.1.1 Avoid SELECT *

-- Bad: fetching every column wastes network bandwidth and memory
SELECT * FROM orders WHERE user_id = 1001;

-- Good: select only the columns you need
SELECT id, order_no, status, amount FROM orders WHERE user_id = 1001;

-- Better still: a covering index (InnoDB secondary indexes already contain
-- the primary key, so id does not need to be listed explicitly)
ALTER TABLE orders ADD INDEX idx_user_cover (user_id, order_no, status, amount);
SELECT id, order_no, status, amount FROM orders WHERE user_id = 1001;

2.1.2 Don't Wrap Indexed Columns in Functions in WHERE

-- Bad: the function call on the column prevents index use
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';

-- Good: an index range scan works
SELECT * FROM orders 
WHERE create_time >= '2024-01-01 00:00:00' 
  AND create_time < '2024-01-02 00:00:00';

2.1.3 Avoid LIKE with a Leading Wildcard

-- Bad: a leading % prevents index use
SELECT * FROM orders WHERE order_no LIKE '%12345';

-- Good: a trailing wildcard can use the index
SELECT * FROM orders WHERE order_no LIKE 'ABC%';

-- If suffix matching is unavoidable, consider:
-- 1. Storing the value reversed, plus an index on that column
ALTER TABLE orders ADD COLUMN order_no_reverse VARCHAR(64);
UPDATE orders SET order_no_reverse = REVERSE(order_no);
ALTER TABLE orders ADD INDEX idx_order_no_reverse (order_no_reverse);

-- Then query with a trailing wildcard on the reversed value
SELECT * FROM orders WHERE order_no_reverse LIKE CONCAT(REVERSE('12345'), '%');

2.2 Pagination Optimization

Deep pagination is a common problem under high concurrency: a query like LIMIT 1000000, 10 reads and discards a million rows before returning ten.

2.2.1 Deferred Join

-- Original inefficient query
SELECT * FROM orders 
WHERE user_id = 1001 
ORDER BY create_time DESC 
LIMIT 1000000, 10;

-- Optimization: a deferred join -- page over the narrow index first, then fetch the full rows
SELECT o.* FROM orders o
INNER JOIN (
    SELECT id FROM orders 
    WHERE user_id = 1001 
    ORDER BY create_time DESC 
    LIMIT 1000000, 10
) t ON o.id = t.id;

2.2.2 Keyset (Seek) Pagination

-- First page
SELECT * FROM orders 
WHERE user_id = 1001 
ORDER BY create_time DESC 
LIMIT 10;

-- Subsequent pages: remember where the previous page ended
SELECT * FROM orders 
WHERE user_id = 1001 
  AND create_time < '2024-01-01 12:00:00'  -- create_time of the previous page's last row
ORDER BY create_time DESC 
LIMIT 10;

-- (If create_time is not unique, add id to both the ORDER BY and the cursor
-- condition as a tie-breaker; otherwise rows can be skipped or repeated.)
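The technique can be modeled without a database: each page is "the first N keys, sorted descending, strictly below the cursor". A self-contained in-memory sketch of the SQL above, with create_time modeled as a long and the tie-breaker omitted for brevity:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class KeysetPagination {
    // One page: the first 'limit' keys strictly below 'cursor', descending.
    // Equivalent to: WHERE create_time < ? ORDER BY create_time DESC LIMIT ?
    static List<Long> page(List<Long> createTimes, long cursor, int limit) {
        List<Long> sorted = new ArrayList<>(createTimes);
        sorted.sort(Comparator.reverseOrder());
        List<Long> result = new ArrayList<>();
        for (Long t : sorted) {
            if (t < cursor) {
                result.add(t);
                if (result.size() == limit) {
                    break;
                }
            }
        }
        return result;
    }
}
```

Each page's last key becomes the next call's cursor, so the cost per page stays constant no matter how deep you go -- unlike OFFSET, which re-scans everything before the requested page.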

2.3 JOIN Optimization

2.3.1 Drive the Big Table with the Small Table

-- Say user has 1,000 rows and orders has 1,000,000
-- Preferred: the small table drives the large one
SELECT u.*, o.order_no
FROM user u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.id = 1001;

-- Worse: the large table as the driver
SELECT u.*, o.order_no
FROM orders o
LEFT JOIN user u ON o.user_id = u.id
WHERE u.id = 1001;

-- (For inner joins the optimizer usually picks the join order itself;
-- the rule matters most when you force an order, e.g. with STRAIGHT_JOIN.)

2.3.2 Avoid Cartesian Products

-- Bad: no join condition, so every row pairs with every row
SELECT * FROM orders, user;

-- Good: an explicit join condition
SELECT * FROM orders o, user u WHERE o.user_id = u.id;

-- Better: INNER JOIN syntax makes the condition hard to forget
SELECT * FROM orders o INNER JOIN user u ON o.user_id = u.id;

2.4 Batch Operation Optimization

Under high concurrency, row-at-a-time inserts and updates become a serious bottleneck.

2.4.1 Batch Inserts

-- Bad: one statement per row (1,000 round trips)
INSERT INTO orders (user_id, order_no, status) VALUES (1, 'A001', 1);
INSERT INTO orders (user_id, order_no, status) VALUES (2, 'A002', 1);
-- ... repeated 1,000 times

-- Good: one multi-row insert (a single round trip)
INSERT INTO orders (user_id, order_no, status) VALUES 
(1, 'A001', 1),
(2, 'A002', 1),
(3, 'A003', 1),
-- ... many rows per statement
(1000, 'A1000', 1);

-- Java example with PreparedStatement batching (for MySQL Connector/J, add
-- rewriteBatchedStatements=true to the JDBC URL so the driver rewrites the
-- batch into multi-row INSERTs)
String sql = "INSERT INTO orders (user_id, order_no, status) VALUES (?, ?, ?)";
PreparedStatement ps = conn.prepareStatement(sql);
for (Order order : orderList) {
    ps.setInt(1, order.getUserId());
    ps.setString(2, order.getOrderNo());
    ps.setInt(3, order.getStatus());
    ps.addBatch();
}
ps.executeBatch();

2.4.2 Batch Updates

-- Batch update with CASE WHEN
UPDATE orders 
SET status = CASE id
    WHEN 1 THEN 2
    WHEN 2 THEN 3
    WHEN 3 THEN 4
    ELSE status
END,
amount = CASE id
    WHEN 1 THEN 100.00
    WHEN 2 THEN 200.00
    WHEN 3 THEN 300.00
    ELSE amount
END
WHERE id IN (1, 2, 3);

-- Or use INSERT ... ON DUPLICATE KEY UPDATE (the VALUES() function is
-- deprecated since MySQL 8.0.20 in favor of row aliases, but still works)
INSERT INTO orders (id, user_id, order_no, status, amount) VALUES
(1, 1001, 'A001', 2, 100.00),
(2, 1002, 'A002', 2, 200.00),
(3, 1003, 'A003', 2, 300.00)
ON DUPLICATE KEY UPDATE
    status = VALUES(status),
    amount = VALUES(amount);

3. Transaction and Lock Optimization: The Heart of Lock Contention

3.1 Understanding MySQL's Lock Mechanisms

3.1.1 Lock Types

MySQL's locks fall into these main categories:

  1. Shared lock (S): held by a transaction reading a row
  2. Exclusive lock (X): held by a transaction updating or deleting a row
  3. Intention locks (IS/IX): table-level locks announcing that a transaction intends to set S or X locks on rows in the table
  4. Record lock: locks an index record
  5. Gap lock: locks the gap between index records
  6. Next-key lock: a record lock plus the gap before it, used to prevent phantom reads

3.1.2 Lock Compatibility Matrix

Table-level compatibility between the lock types:

              S           X           IS          IX
  S       compatible   conflict   compatible   conflict
  X        conflict    conflict    conflict    conflict
  IS      compatible   conflict   compatible  compatible
  IX       conflict    conflict   compatible  compatible

3.2 Strategies for Reducing Lock Contention

3.2.1 Keep Transactions Short

The longer a transaction holds its locks, the more likely other transactions are to collide with them.

// Bad: the transaction spans slow remote calls, holding its locks
// (and a pooled connection) the whole time
@Transactional
public void processOrder(Long orderId) {
    // 1. Load the order (a plain SELECT is a consistent read and takes no
    //    row lock, but the transaction and its connection stay open)
    Order order = orderMapper.selectById(orderId);
    
    // 2. Remote call to the inventory service (slow, inside the transaction)
    inventoryService.checkStock(order.getProductId());
    
    // 3. Remote call to the payment service (slow, inside the transaction)
    paymentService.processPayment(order);
    
    // 4. Update the order status (takes an X lock)
    order.setStatus(2);
    orderMapper.update(order);
}

// Good: move the slow work outside the transaction
public void processOrder(Long orderId) {
    // 1. Read first (no transaction needed)
    Order order = orderMapper.selectById(orderId);
    
    // 2. Slow external calls, outside any transaction
    inventoryService.checkStock(order.getProductId());
    paymentService.processPayment(order);
    
    // 3. Update the database in a short transaction
    updateOrderStatus(orderId, 2);
}

@Transactional
public void updateOrderStatus(Long orderId, Integer status) {
    orderMapper.updateStatus(orderId, status);
}

3.2.2 Reduce Locking with Optimistic Concurrency

-- Optimistic locking in place of pessimistic row locks
ALTER TABLE orders ADD COLUMN version INT DEFAULT 0;

-- Check the version number on every update
UPDATE orders 
SET status = 2, version = version + 1 
WHERE id = 1001 AND version = 0;

-- Zero affected rows means another transaction changed the row first;
-- the application must reload and retry, or report a conflict

-- The retry loop in Java
public boolean updateOrderWithOptimisticLock(Order order) {
    int retryCount = 0;
    while (retryCount < MAX_RETRY) {
        int affectedRows = orderMapper.updateWithVersion(order);
        if (affectedRows > 0) {
            return true;
        }
        // version conflict: reload the row and retry
        retryCount++;
        order = orderMapper.selectById(order.getId());
    }
    return false;
}

3.2.3 Pick an Appropriate Isolation Level

-- Current isolation level (MySQL 8.0; use @@tx_isolation on 5.7)
SELECT @@transaction_isolation;

-- Set the isolation level for this session
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- Or set it in the application (Spring Boot + HikariCP)
spring:
  datasource:
    hikari:
      transaction-isolation: TRANSACTION_READ_COMMITTED

How to choose:

  • READ UNCOMMITTED: not recommended; dirty reads are possible
  • READ COMMITTED: a good default for most workloads; no dirty reads, and far fewer gap locks than REPEATABLE READ
  • REPEATABLE READ: MySQL's default; guarantees repeatable reads but takes gap locks, which increases contention
  • SERIALIZABLE: the strictest level with the heaviest lock contention; reserve it for special cases

3.3 Deadlock Detection and Handling

3.3.1 Deadlock Scenarios

-- Scenario 1: updates in opposite orders
-- Transaction A
BEGIN;
UPDATE orders SET status = 2 WHERE id = 1;
-- moments later
UPDATE orders SET status = 2 WHERE id = 2;
COMMIT;

-- Transaction B (running concurrently)
BEGIN;
UPDATE orders SET status = 2 WHERE id = 2;
-- moments later
UPDATE orders SET status = 2 WHERE id = 1;
COMMIT;

-- Scenario 2: a gap-lock deadlock
-- Transaction A
BEGIN;
SELECT * FROM orders WHERE id BETWEEN 10 AND 20 FOR UPDATE; -- takes next-key/gap locks
-- Insert into the locked gap
INSERT INTO orders (id, user_id, order_no) VALUES (15, 1001, 'A015');
COMMIT;

-- Transaction B (running concurrently)
BEGIN;
SELECT * FROM orders WHERE id BETWEEN 10 AND 20 FOR UPDATE; -- succeeds: gap locks are mutually compatible
-- Each INSERT now waits for the other session's gap lock; InnoDB detects
-- the deadlock and rolls one transaction back
INSERT INTO orders (id, user_id, order_no) VALUES (16, 1002, 'A016');
COMMIT;

3.3.2 Monitoring and Diagnosing Deadlocks

-- Show the most recent deadlock
SHOW ENGINE INNODB STATUS\G

-- Look for the "LATEST DETECTED DEADLOCK" section in the output

-- Log every deadlock to the error log (MySQL 5.6+)
SET GLOBAL innodb_print_all_deadlocks = ON;

-- Deadlock-related system variables
SHOW VARIABLES LIKE 'innodb_deadlock_detect';
SHOW VARIABLES LIKE 'innodb_lock_wait_timeout';

3.3.3 Handling Deadlocks in the Application

public class DeadlockRetryHandler {
    private static final int MAX_RETRY = 3;
    private static final long RETRY_DELAY_MS = 100;
    
    public <T> T executeWithRetry(Supplier<T> operation) {
        int retryCount = 0;
        while (retryCount < MAX_RETRY) {
            try {
                return operation.get();
            } catch (DeadlockException e) {
                retryCount++;
                if (retryCount >= MAX_RETRY) {
                    throw e;
                }
                // random jitter so the retrying transactions don't collide again
                try {
                    Thread.sleep(RETRY_DELAY_MS + (long)(Math.random() * 100));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
        throw new RuntimeException("Max retry exceeded");
    }
}

// Usage (DeadlockException here stands for however your data-access layer
// surfaces MySQL error 1213)
DeadlockRetryHandler handler = new DeadlockRetryHandler();
handler.executeWithRetry(() -> {
    // your business logic
    return orderService.processOrder(orderId);
});

3.4 Lock-Wait Timeout Tuning

-- Current lock waits (MySQL 5.7; these information_schema tables were
-- removed in 8.0 -- use sys.innodb_lock_waits or
-- performance_schema.data_lock_waits there instead)
SELECT 
    r.trx_id waiting_trx_id,
    r.trx_mysql_thread_id waiting_thread,
    r.trx_query waiting_query,
    b.trx_id blocking_trx_id,
    b.trx_mysql_thread_id blocking_thread,
    b.trx_query blocking_query
FROM information_schema.innodb_lock_waits w
INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;

-- Current lock-wait timeout
SHOW VARIABLES LIKE 'innodb_lock_wait_timeout';

-- Adjust it to fit the workload (GLOBAL affects new sessions only)
SET GLOBAL innodb_lock_wait_timeout = 50;

4. Architecture Upgrades: From a Single Server to Distributed

4.1 Read/Write Splitting

4.1.1 How Primary-Replica Replication Works

MySQL replication is built on the binlog (binary log):

  1. The primary writes every change to its binlog
  2. The replica's I/O thread reads the primary's binlog and appends it to the relay log
  3. The replica's SQL thread replays the relay log

4.1.2 Implementing Read/Write Splitting

Application layer (ShardingSphere-JDBC)

# application.yml
spring:
  shardingsphere:
    datasource:
      names: master, slave0, slave1
      master:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://master:3306/db
        username: root
        password: password
      slave0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave0:3306/db
        username: root
        password: password
      slave1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave1:3306/db
        username: root
        password: password
    
    rules:
      readwrite-splitting:
        data-sources:
          ds0:
            type: Static
            props:
              write-data-source-name: master
              read-data-source-names: slave0,slave1
            load-balancer-name: round_robin
        load-balancers:
          round_robin:
            type: ROUND_ROBIN

Middleware (MyCat) -- note that in this schema.xml the <dataHost> element with balance="1" provides the read/write splitting, while the <schema> additionally shards orders across three dataNodes

<!-- schema.xml -->
<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100">
    <table name="orders" dataNode="dn1,dn2,dn3" rule="mod-long" />
</schema>

<dataNode name="dn1" dataHost="localhost1" database="db1" />
<dataNode name="dn2" dataHost="localhost1" database="db2" />
<dataNode name="dn3" dataHost="localhost1" database="db3" />

<dataHost name="localhost1" maxCon="1000" minCon="10" balance="1" writeType="0" dbType="mysql" dbDriver="native">
    <heartbeat>select user()</heartbeat>
    <writeHost host="hostM1" url="master:3306" user="root" password="password">
        <readHost host="hostS1" url="slave0:3306" user="root" password="password" />
        <readHost host="hostS2" url="slave1:3306" user="root" password="password" />
    </writeHost>
</dataHost>

4.1.3 Dealing with Replication Lag

-- Check replica lag
SHOW SLAVE STATUS\G
-- (SHOW REPLICA STATUS on MySQL 8.0.22+); watch the Seconds_Behind_Master field

-- Handling read-your-own-writes at the application layer
public Order getOrderAfterWrite(Long orderId) {
    // A read issued right after a write may hit a lagging replica. Options:
    // 1. Force this read to the primary (shown here)
    // 2. Delayed double delete: delete the cache, update the database, then
    //    delete the cache again after a short delay
    // 3. Carry a version number or timestamp and wait/retry when the
    //    replica has not caught up yet
    return orderMapper.selectFromMaster(orderId);
}

4.2 Sharding: Splitting Databases and Tables

4.2.1 Vertical Splitting

-- The original wide table
CREATE TABLE user (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    password VARCHAR(100),
    email VARCHAR(100),
    phone VARCHAR(20),
    -- detail fields below: rarely queried
    address VARCHAR(200),
    id_card VARCHAR(20),
    bank_card VARCHAR(30),
    -- statistics below: updated frequently
    login_count INT,
    last_login_time DATETIME,
    total_amount DECIMAL(10,2)
);

-- After vertical splitting:
-- Core user table (hot, read often)
CREATE TABLE user_base (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    password VARCHAR(100),
    email VARCHAR(100),
    phone VARCHAR(20)
);

-- User detail table (cold, read rarely)
CREATE TABLE user_detail (
    user_id BIGINT PRIMARY KEY,
    address VARCHAR(200),
    id_card VARCHAR(20),
    bank_card VARCHAR(30),
    FOREIGN KEY (user_id) REFERENCES user_base(id)
);

-- User statistics table (hot, written often)
CREATE TABLE user_stats (
    user_id BIGINT PRIMARY KEY,
    login_count INT,
    last_login_time DATETIME,
    total_amount DECIMAL(10,2),
    FOREIGN KEY (user_id) REFERENCES user_base(id)
);

4.2.2 Horizontal Splitting

-- Shard the orders table by user_id modulo 4:
-- orders_0, orders_1, orders_2, orders_3

-- A table-name function (illustrative only -- SQL cannot use a function
-- result as a table name, so routing happens in the application, below)
DELIMITER $$
CREATE FUNCTION get_order_table_name(user_id BIGINT) 
RETURNS VARCHAR(50)
DETERMINISTIC
BEGIN
    DECLARE table_suffix INT;
    SET table_suffix = user_id % 4;
    RETURN CONCAT('orders_', table_suffix);
END$$
DELIMITER ;

-- Application-side shard routing (Java)
public class OrderTableRouter {
    private static final int TABLE_COUNT = 4;
    
    public String getTableName(Long userId) {
        int suffix = (int) (userId % TABLE_COUNT);
        return "orders_" + suffix;
    }
    
    public String getTableName(Long userId, Long orderId) {
        // prefer userId; fall back to orderId when userId is absent
        if (userId != null) {
            return getTableName(userId);
        }
        int suffix = (int) (orderId % TABLE_COUNT);
        return "orders_" + suffix;
    }
}

// MyBatis mapper (note: ${tableName} is spliced into the SQL as a raw
// string -- only ever pass names produced by the router, never user input)
@Mapper
public interface OrderMapper {
    @Select("SELECT * FROM ${tableName} WHERE id = #{id}")
    Order selectById(@Param("tableName") String tableName, @Param("id") Long id);
    
    @Insert("INSERT INTO ${tableName} (user_id, order_no, status) VALUES (#{order.userId}, #{order.orderNo}, #{order.status})")
    void insert(@Param("tableName") String tableName, @Param("order") Order order);
}

// Service layer
@Service
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    
    private OrderTableRouter router = new OrderTableRouter();
    
    public Order getOrder(Long id, Long userId) {
        String tableName = router.getTableName(userId);
        return orderMapper.selectById(tableName, id);
    }
    
    public void createOrder(Order order) {
        String tableName = router.getTableName(order.getUserId());
        orderMapper.insert(tableName, order);
    }
}

4.2.3 Sharding with ShardingSphere

# ShardingSphere configuration (exact keys vary slightly across versions)
spring:
  shardingsphere:
    datasource:
      names: ds0, ds1
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://server0:3306/db0
        username: root
        password: password
      ds1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://server1:3306/db1
        username: root
        password: password
    
    rules:
      sharding:
        tables:
          orders:
            actual-data-nodes: ds$->{0..1}.orders_$->{0..3}  # $->{..} avoids clashing with Spring's ${..} placeholders
            table-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: orders-table-inline
            database-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: orders-db-inline
        sharding-algorithms:
          orders-table-inline:
            type: CLASS_BASED
            props:
              strategy: standard
              algorithmClassName: com.example.ModShardingAlgorithm
              sharding-column: user_id
              sharding-count: 4
          orders-db-inline:
            type: CLASS_BASED
            props:
              strategy: standard
              algorithmClassName: com.example.ModShardingAlgorithm
              sharding-column: user_id
              sharding-count: 2

4.3 Caching Strategy

4.3.1 A Multi-Level Cache

// Local cache + distributed cache + database
public class OrderService {
    // Local cache (Caffeine)
    private final Cache<Long, Order> localCache = Caffeine.newBuilder()
        .maximumSize(1000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .build();
    
    // Redis as the distributed cache
    @Autowired
    private RedisTemplate<String, Order> redisTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    public Order getOrder(Long id) {
        // 1. Local cache first
        Order order = localCache.getIfPresent(id);
        if (order != null) {
            return order;
        }
        
        // 2. Then Redis
        String key = "order:" + id;
        order = redisTemplate.opsForValue().get(key);
        if (order != null) {
            // backfill the local cache
            localCache.put(id, order);
            return order;
        }
        
        // 3. Finally the database
        order = orderMapper.selectById(id);
        if (order != null) {
            // backfill both caches
            redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
            localCache.put(id, order);
        }
        
        return order;
    }
    
    public void updateOrder(Order order) {
        // Write the database first
        orderMapper.update(order);
        
        // Then delete the cache entries (cache-aside pattern)
        String key = "order:" + order.getId();
        redisTemplate.delete(key);
        localCache.invalidate(order.getId());
        
        // Delayed second delete, to cover replication lag
        // (scheduledExecutorService: a ScheduledExecutorService field, not shown)
        scheduledExecutorService.schedule(() -> {
            redisTemplate.delete(key);
        }, 500, TimeUnit.MILLISECONDS);
    }
}

4.3.2 Guarding Against Cache Penetration, Breakdown, and Avalanche

// Penetration protection (repeated queries for keys that don't exist)
public Order getOrderWithPenetrationProtection(Long id) {
    String key = "order:" + id;
    String cacheKey = "cache:" + key;
    
    // Check the cache (a String-valued template is assumed here)
    String cached = redisTemplate.opsForValue().get(cacheKey);
    if ("NULL".equals(cached)) {
        return null; // cached marker: the row is known not to exist
    }
    if (cached != null) {
        return JSON.parseObject(cached, Order.class);
    }
    
    // Query the database
    Order order = orderMapper.selectById(id);
    if (order == null) {
        // Cache the miss so repeated lookups never reach the database
        redisTemplate.opsForValue().set(cacheKey, "NULL", 5, TimeUnit.MINUTES);
        return null;
    }
    
    // Normal caching
    redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(order), 30, TimeUnit.MINUTES);
    return order;
}

// Breakdown protection (a hot key expiring under load); called on a cache miss
public Order getOrderWithBreakdownProtection(Long id) throws InterruptedException {
    String key = "order:" + id;
    
    // A distributed lock so only one caller rebuilds the entry
    String lockKey = "lock:" + key;
    String lockValue = UUID.randomUUID().toString();
    
    try {
        // Try to take the lock (10 s expiry so a crashed holder cannot block forever)
        Boolean locked = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
        
        if (Boolean.TRUE.equals(locked)) {
            // Got the lock: load from the database and backfill the cache.
            // (Production code should re-check the cache here first, in case
            // another caller already rebuilt the entry.)
            Order order = orderMapper.selectById(id);
            if (order != null) {
                redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
            }
            return order;
        } else {
            // Someone else holds the lock: wait briefly and retry
            Thread.sleep(100);
            return getOrderWithBreakdownProtection(id);
        }
    } finally {
        // Release the lock (the Lua script makes check-and-delete atomic, so we
        // never delete a lock that another caller has since acquired)
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), 
            Collections.singletonList(lockKey), lockValue);
    }
}

// Avalanche protection (many keys expiring at the same moment)
public void warmupCache() {
    // Pre-warm the cache
    List<Order> orders = orderMapper.selectAll();
    for (Order order : orders) {
        String key = "order:" + order.getId();
        // Randomized TTL so the entries don't all expire together
        int expireTime = 30 + new Random().nextInt(10); // 30-40 minutes
        redisTemplate.opsForValue().set(key, order, expireTime, TimeUnit.MINUTES);
    }
}

4.4 Peak Shaving with a Message Queue

4.4.1 Asynchronous Processing

// The controller accepts the request and returns immediately
@RestController
public class OrderController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostMapping("/orders")
    public ResponseEntity<String> createOrder(@RequestBody OrderRequest request) {
        // 1. Validate
        validateRequest(request);
        
        // 2. Generate an order ID
        String orderId = generateOrderId();
        
        // 3. Publish to the queue and return right away
        OrderMessage message = new OrderMessage(orderId, request);
        rabbitTemplate.convertAndSend("order.exchange", "order.create", message);
        
        // 4. Return the order ID; the client polls for the result
        return ResponseEntity.accepted().body(orderId);
    }
}

// The consumer does the real work
@Component
@RabbitListener(queues = "order.queue")
public class OrderMessageConsumer {
    @Autowired
    private OrderService orderService;
    
    @RabbitHandler
    public void processOrder(OrderMessage message) {
        try {
            // 1. Check stock
            boolean hasStock = inventoryService.checkStock(message.getProductId());
            if (!hasStock) {
                // out of stock: report the failure
                sendFailureMessage(message, "out of stock");
                return;
            }
            
            // 2. Create the order (the database write)
            Order order = orderService.createOrder(message);
            
            // 3. Deduct stock
            inventoryService.deductStock(message.getProductId(), message.getQuantity());
            
            // 4. Report success
            sendSuccessMessage(order);
            
        } catch (Exception e) {
            // on failure: retry, or route to a dead-letter queue
            handleException(message, e);
        }
    }
}
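The peak-shaving effect itself can be demonstrated without RabbitMQ: a bounded queue absorbs a burst up to its capacity and rejects the overflow fast, while the consumer drains at its own steady pace -- which is exactly what keeps the burst away from the database. A minimal in-process sketch, with ArrayBlockingQueue standing in for the broker:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class PeakShaving {
    // A burst of 'requests' hits a queue with the given capacity.
    // offer() is non-blocking: requests beyond capacity are rejected
    // immediately (fast failure) instead of piling onto the database.
    static int[] absorbBurst(int requests, int capacity) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < requests; i++) {
            if (queue.offer(i)) {
                accepted++;
            } else {
                rejected++;
            }
        }
        // A consumer would later drain the 'accepted' items at a steady
        // rate; the database only ever sees that bounded flow.
        return new int[] { accepted, rejected };
    }
}
```

A real broker adds durability and distribution on top of this, but the back-pressure principle is the same: bound what enters the system, and let the slow resource consume at its own rate.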

4.4.2 A Delay Queue to Cancel Timed-Out Orders

// Delayed messages via RabbitMQ TTL + a dead-letter exchange
@Configuration
public class DelayedOrderConfig {
    
    // The normal order queue
    @Bean
    public Queue orderQueue() {
        return new Queue("order.queue", true);
    }
    
    // Delay queue: expired messages are dead-lettered onward
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "order.exchange");
        args.put("x-dead-letter-routing-key", "order.timeout");
        args.put("x-message-ttl", 30 * 60 * 1000); // 30-minute timeout
        return new Queue("order.delay.queue", true, false, false, args);
    }
    
    // Dead-letter queue where timed-out orders land
    @Bean
    public Queue orderTimeoutQueue() {
        return new Queue("order.timeout.queue", true);
    }
    
    @Bean
    public Binding binding() {
        // (the orderExchange() bean definition is omitted for brevity)
        return BindingBuilder.bind(orderTimeoutQueue())
            .to(orderExchange())
            .with("order.timeout");
    }
}

// Producer: publish the delayed message
public void sendOrderDelayMessage(Order order) {
    rabbitTemplate.convertAndSend(
        "order.exchange",
        "order.delay",
        order,
        message -> {
            message.getMessageProperties().setExpiration("1800000"); // 30 minutes, in ms
            return message;
        }
    );
}

// Consumer: cancel orders that timed out
@Component
@RabbitListener(queues = "order.timeout.queue")
public class OrderTimeoutConsumer {
    @Autowired
    private OrderMapper orderMapper;
    
    @RabbitHandler
    public void handleTimeout(Order order) {
        // Re-check the order's current status
        Order dbOrder = orderMapper.selectById(order.getId());
        if (dbOrder != null && dbOrder.getStatus() == OrderStatus.PENDING.getValue()) {
            // Cancel the order
            orderMapper.updateStatus(order.getId(), OrderStatus.CANCELLED.getValue());
            
            // Release the reserved stock
            inventoryService.releaseStock(order.getProductId(), order.getQuantity());
            
            // Notify the user of the cancellation
            sendCancelNotification(order);
        }
    }
}

4.5 Connection Pool Tuning

4.5.1 HikariCP Configuration

spring:
  datasource:
    hikari:
      # pool name
      pool-name: MyHikariCP
      # minimum idle connections
      minimum-idle: 10
      # maximum pool size (tune to the workload)
      maximum-pool-size: 50
      # connection timeout (ms)
      connection-timeout: 30000
      # idle connection lifetime (ms)
      idle-timeout: 600000
      # maximum connection lifetime (ms)
      max-lifetime: 1800000
      # test query (usually unnecessary with JDBC4 drivers,
      # where Hikari uses Connection.isValid() instead)
      connection-test-query: SELECT 1
      # expose JMX metrics
      register-mbeans: true
      # fail fast on startup if the pool cannot initialize
      initialization-fail-timeout: 1
      # connection leak detection threshold (ms)
      leak-detection-threshold: 60000

4.5.2 Connection Pool Monitoring

// Periodically sample the pool's state
@Component
public class ConnectionPoolMonitor {
    @Autowired
    private DataSource dataSource;
    
    @Scheduled(fixedRate = 60000) // sample once a minute
    public void monitorConnectionPool() {
        if (dataSource instanceof HikariDataSource) {
            HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
            HikariPoolMXBean poolMXBean = hikariDataSource.getHikariPoolMXBean();
            
            log.info("Pool state - active: {}, idle: {}, total: {}, awaiting: {}",
                poolMXBean.getActiveConnections(),
                poolMXBean.getIdleConnections(),
                poolMXBean.getTotalConnections(),
                poolMXBean.getThreadsAwaitingConnection());
            
            // Alerting
            if (poolMXBean.getThreadsAwaitingConnection() > 10) {
                log.warn("Too many threads waiting for a connection: {}", poolMXBean.getThreadsAwaitingConnection());
                // fire an alert
                alertService.sendAlert("Database connection pool has too many waiting threads");
            }
        }
    }
}

5. Monitoring and Alerting: Catch Problems Early

5.1 Performance Metrics

5.1.1 Key Metrics

-- Overall server status counters
SELECT 
    VARIABLE_NAME,
    VARIABLE_VALUE
FROM performance_schema.global_status
WHERE VARIABLE_NAME IN (
    'Threads_connected',
    'Threads_running',
    'Slow_queries',
    'Questions',
    'Queries',
    'Innodb_row_lock_waits',
    'Innodb_row_lock_time_avg',
    'Innodb_buffer_pool_reads',
    'Innodb_buffer_pool_read_requests'
);

-- Current connection count
SHOW STATUS LIKE 'Threads_connected';

-- Slow query count
SHOW STATUS LIKE 'Slow_queries';

-- InnoDB lock waits (the information_schema.innodb_lock_waits table was
-- removed in MySQL 8.0; the sys schema view below works on 5.7 and 8.0)
SELECT * FROM sys.innodb_lock_waits;
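Two of the counters listed above combine into a single health number: Innodb_buffer_pool_reads (logical reads that had to go to disk) and Innodb_buffer_pool_read_requests (all logical reads) give the buffer pool hit ratio, which on a read-heavy workload is worth alerting on when it drops below roughly 99%. The arithmetic:

```java
public class BufferPoolHitRatio {
    // hit ratio = 1 - disk_reads / logical_read_requests
    static double hitRatio(long diskReads, long readRequests) {
        if (readRequests == 0) {
            return 1.0; // no reads yet: treat as fully hit
        }
        return 1.0 - (double) diskReads / readRequests;
    }
}
```

For example, 100 disk reads out of 10,000 read requests is a 99% hit ratio; a sustained drop below that usually means the buffer pool is too small for the working set.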

5.1.2 Slow Query Log Analysis

-- Enable the slow query log
SET GLOBAL slow_query_log = ON;
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1; -- log queries slower than 1 second

-- From the shell, summarize the log with mysqldumpslow:
--   mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
-- or with pt-query-digest (Percona Toolkit):
--   pt-query-digest /var/log/mysql/slow.log > slow_report.txt

5.1.3 Monitoring with Performance Schema

-- The most expensive statement digests
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    AVG_TIMER_WAIT / 1000000000000 AS avg_time_sec,
    SUM_ROWS_EXAMINED,
    SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;

-- Table I/O statistics
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    COUNT_READ,
    COUNT_WRITE,
    SUM_TIMER_WAIT / 1000000000000 AS total_time_sec
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;

5.2 Application-Level Monitoring

5.2.1 Spring Boot Actuator + Micrometer

# application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true
    distribution:
      percentiles-histogram:
        http.server.requests: true
      percentiles:
        http.server.requests: 0.5,0.95,0.99

// Custom database metrics
@Component
public class DatabaseMetrics {
    private final MeterRegistry meterRegistry;
    
    public DatabaseMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    public void recordQuery(String queryType, long duration, boolean success) {
        Timer.builder("db.query.duration")
            .tag("type", queryType)
            .tag("success", String.valueOf(success))
            .register(meterRegistry)
            .record(duration, TimeUnit.MILLISECONDS);
        
        Counter.builder("db.query.count")
            .tag("type", queryType)
            .tag("success", String.valueOf(success))
            .register(meterRegistry)
            .increment();
    }
}

5.2.2 Custom Instrumentation

// Instrument the service layer
@Service
public class OrderService {
    @Autowired
    private DatabaseMetrics metrics;
    
    public Order getOrder(Long id) {
        long start = System.currentTimeMillis();
        boolean success = false;
        try {
            Order order = orderMapper.selectById(id);
            success = true;
            return order;
        } finally {
            metrics.recordQuery("select", System.currentTimeMillis() - start, success);
        }
    }
}

5.3 Alerting

5.3.1 Prometheus + Grafana Alert Rules

# prometheus-alerts.yml
groups:
- name: database
  rules:
  # connection usage
  - alert: MySQLTooManyConnections
    expr: mysql_global_status_threads_connected / mysql_global_variables_max_connections > 0.8
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MySQL connection usage is high"
      description: "Connection usage ratio is {{ $value }} (above 0.8)"
  
  # slow queries
  - alert: MySQLSlowQueries
    expr: rate(mysql_global_status_slow_queries[5m]) > 5
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MySQL slow queries are increasing"
      description: "{{ $value }} slow queries per second"
  
  # lock waits
  - alert: MySQLLockWaits
    expr: mysql_global_status_innodb_row_lock_waits > 100
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Severe MySQL lock waiting"
      description: "{{ $value }} InnoDB row lock waits"
  
  # replication lag
  - alert: MySQLReplicationLag
    expr: mysql_slave_lag_seconds > 30
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MySQL replication lag"
      description: "Replica is {{ $value }} seconds behind"

6. Case Study: Optimizing a Flash-Sale (Seckill) System

6.1 What Makes Flash Sales Hard

Flash-sale traffic has these characteristics:

  1. Huge instantaneous load: millions of requests within seconds
  2. Scarce inventory: few items, fierce competition
  3. Read-heavy: a flood of queries, comparatively few actual orders
  4. Strict consistency: overselling is not acceptable

6.2 Architecture

Client → CDN → Nginx → gateway → seckill service → Redis → MySQL
                    ↓
              message queue → async workers → database

6.3 Core Implementation

6.3.1 Preloading and Decrementing Stock

@Service
public class SeckillService {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    private static final String STOCK_KEY = "seckill:stock:";
    private static final String ORDER_KEY = "seckill:order:";
    
    /**
     * 预热库存到Redis
     */
    public void preloadStock(Long seckillId, Integer stock) {
        redisTemplate.opsForValue().set(STOCK_KEY + seckillId, stock.toString());
    }
    
    /**
     * 秒杀下单(Redis预扣库存)
     */
    public String seckill(Long seckillId, Long userId) {
        String stockKey = STOCK_KEY + seckillId;
        String orderKey = ORDER_KEY + seckillId + ":" + userId;
        
        // 1. 原子标记"已参与",防止重复下单
        //    (先 hasKey 再 set 存在竞态窗口,setIfAbsent 一步完成检查与占位)
        Boolean first = redisTemplate.opsForValue()
                .setIfAbsent(orderKey, "1", 30, TimeUnit.MINUTES);
        if (!Boolean.TRUE.equals(first)) {
            throw new RuntimeException("您已参与过秒杀");
        }
        
        // 2. 使用Lua脚本原子扣减库存
        String luaScript = 
            "local stock = redis.call('get', KEYS[1]) " +
            "if not stock then return -1 end " +
            "if tonumber(stock) <= 0 then return 0 end " +
            "redis.call('decr', KEYS[1]) " +
            "return 1";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(luaScript, Long.class),
            Collections.singletonList(stockKey)
        );
        
        if (result == null || result == -1) {
            redisTemplate.delete(orderKey);  // 扣减失败,释放"已参与"标记
            throw new RuntimeException("秒杀活动不存在");
        }
        if (result == 0) {
            redisTemplate.delete(orderKey);
            throw new RuntimeException("库存不足");
        }
        
        // 3. 先生成订单号并放入消息,保证前端轮询的单号与最终落库的单号一致
        String orderNo = generateOrderNo();
        rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order",
                new SeckillMessage(seckillId, userId, orderNo));
        
        // 4. 返回订单号(前端轮询查询结果)
        return orderNo;
    }
    
    /**
     * 异步创建订单(消费者)
     */
    @RabbitListener(queues = "seckill.queue")
    @Transactional(rollbackFor = Exception.class)
    public void createOrder(SeckillMessage message) {
        // 1. 先用条件更新扣减数据库库存(WHERE stock > 0,受影响行数为0即售罄)
        //    "先查库存再判断"的读检查存在竞态,条件更新本身即可防止超卖
        int affected = orderMapper.decreaseStock(message.getSeckillId());
        if (affected == 0) {
            // 数据库库存不足,回滚Redis预扣的库存
            rollbackStock(message.getSeckillId());
            log.error("秒杀库存不足,seckillId: {}", message.getSeckillId());
            return;
        }
        
        // 2. 创建订单,沿用下单接口已返回给前端的订单号
        Order order = new Order();
        order.setOrderNo(message.getOrderNo());
        order.setSeckillId(message.getSeckillId());
        order.setUserId(message.getUserId());
        order.setStatus(OrderStatus.SUCCESS.getValue());
        orderMapper.insert(order);
        // 插入失败抛出异常时由事务统一回滚,消息交给重试机制或死信队列处理
    }
    
    /**
     * 回滚Redis库存
     */
    private void rollbackStock(Long seckillId) {
        String stockKey = STOCK_KEY + seckillId;
        redisTemplate.opsForValue().increment(stockKey);
    }
}
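
上面代码多次调用的 generateOrderNo() 在文中没有给出实现。下面是一个简化的示意版本(时间戳 + 机器号 + 自增序列,机器号假设由配置分配),并非原文实现,生产环境可替换为雪花算法等分布式 ID 方案:

```java
import java.util.concurrent.atomic.AtomicLong;

public class OrderNoGenerator {
    private static final AtomicLong SEQUENCE = new AtomicLong(0);
    private static final long WORKER_ID = 1;  // 机器号,假设由配置中心分配

    /** 毫秒时间戳保证趋势递增,序列号避免同一毫秒内冲突 */
    public static String generateOrderNo() {
        long seq = SEQUENCE.incrementAndGet() % 100000;
        return String.format("%d%02d%05d", System.currentTimeMillis(), WORKER_ID, seq);
    }

    public static void main(String[] args) {
        String a = generateOrderNo();
        String b = generateOrderNo();
        System.out.println(a);
        System.out.println(b);
    }
}
```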

6.3.2 数据库层优化

-- 秒杀商品表
CREATE TABLE seckill_goods (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    stock INT NOT NULL,
    start_time DATETIME,
    end_time DATETIME,
    version INT DEFAULT 0,  -- 乐观锁版本号
    INDEX idx_time (start_time, end_time)
);

-- 秒杀订单表(按 user_id 哈希分区)
-- 注意:MySQL 分区表的主键和唯一键必须包含分区列,
-- 因此主键与 order_no 的唯一键都需要带上 user_id
CREATE TABLE seckill_order (
    id BIGINT,
    order_no VARCHAR(64),
    seckill_id BIGINT,
    user_id BIGINT,
    status TINYINT,
    create_time DATETIME,
    PRIMARY KEY (id, user_id),
    UNIQUE KEY uk_order_no (order_no, user_id),
    INDEX idx_user (user_id),
    INDEX idx_seckill (seckill_id)
) PARTITION BY HASH(user_id) PARTITIONS 4;

-- 扣减库存SQL(条件更新防超卖)
-- 以 stock > 0 作为更新条件,受影响行数为 0 即表示售罄;
-- 若采用 version 字段做乐观锁,需先查出 version 再带入条件并在失败后重试
UPDATE seckill_goods 
SET stock = stock - 1, version = version + 1 
WHERE id = ? AND stock > 0;

-- 查询库存(不加锁)
SELECT stock FROM seckill_goods WHERE id = ?;
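
这条扣减 SQL 的防超卖能力来自“条件更新 + 受影响行数”的语义:WHERE stock > 0 不满足时 UPDATE 影响 0 行,调用方据此判定售罄。下面用一个内存对象模拟这一语义(仅为示意,非真实数据库操作):

```java
// 用内存对象模拟 seckill_goods 的一行,演示条件更新的返回语义
class SeckillGoodsRow {
    private int stock;
    SeckillGoodsRow(int stock) { this.stock = stock; }

    // 等价于 UPDATE ... SET stock = stock - 1 WHERE id = ? AND stock > 0
    // 返回"受影响行数"(0 或 1)
    synchronized int decreaseStock() {
        if (stock > 0) { stock--; return 1; }
        return 0;
    }
    synchronized int getStock() { return stock; }
}

public class OptimisticLockDemo {
    public static void main(String[] args) {
        SeckillGoodsRow row = new SeckillGoodsRow(2);
        int r1 = row.decreaseStock();  // 1,扣减成功
        int r2 = row.decreaseStock();  // 1,扣减成功
        int r3 = row.decreaseStock();  // 0,库存不足,调用方据此回滚Redis库存
        System.out.println(r1 + " " + r2 + " " + r3 + " stock=" + row.getStock());
    }
}
```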

6.3.3 接口限流与防刷

@Component
public class RateLimiter {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    /**
     * 令牌桶限流
     */
    public boolean tryAcquire(String key, int permits, int maxPermits, int refillRate) {
        String luaScript = 
            "local key = KEYS[1] " +
            "local permits = tonumber(ARGV[1]) " +
            "local maxPermits = tonumber(ARGV[2]) " +
            "local refillRate = tonumber(ARGV[3]) " +
            "local now = tonumber(ARGV[4]) " +
            "local interval = 1000 / refillRate " +
            "local tokens = redis.call('hget', key, 'tokens') " +
            "local lastRefill = redis.call('hget', key, 'lastRefill') " +
            "if not tokens then tokens = maxPermits else tokens = tonumber(tokens) end " +
            "if not lastRefill then lastRefill = now else lastRefill = tonumber(lastRefill) end " +
            "local elapsed = now - lastRefill " +
            "if elapsed > 0 then " +
            "    tokens = math.min(maxPermits, tokens + elapsed / interval) " +
            "end " +
            "if tokens >= permits then " +
            "    tokens = tokens - permits " +
            "    redis.call('hset', key, 'tokens', tokens, 'lastRefill', now) " +
            "    redis.call('expire', key, 3600) " +
            "    return 1 " +
            "else " +
            "    return 0 " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(luaScript, Long.class),
            Collections.singletonList("rate_limit:" + key),
            // RedisTemplate<String, String> 的参数按字符串序列化,须显式转为字符串
            String.valueOf(permits), String.valueOf(maxPermits),
            String.valueOf(refillRate), String.valueOf(System.currentTimeMillis())
        );
        
        return result != null && result == 1;
    }
}

// 在Controller中使用
@RestController
public class SeckillController {
    @Autowired
    private RateLimiter rateLimiter;
    
    @Autowired
    private SeckillService seckillService;
    
    @PostMapping("/seckill/{seckillId}")
    public Result seckill(@PathVariable Long seckillId, @RequestHeader("userId") Long userId) {
        // 1. 用户级限流(每个用户每秒最多1次)
        String userKey = "user:" + userId;
        if (!rateLimiter.tryAcquire(userKey, 1, 1, 1)) {
            return Result.error("操作太频繁,请稍后再试");
        }
        
        // 2. IP级限流(防止恶意刷单);getClientIp() 为获取客户端真实IP的辅助方法,实现从略
        String ipKey = "ip:" + getClientIp();
        if (!rateLimiter.tryAcquire(ipKey, 10, 100, 10)) {
            return Result.error("请求过于频繁,请稍后再试");
        }
        
        // 3. 全局限流(保护后端)
        String globalKey = "global:seckill:" + seckillId;
        if (!rateLimiter.tryAcquire(globalKey, 1000, 10000, 1000)) {
            return Result.error("系统繁忙,请稍后再试");
        }
        
        // 4. 执行秒杀
        try {
            String orderNo = seckillService.seckill(seckillId, userId);
            return Result.success(orderNo);
        } catch (Exception e) {
            return Result.error(e.getMessage());
        }
    }
}
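
前面 Lua 脚本中的令牌补充公式(interval = 1000 / refillRate,tokens += elapsed / interval)可以用纯 Java 还原验证。下面是一个可独立运行的内存版令牌桶(时间以参数传入便于测试,仅为示意,逻辑与 Redis 版一致):

```java
public class TokenBucketDemo {
    private double tokens;
    private long lastRefill;
    private final int maxPermits;
    private final int refillRate;  // 每秒补充的令牌数

    public TokenBucketDemo(int maxPermits, int refillRate) {
        this.maxPermits = maxPermits;
        this.refillRate = refillRate;
        this.tokens = maxPermits;   // 初始时桶是满的,与 Lua 脚本一致
        this.lastRefill = 0;
    }

    /** now 以毫秒为单位传入,便于测试时控制时间 */
    public synchronized boolean tryAcquire(int permits, long now) {
        double interval = 1000.0 / refillRate;  // 生成一个令牌所需的毫秒数
        long elapsed = now - lastRefill;
        if (elapsed > 0) {
            tokens = Math.min(maxPermits, tokens + elapsed / interval);
            lastRefill = now;
        }
        if (tokens >= permits) {
            tokens -= permits;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucketDemo bucket = new TokenBucketDemo(10, 10); // 容量10,每秒补10个
        int granted = 0;
        for (int i = 0; i < 15; i++) {
            if (bucket.tryAcquire(1, 0)) granted++;   // t=0 时刻连续请求15次
        }
        System.out.println("t=0 granted=" + granted); // 只有初始的10个令牌可用
        boolean later = bucket.tryAcquire(1, 500);    // 500ms 后已补充5个令牌
        System.out.println("t=500 acquired=" + later);
    }
}
```

突发流量最多消耗桶容量 maxPermits 个令牌,之后被平滑到 refillRate 的稳态速率,这正是令牌桶“允许突发、限制均值”的特性。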

6.4 秒杀系统监控

// 秒杀系统专用监控
@Slf4j  // Lombok 注解,提供下方使用的 log 对象
@Component
public class SeckillMonitor {
    private final MeterRegistry meterRegistry;
    
    private final AtomicLong totalRequests = new AtomicLong(0);
    private final AtomicLong successRequests = new AtomicLong(0);
    private final AtomicLong stockDepletionTime = new AtomicLong(0);
    
    public SeckillMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        // Gauge 只注册一次,采集时实时计算成功率;
        // 反复调用 meterRegistry.gauge(name, doubleValue) 不会更新已注册的 Gauge
        Gauge.builder("seckill.success.rate", this, m -> {
            long total = m.totalRequests.get();
            return total == 0 ? 0.0 : (double) m.successRequests.get() / total;
        }).register(meterRegistry);
    }
    
    public void recordRequest(boolean success) {
        totalRequests.incrementAndGet();
        if (success) {
            successRequests.incrementAndGet();
        }
    }
    
    public void recordStockDepletion(long timeMs) {
        stockDepletionTime.set(timeMs);
        meterRegistry.timer("seckill.stock.depletion.time").record(timeMs, TimeUnit.MILLISECONDS);
    }
    
    @Scheduled(fixedRate = 10000)
    public void logMetrics() {
        log.info("秒杀监控 - 总请求: {}, 成功: {}, 成功率: {}%, 库存耗时: {}ms",
            totalRequests.get(),
            successRequests.get(),
            totalRequests.get() > 0 ? 
                String.format("%.2f", (double) successRequests.get() / totalRequests.get() * 100) : 0,
            stockDepletionTime.get());
    }
}

七、总结与最佳实践

7.1 高并发优化的核心原则

  1. 空间换时间:使用索引、缓存减少查询时间
  2. 削峰填谷:异步处理、消息队列平滑瞬时写入压力
  3. 减少竞争:降低锁粒度、缩短事务
  4. 分散压力:读写分离、分库分表
  5. 快速失败:限流、熔断、降级

7.2 优化优先级建议

  1. 第一优先级:索引优化、慢SQL优化(成本低,效果明显)
  2. 第二优先级:缓存策略、读写分离(中等成本,效果显著)
  3. 第三优先级:异步处理、消息队列(较高成本,解决峰值问题)
  4. 第四优先级:分库分表、架构升级(高成本,解决数据量问题)

7.3 持续优化建议

  1. 建立性能基线:记录系统正常运行时的指标
  2. 定期性能分析:每周分析慢查询日志
  3. 压测常态化:定期进行压力测试
  4. 监控告警:建立完善的监控体系
  5. 代码审查:重点关注数据库相关代码
  6. 容量规划:根据业务增长提前规划扩容

7.4 常见误区提醒

  1. 过度索引:索引不是越多越好,会影响写性能
  2. 缓存滥用:缓存一致性问题复杂,需要谨慎设计
  3. 分库分表过早:会增加系统复杂度,应在单表性能达到瓶颈后再考虑
  4. 忽视事务:高并发下事务控制不当会导致严重问题
  5. 缺少监控:没有监控就无法知道优化效果

通过以上全面的优化策略,MySQL可以在高并发场景下保持良好的性能表现。关键是要根据实际业务场景选择合适的优化手段,并持续监控和调整。记住,没有银弹,只有最适合的方案。