引言:理解高并发挑战

在现代互联网应用中,高并发访问已成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易高峰期,MySQL数据库都面临着巨大的并发压力。高并发场景下,数据库性能瓶颈往往成为系统稳定性的关键制约因素。

高并发对MySQL的主要挑战包括:

  • 连接数耗尽:大量并发请求导致连接池耗尽,新请求无法获取连接
  • CPU资源争抢:频繁的上下文切换和查询执行消耗大量CPU资源
  • I/O瓶颈:高并发读写导致磁盘I/O成为性能瓶颈
  • 锁竞争:行锁、表锁等资源竞争导致事务等待和死锁
  • 缓存失效:热点数据更新导致缓存穿透、雪崩等问题

本文将从架构优化、数据库配置调优、SQL优化、缓存策略等多个维度,系统性地介绍MySQL高并发处理的实战策略。

一、架构层面的优化策略

1.1 读写分离架构

读写分离是应对高并发读多写少场景的经典架构模式。通过将读操作和写操作分离到不同的数据库实例,可以显著提升系统整体吞吐量。

核心原理

  • 主库(Master)负责所有写操作(INSERT/UPDATE/DELETE)
  • 从库(Slave)负责所有读操作(SELECT)
  • 应用层通过路由策略区分读写请求

实现方案示例

// 基于Spring Boot的读写分离实现
@Component
public class DataSourceRouter extends AbstractRoutingDataSource {
    
    private static final ThreadLocal<DataSourceType> CONTEXT_HOLDER = 
        ThreadLocal.withInitial(() -> DataSourceType.MASTER);
    
    public enum DataSourceType {
        MASTER, SLAVE
    }
    
    public static void setMaster() {
        CONTEXT_HOLDER.set(DataSourceType.MASTER);
    }
    
    public static void setSlave() {
        CONTEXT_HOLDER.set(DataSourceType.SLAVE);
    }
    
    public static void clear() {
        CONTEXT_HOLDER.remove();
    }
    
    @Override
    protected Object determineCurrentLookupKey() {
        return CONTEXT_HOLDER.get();
    }
}

// 使用AOP进行读写路由
@Aspect
@Component
public class DataSourceAspect {
    
    @Before("execution(* com.example.service.*.*(..))")
    public void before(JoinPoint joinPoint) {
        String methodName = joinPoint.getSignature().getName();
        if (methodName.startsWith("get") || methodName.startsWith("list") || 
            methodName.startsWith("query")) {
            DataSourceRouter.setSlave();
        } else {
            DataSourceRouter.setMaster();
        }
    }
    
    @After("execution(* com.example.service.*.*(..))")
    public void after() {
        DataSourceRouter.clear();
    }
}

配置要点

  • 确保主从复制延迟在可接受范围内(通常秒)
  • 对于强一致性要求的读操作,仍需走主库
  • 监控从库延迟,自动剔除延迟过高的从库

1.2 分库分表策略

当单表数据量超过千万级或并发量极高时,需要考虑分库分表(Sharding)。

水平分表示例

-- 用户表按user_id取模分表
-- user_0, user_1, ..., user_9

-- 创建分表
CREATE TABLE user_0 (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    created_at TIMESTAMP,
    INDEX idx_username (username)
) ENGINE=InnoDB;

-- 分表规则(Java实现)
public class ShardingUtil {
    private static final int TABLE_COUNT = 10;
    
    public static String getTableName(String userId) {
        int hash = userId.hashCode() % TABLE_COUNT;
        return "user_" + (hash < 0 ? -hash : hash);
    }
    
    public static String getTableName(Long userId) {
        return "user_" + (userId % TABLE_COUNT);
    }
}

// 使用示例
String tableName = ShardingUtil.getTableName(userId);
String sql = "INSERT INTO " + tableName + " (id, username, email) VALUES (?, ?, ?)";

分库分表中间件

  • ShardingSphere:功能完整的分布式数据库中间件
  • MyCat:基于Cobar的开源数据库中间件
  • Vitess:YouTube开源的MySQL集群管理工具

1.3 数据库连接池优化

连接池是应用与数据库之间的关键缓冲层,合理配置可大幅提升并发处理能力。

HikariCP配置示例

# application.yml
spring:
  datasource:
    hikari:
      # 连接池名称
      pool-name: MySQLHikariCP
      # 最小空闲连接数
      minimum-idle: 10
      # 最大连接数(根据并发量调整)
      maximum-pool-size: 50
      # 连接超时时间(毫秒)
      connection-timeout: 30000
      # 连接最大生命周期(毫秒)
      max-lifetime: 1800000
      # 空闲连接超时时间(毫秒)
      idle-timeout: 600000
      # 连接测试查询
      connection-test-query: SELECT 1
      # 是否缓存预编译语句
      cache-prep-statements: true
      # 预编译语句缓存大小
      prep-stmt-cache-size: 250
      # 预编译语句最大长度
      prep-stmt-cache-sql-limit: 2048

连接池监控指标

  • 活跃连接数 vs 最大连接数
  • 等待连接的线程数
  • 连接获取平均时间
  • 连接泄露检测

二、MySQL配置调优

2.1 核心参数优化

MySQL的配置参数直接影响其并发处理能力。以下是最关键的几个参数:

# my.cnf 关键配置

[mysqld]
# 连接相关
max_connections = 1000          # 最大连接数,根据内存和业务调整
max_connect_errors = 100000     # 最大连接错误次数
back_log = 500                  # 连接请求队列长度

# InnoDB引擎核心参数
innodb_buffer_pool_size = 16G   # 缓冲池大小,通常为物理内存的50-75%
innodb_buffer_pool_instances = 8 # 缓冲池实例数,减少竞争
innodb_log_file_size = 2G       # 重做日志文件大小
innodb_log_buffer_size = 64M    # 重做日志缓冲区
innodb_flush_log_at_trx_commit = 1 # 事务提交策略(1=严格模式,2=性能模式)
innodb_flush_method = O_DIRECT  # 避免双缓冲
innodb_io_capacity = 2000       # InnoDB可用的IOPS
innodb_read_io_threads = 8      # 读线程数
innodb_write_io_threads = 8     # 写线程数

# 查询缓存(MySQL 8.0已移除,5.7及之前版本)
query_cache_type = 0            # 关闭查询缓存
query_cache_size = 0

# 临时表和排序
tmp_table_size = 256M           # 临时表大小
max_heap_table_size = 256M      # 内存表最大大小
sort_buffer_size = 4M           # 排序缓冲区(每个线程)
join_buffer_size = 4M           # 连接缓冲区(每个线程)

# 其他
table_open_cache = 2000         # 表缓存
table_definition_cache = 1400    # 表定义缓存
thread_cache_size = 100         # 线程缓存

2.2 慢查询日志分析

慢查询日志是发现性能问题的利器,配合工具可快速定位问题SQL。

-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1;  -- 记录超过1秒的查询
SET GLOBAL log_queries_not_using_indexes = 'ON';

-- 查看慢查询统计
SHOW VARIABLES LIKE 'slow_query%';
SHOW VARIABLES LIKE 'long_query_time';

使用pt-query-digest分析慢日志

# 安装Percona Toolkit
sudo apt-get install percona-toolkit

# 分析慢日志
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# 输出示例:
# 120.5s user time, 20s system time, 120.5M rss, 200.5M vsz
# Current date: 2024-01-15 10:30:00
# Hostname: db-server-01
# Files: slow.log
# Overall: 12345 total, 123 unique, 0.00 QPS, 0.00x concurrency ________
# Time range: 2024-01-15 00:00:00 to 10:30:00
# Attribute          total     min     max     avg     95%  stddev  median
# ============     ======= ======= ======= ======= ======= ======= =======
# Exec time           1234s    0.01s   45.2s    1.00s    5.2s    3.45s   0.50s
# Lock time          0.50s    0.00s   0.05s    0.0004s  0.001s  0.002s  0.000s
# Rows sent          123.45M   0       1.00M    10.00    100.00  50.00   5.00
# Rows examine       1234.56M  0       10.00M   100.00   1000.00 500.00  50.00
# Query size         123.45M   20      10240    1024     2048    512     1024

# Profile
# Rank Query ID           Response time Calls R/Call V/M   Item
# ==== ================== ============= ===== ====== ===== ====
#    1 0x123456789ABCDEF  500.5000 40.5%  5000 0.1001  0.02 SELECT user
#    2 0x987654321FEDCBA  300.3000 24.3%  3000 0.1001  0.03 SELECT order

2.3 使用Performance Schema监控

MySQL 5.6+ 提供的Performance Schema可以实时监控数据库性能指标。

-- 查看当前最耗时的查询
SELECT 
    THREAD_ID,
    EVENT_NAME,
    SUM_TIMER_WAIT/1000000000000 AS wait_seconds,
    COUNT_STAR AS calls
FROM events_statements_summary_by_thread
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;

-- 查看表I/O统计
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    SUM_TIMER_READ/1000000000000 AS read_seconds,
    SUM_TIMER_WRITE/1000000000000 AS write_seconds,
    COUNT_READ,
    COUNT_WRITE
FROM table_io_waits_summary_by_object_type
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;

-- 查看索引使用情况
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_FETCH,
    COUNT_INSERT,
    COUNT_UPDATE,
    COUNT_DELETE
FROM table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;

三、SQL优化策略

3.1 索引优化

索引是提升查询性能最有效的手段,但需要合理设计。

索引设计原则

  • 选择性高的列优先建立索引
  • 联合索引遵循最左前缀原则
  • 避免过度索引(写操作会变慢)
  • 定期分析索引使用情况

索引优化示例

-- 原始查询(慢)
SELECT * FROM orders 
WHERE user_id = 12345 
AND status = 'PAID' 
AND created_at >= '2024-01-01';

-- 优化方案1:创建联合索引
CREATE INDEX idx_user_status_created ON orders (user_id, status, created_at);

-- 优化方案2:覆盖索引(避免回表)
CREATE INDEX idx_user_status_created_cover ON orders (user_id, status, created_at, order_no, amount);

-- 查看索引使用情况
EXPLAIN SELECT * FROM orders WHERE user_id = 12345 AND status = 'PAID';
-- 关注:type=ref, key=idx_user_status_created, rows=少量

-- 查看索引碎片
SELECT 
    TABLE_NAME,
    INDEX_NAME,
    CONCAT(ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2), 'MB') AS total_size,
    CONCAT(ROUND(DATA_FREE / 1024 / 1024, 2), 'MB') AS free_size
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'your_database'
ORDER BY DATA_FREE DESC;

-- 重建索引(减少碎片)
ALTER TABLE orders ENGINE=InnoDB;  -- 在线DDL,MySQL 5.6+
-- 或
OPTIMIZE TABLE orders;  -- 需要锁表,适用于小表

3.2 避免索引失效的常见场景

-- 1. 隐式类型转换
-- 错误:user_id是varchar,传入数字
SELECT * FROM users WHERE user_id = 123;  -- 索引失效
-- 正确
SELECT * FROM users WHERE user_id = '123';

-- 2. 函数操作
-- 错误
SELECT * FROM orders WHERE DATE(created_at) = '2024-01-01';
-- 正确
SELECT * FROM orders WHERE created_at >= '2024-01-01' AND created_at < '2024-01-02';

-- 3. LIKE以通配符开头
-- 错误
SELECT * FROM users WHERE username LIKE '%john%';
-- 正确(前缀匹配)
SELECT * FROM users WHERE username LIKE 'john%';

-- 4. OR条件(部分场景)
-- 错误(如果两个条件都有索引,可能失效)
SELECT * FROM orders WHERE user_id = 123 OR amount > 1000;
-- 正确(使用UNION ALL)
SELECT * FROM orders WHERE user_id = 123
UNION ALL
SELECT * FROM orders WHERE amount > 1000 AND user_id != 123;

-- 5. 负向查询
-- 错误
SELECT * FROM orders WHERE status != 'CANCELLED';
-- 正确(如果状态值有限,使用IN)
SELECT * FROM orders WHERE status IN ('PAID', 'SHIPPED', 'DELIVERED');

3.3 分页优化

深度分页是高并发场景下的性能杀手。

-- 原始分页(慢,扫描大量数据)
SELECT * FROM orders 
WHERE user_id = 12345 
ORDER BY created_at DESC 
LIMIT 1000000, 20;  -- 跳过100万行

-- 优化方案1:延迟关联(覆盖索引)
SELECT o.* FROM orders o
INNER JOIN (
    SELECT id FROM orders 
    WHERE user_id = 12345 
    ORDER BY created_at DESC 
    LIMIT 1000000, 20
) AS tmp ON o.id = tmp.id;

-- 优化方案2:记录上次位置(业务层优化)
-- 第一次查询
SELECT * FROM orders 
WHERE user_id = 12345 
AND created_at < '2024-01-15 10:00:00'  -- 上次最后一条的时间
ORDER BY created_at DESC 
LIMIT 20;

-- 优化方案3:使用子查询(MySQL 8.0+)
WITH ranked_orders AS (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY created_at DESC) as rn
    FROM orders
    WHERE user_id = 12345
)
SELECT * FROM ranked_orders WHERE rn BETWEEN 1000001 AND 1000020;

3.4 批量操作优化

高并发下,单条插入/更新会带来巨大开销,批量操作是关键。

// 错误:单条插入,性能极差
for (Order order : orderList) {
    orderMapper.insert(order);  // 每次都要网络往返
}

// 正确:批量插入
public void batchInsert(List<Order> orderList) {
    String sql = "INSERT INTO orders (order_no, user_id, amount, status) VALUES (?, ?, ?, ?)";
    
    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement(sql)) {
        
        conn.setAutoCommit(false);  // 关闭自动提交
        
        for (int i = 0; i < orderList.size(); i++) {
            Order order = orderList.get(i);
            ps.setString(1, order.getOrderNo());
            ps.setLong(2, order.getUserId());
            ps.setBigDecimal(3, order.getAmount());
            ps.setString(4, order.getStatus());
            ps.addBatch();
            
            // 每1000条提交一次
            if (i % 1000 == 0 || i == orderList.size() - 1) {
                ps.executeBatch();
                conn.commit();
            }
        }
    } catch (SQLException e) {
        // 异常处理
    }
}

// MyBatis批量插入
<insert id="batchInsert" parameterType="list">
    INSERT INTO orders (order_no, user_id, amount, status)
    VALUES
    <foreach collection="list" item="item" separator=",">
        (#{item.orderNo}, #{item.userId}, #{item.amount}, #{item.status})
    </foreach>
</insert>

// 批量更新
<update id="batchUpdate" parameterType="list">
    <foreach collection="list" item="item" separator=";">
        UPDATE orders 
        SET status = #{item.status}, amount = #{item.amount}
        WHERE id = #{item.id}
    </foreach>
</update>

3.5 事务优化

长事务和大事务会严重影响并发性能。

-- 错误:大事务(一次性插入10万条)
START TRANSACTION;
INSERT INTO logs (...) VALUES (...);  -- 10万次
COMMIT;

-- 正确:分批提交
DELIMITER $$
CREATE PROCEDURE batch_insert_logs()
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE batch_size INT DEFAULT 1000;
    DECLARE total_rows INT DEFAULT 100000;
    
    WHILE i < total_rows DO
        START TRANSACTION;
        
        INSERT INTO logs (...) 
        VALUES 
        (CONCAT('log_', i), ...),
        (CONCAT('log_', i+1), ...),
        ...
        (CONCAT('log_', i+batch_size-1), ...);
        
        COMMIT;
        SET i = i + batch_size;
        
        -- 小延迟,避免CPU占满
        DO SLEEP(0.01);
    END WHILE;
END$$
DELIMITER ;

-- 事务隔离级别选择
-- 高并发读多写少:READ COMMITTED(降低幻读风险,性能较好)
-- 高并发写多:REPEATABLE READ(默认级别,保证一致性)
SET GLOBAL transaction_isolation = 'READ-COMMITTED';

四、缓存应用策略

4.1 缓存设计原则

缓存是解决高并发问题的终极武器,但需要精心设计。

缓存设计黄金法则

  • 缓存命中率:目标>95%,低于80%需要重新设计
  • 缓存更新策略:Cache-Aside、Write-Through、Write-Behind
  • 缓存一致性:最终一致性 vs 强一致性
  • 缓存容量:根据热点数据量设计
  • 过期策略:主动失效 vs 被动过期

4.2 Redis缓存实战

4.2.1 缓存穿透防护

缓存穿透指查询不存在的数据,导致请求直接打到数据库。

// 布隆过滤器 + 缓存方案
@Component
public class CachePenetrationService {
    
    private final RedisTemplate<String, Object> redisTemplate;
    private final BloomFilter<String> bloomFilter;
    
    public CachePenetrationService(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
        // 初始化布隆过滤器,预期插入100万条,误判率0.01%
        this.bloomFilter = BloomFilter.create(
            Funnels.stringFunnel(Charset.defaultCharset()), 
            1000000, 
            0.0001
        );
    }
    
    public Object getUserById(Long userId) {
        String key = "user:" + userId;
        
        // 1. 先查布隆过滤器
        if (!bloomFilter.mightContain(key)) {
            // 一定不存在
            return null;
        }
        
        // 2. 查缓存
        Object user = redisTemplate.opsForValue().get(key);
        if (user != null) {
            return user;
        }
        
        // 3. 查数据库
        user = userMapper.selectById(userId);
        
        if (user != null) {
            // 4. 写入缓存
            redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
        } else {
            // 5. 缓存空值(防止缓存穿透)
            redisTemplate.opsForValue().set(key, NULL_OBJECT, 5, TimeUnit.MINUTES);
        }
        
        return user;
    }
    
    // 初始化时加载热点数据到布隆过滤器
    @PostConstruct
    public void initBloomFilter() {
        List<Long> userIds = userMapper.selectAllIds();
        for (Long userId : userIds) {
            bloomFilter.put("user:" + userId);
        }
    }
}

4.2.2 缓存雪崩防护

缓存雪崩指大量缓存同时失效,导致数据库压力激增。

// 解决方案1:随机过期时间
public void setWithRandomExpire(String key, Object value, int baseExpireSeconds) {
    // 基础过期时间 + 随机偏移(0-300秒)
    int randomExpire = baseExpireSeconds + new Random().nextInt(300);
    redisTemplate.opsForValue().set(key, value, randomExpire, TimeUnit.SECONDS);
}

// 解决方案2:热点数据永不过期 + 后台刷新
@Component
public class HotDataCacheService {
    
    private final RedisTemplate<String, Object> redisTemplate;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    @PostConstruct
    public void init() {
        // 每5分钟刷新热点数据
        scheduler.scheduleAtFixedRate(this::refreshHotData, 0, 5, TimeUnit.MINUTES);
    }
    
    private void refreshHotData() {
        Set<String> hotKeys = redisTemplate.opsForZSet().range("hot:keys", 0, -1);
        if (hotKeys != null) {
            for (String key : hotKeys) {
                // 异步刷新
                CompletableFuture.runAsync(() -> {
                    Object data = loadFromDB(key);
                    if (data != null) {
                        redisTemplate.opsForValue().set(key, data, 30, TimeUnit.MINUTES);
                    }
                });
            }
        }
    }
    
    private Object loadFromDB(String key) {
        // 从数据库加载数据
        return null;
    }
}

// 解决方案3:Redis集群 + 本地缓存二级缓存
@Component
public class MultiLevelCacheService {
    
    private final Cache localCache = Caffeine.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .build();
    
    private final RedisTemplate<String, Object> redisTemplate;
    
    public Object get(String key) {
        // 1. 本地缓存
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }
        
        // 2. Redis缓存
        value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            localCache.put(key, value);
            return value;
        }
        
        // 3. 数据库
        value = loadFromDB(key);
        if (value != null) {
            redisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);
            localCache.put(key, value);
        }
        
        return value;
    }
}

4.2.3 缓存与数据库一致性保证

// 方案1:Cache-Aside + 删除缓存策略
public class CacheAsidePattern {
    
    // 更新操作:先更新数据库,再删除缓存
    public void updateOrder(Order order) {
        // 1. 更新数据库
        orderMapper.update(order);
        
        // 2. 删除缓存(让下次查询重新加载)
        String key = "order:" + order.getId();
        redisTemplate.delete(key);
        
        // 3. 延迟双删(解决主从延迟)
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(500);  // 等待主从同步
                redisTemplate.delete(key);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    
    // 查询操作
    public Order getOrder(Long id) {
        String key = "order:" + id;
        
        // 1. 查缓存
        Order order = (Order) redisTemplate.opsForValue().get(key);
        if (order != null) {
            return order;
        }
        
        // 2. 查数据库
        order = orderMapper.selectById(id);
        if (order != null) {
            // 3. 写入缓存
            redisTemplate.opsForValue().set(key, order, 30, TimeUnit.MINUTES);
        }
        
        return order;
    }
}

// 方案2:基于Canal的数据库binlog监听(最终一致性)
@Component
public class CanalCacheUpdater {
    
    private final RedisTemplate<String, Object> redisTemplate;
    
    // 模拟Canal监听binlog
    public void onBinlogEvent(String table, String type, Long id) {
        String key = table + ":" + id;
        
        if ("UPDATE".equals(type) || "DELETE".equals(type)) {
            // 删除缓存
            redisTemplate.delete(key);
            
            // 如果是更新,可以异步预热缓存
            if ("UPDATE".equals(type)) {
                CompletableFuture.runAsync(() -> {
                    Object data = loadFromDB(table, id);
                    if (data != null) {
                        redisTemplate.opsForValue().set(key, data, 30, TimeUnit.MINUTES);
                    }
                });
            }
        }
    }
}

4.2.4 Redis集群配置

# Redis集群配置(Spring Boot)
spring:
  redis:
    # 集群模式
    cluster:
      nodes:
        - 192.168.1.101:6379
        - 192.168.1.102:6379
        - 192.168.1.103:6379
        - 192.168.1.104:6379
        - 192.168.1.105:6379
        - 192.168.1.106:6379
      max-redirects: 3
    # 连接池配置
    lettuce:
      pool:
        max-active: 100
        max-idle: 50
        min-idle: 10
        max-wait: 1000ms
      cluster:
        refresh:
          adaptive: true  # 自适应刷新拓扑

Redis Lua脚本保证原子性

-- 原子性获取并更新缓存
-- KEYS[1]: 缓存key
-- ARGV[1]: 过期时间(秒)
-- ARGV[2]: 数据库查询结果(JSON字符串)
local key = KEYS[1]
local expire = tonumber(ARGV[1])
local data = ARGV[2]

-- 检查缓存是否存在
local exists = redis.call('EXISTS', key)
if exists == 1 then
    return redis.call('GET', key)
else
    -- 缓存不存在,写入并设置过期时间
    redis.call('SET', key, data, 'EX', expire)
    return data
end

Java调用Lua脚本

@Component
public class RedisLuaService {
    
    private final RedisTemplate<String, Object> redisTemplate;
    private final DefaultRedisScript<String> getOrSetScript;
    
    public RedisLuaService(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
        
        // 加载Lua脚本
        getOrSetScript = new DefaultRedisScript<>();
        getOrSetScript.setLocation(new ClassPathResource("lua/getOrSet.lua"));
        getOrSetScript.setResultType(String.class);
    }
    
    public String getOrSet(String key, int expireSeconds, Supplier<String> dataLoader) {
        // 执行Lua脚本
        String result = redisTemplate.execute(
            getOrSetScript,
            Collections.singletonList(key),
            String.valueOf(expireSeconds),
            dataLoader.get()  // 只有缓存不存在时才会调用
        );
        return result;
    }
}

4.3 多级缓存架构

// 完整的多级缓存实现
@Component
public class MultiLevelCache {
    
    // L1: Caffeine本地缓存(进程内)
    private final Cache<String, Object> localCache = Caffeine.newBuilder()
        .maximumSize(10000)
        .expireAfterWrite(10, TimeUnit.SECONDS)  // 短过期时间
        .recordStats()  // 记录统计
        .build();
    
    // L2: Redis分布式缓存
    private final RedisTemplate<String, Object> redisTemplate;
    
    // L3: 数据库
    private final UserMapper userMapper;
    
    public User getUser(Long userId) {
        String key = "user:" + userId;
        
        // L1: 本地缓存
        User user = (User) localCache.getIfPresent(key);
        if (user != null) {
            Metrics.counter("cache.hit", "level", "local").increment();
            return user;
        }
        
        // L2: Redis缓存
        user = (User) redisTemplate.opsForValue().get(key);
        if (user != null) {
            // 回填本地缓存
            localCache.put(key, user);
            Metrics.counter("cache.hit", "level", "redis").increment();
            return user;
        }
        
        // L3: 数据库
        user = userMapper.selectById(userId);
        if (user != null) {
            // 双写缓存
            redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
            localCache.put(key, user);
            Metrics.counter("cache.miss").increment();
        } else {
            // 缓存空值
            redisTemplate.opsForValue().set(key, NULL_OBJECT, 5, TimeUnit.MINUTES);
        }
        
        return user;
    }
    
    // 批量查询优化
    public List<User> batchGetUsers(List<Long> userIds) {
        List<String> keys = userIds.stream()
            .map(id -> "user:" + id)
            .collect(Collectors.toList());
        
        // 1. 批量查本地缓存
        Map<String, Object> localResult = localCache.getAllPresent(keys);
        
        // 2. 批量查Redis
        List<String> remainingKeys = keys.stream()
            .filter(k -> !localResult.containsKey(k))
            .collect(Collectors.toList());
        
        List<Object> redisValues = redisTemplate.opsForValue().multiGet(remainingKeys);
        
        // 3. 组装结果并回填本地缓存
        // ... 省略具体实现
        
        // 4. 查数据库(剩余未命中的)
        // ... 省略具体实现
        
        return null;
    }
}

五、高并发场景下的特殊优化

5.1 秒杀场景优化

秒杀是典型的高并发写场景,需要特殊处理。

架构设计

  1. 库存预热:提前将库存加载到Redis
  2. 请求拦截:前端限流 + 后端限流
  3. 库存扣减:Redis Lua脚本原子操作
  4. 异步下单:消息队列削峰
// 秒杀服务实现
@Service
public class SeckillService {
    
    private final RedisTemplate<String, Object> redisTemplate;
    private final RabbitTemplate rabbitTemplate;
    private final OrderMapper orderMapper;
    
    // 库存扣减Lua脚本
    private static final String DEDUCT_STOCK_SCRIPT = 
        "local stock = redis.call('GET', KEYS[1]) " +
        "if tonumber(stock) <= 0 then " +
        "   return -1 " +
        "end " +
        "redis.call('DECR', KEYS[1]) " +
        "return tonumber(stock) - 1";
    
    private final DefaultRedisScript<Long> deductStockScript;
    
    public SeckillService(RedisTemplate<String, Object> redisTemplate, 
                         RabbitTemplate rabbitTemplate,
                         OrderMapper orderMapper) {
        this.redisTemplate = redisTemplate;
        this.rabbitTemplate = rabbitTemplate;
        this.orderMapper = orderMapper;
        
        deductStockScript = new DefaultRedisScript<>();
        deductStockScript.setScriptText(DEDUCT_STOCK_SCRIPT);
        deductStockScript.setResultType(Long.class);
    }
    
    /**
     * 秒杀下单
     * @param userId 用户ID
     * @param productId 商品ID
     * @return 订单ID(异步生成)
     */
    public String seckill(Long userId, Long productId) {
        String stockKey = "seckill:stock:" + productId;
        String userKey = "seckill:user:" + productId;
        
        // 1. 检查是否已购买(防止重复下单)
        if (redisTemplate.opsForSet().isMember(userKey, userId)) {
            throw new RuntimeException("您已参与过秒杀");
        }
        
        // 2. 原子扣减库存(Lua脚本)
        Long remainingStock = redisTemplate.execute(
            deductStockScript,
            Collections.singletonList(stockKey)
        );
        
        if (remainingStock == null || remainingStock < 0) {
            throw new RuntimeException("库存不足");
        }
        
        // 3. 记录已购买用户
        redisTemplate.opsForSet().add(userKey, userId);
        redisTemplate.expire(userKey, 1, TimeUnit.HOURS);
        
        // 4. 发送消息到队列,异步创建订单
        SeckillMessage message = new SeckillMessage();
        message.setUserId(userId);
        message.setProductId(productId);
        message.setStock(remainingStock);
        
        rabbitTemplate.convertAndSend("seckill.order", message);
        
        // 5. 返回订单ID(预生成)
        String orderId = "SECKILL_" + System.currentTimeMillis() + "_" + userId;
        return orderId;
    }
    
    /**
     * 异步创建订单(消费者)
     */
    @RabbitListener(queues = "seckill.order")
    public void createOrder(SeckillMessage message) {
        try {
            // 1. 创建订单
            Order order = new Order();
            order.setOrderNo(generateOrderNo());
            order.setUserId(message.getUserId());
            order.setProductId(message.getProductId());
            order.setAmount(new BigDecimal("99.00"));
            order.setStatus("PAID");
            order.setCreateTime(new Date());
            
            orderMapper.insert(order);
            
            // 2. 发送订单成功消息
            rabbitTemplate.convertAndSend("order.success", order);
            
        } catch (Exception e) {
            // 异常处理:记录日志、补偿机制
            log.error("创建订单失败", e);
            // 库存回滚
            String stockKey = "seckill:stock:" + message.getProductId();
            redisTemplate.opsForValue().increment(stockKey);
        }
    }
}

5.2 限流策略

// 基于Redis的分布式限流器
@Component
public class RateLimiter {
    
    private final RedisTemplate<String, Object> redisTemplate;
    
    // Lua脚本:令牌桶算法
    private static final String RATE_LIMIT_SCRIPT = 
        "local key = KEYS[1] " +
        "local limit = tonumber(ARGV[1]) " +
        "local expire = tonumber(ARGV[2]) " +
        "local current = tonumber(redis.call('GET', key) or '0') " +
        "if current + 1 > limit then " +
        "   return 0 " +
        "else " +
        "   redis.call('INCR', key) " +
        "   if current == 0 then " +
        "       redis.call('EXPIRE', key, expire) " +
        "   end " +
        "   return 1 " +
        "end";
    
    private final DefaultRedisScript<Long> rateLimitScript;
    
    public RateLimiter(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
        rateLimitScript = new DefaultRedisScript<>();
        rateLimitScript.setScriptText(RATE_LIMIT_SCRIPT);
        rateLimitScript.setResultType(Long.class);
    }
    
    /**
     * 尝试获取令牌
     * @param key 限流key(如:user:123:api:/order/create)
     * @param limit 限制数量
     * @param expireTime 过期时间(秒)
     * @return true-允许,false-拒绝
     */
    public boolean tryAcquire(String key, int limit, int expireTime) {
        Long result = redisTemplate.execute(
            rateLimitScript,
            Collections.singletonList(key),
            String.valueOf(limit),
            String.valueOf(expireTime)
        );
        return result != null && result == 1;
    }
    
    /**
     * 限流注解
     */
    @Aspect
    @Component
    public class RateLimitAspect {
        
        @Autowired
        private RateLimiter rateLimiter;
        
        @Around("@annotation(rateLimit)")
        public Object around(ProceedingJoinPoint pjp, RateLimitAnnotation rateLimit) throws Throwable {
            // 生成key:方法名 + 用户ID
            String key = String.format("%s:%s:%s", 
                rateLimit.key(), 
                getUserId(), 
                pjp.getSignature().getName());
            
            if (!rateLimiter.tryAcquire(key, rateLimit.limit(), rateLimit.period())) {
                throw new RateLimitException("请求过于频繁,请稍后再试");
            }
            
            return pjp.proceed();
        }
    }
}

// 使用示例
@RestController
public class OrderController {
    
    @RateLimitAnnotation(key = "order", limit = 10, period = 60)
    @PostMapping("/order/create")
    public Result createOrder(@RequestBody OrderDTO orderDTO) {
        // 创建订单逻辑
        return Result.success();
    }
}

5.3 消息队列削峰

// RabbitMQ配置
@Configuration
public class RabbitMQConfig {
    
    @Bean
    public Queue orderQueue() {
        // 队列持久化
        return new Queue("order.queue", true);
    }
    
    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange("order.exchange");
    }
    
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.*");
    }
    
    // 消费者限流配置
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(5);  // 并发消费者数
        factory.setMaxConcurrentConsumers(10);  // 最大并发数
        factory.setPrefetchCount(10);  // 预取消息数(关键参数)
        return factory;
    }
}

// 生产者
@Service
public class OrderProducer {
    
    private final RabbitTemplate rabbitTemplate;
    
    public void sendOrderMessage(Order order) {
        // 发送延迟消息(用于订单超时取消)
        rabbitTemplate.convertAndSend(
            "order.exchange",
            "order.create",
            order,
            message -> {
                // 设置延迟属性(需要延迟队列插件)
                message.getMessageProperties().setDelay(30 * 1000);  // 30秒延迟
                return message;
            }
        );
    }
}

// 消费者
@Component
public class OrderConsumer {
    
    private final OrderService orderService;
    
    @RabbitListener(queues = "order.queue")
    public void processOrder(Order order) {
        try {
            // 业务处理
            orderService.process(order);
        } catch (Exception e) {
            // 异常重试
            throw new AmqpRejectAndDontRequeueException("处理失败", e);
        }
    }
}

六、监控与告警

6.1 关键监控指标

-- MySQL监控SQL
-- 1. 当前连接数
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Max_used_connections';

-- 2. 慢查询数量
SHOW STATUS LIKE 'Slow_queries';

-- 3. InnoDB缓冲池命中率
SELECT 
    (1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 AS hit_rate
FROM information_schema.GLOBAL_STATUS 
WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads';

-- 4. 锁等待情况
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    LOCK_TYPE,
    LOCK_MODE,
    LOCK_STATUS,
    THREAD_ID,
    PROCESSLIST_ID
FROM performance_schema.data_locks;

-- 5. 复制延迟
SHOW SLAVE STATUS\G
-- 关注:Seconds_Behind_Master

6.2 监控系统搭建

Prometheus + Grafana监控方案

# docker-compose.yml
version: '3'
services:
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
      
  mysql_exporter:
    image: prom/mysqld-exporter
    environment:
      - DATA_SOURCE_NAME=exporter:exporter@(mysql:3306)/
    ports:
      - "9104:9104"

prometheus.yml配置

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'mysql'
    static_configs:
      - targets: ['mysql_exporter:9104']

关键监控面板

  • MySQL Connections(连接数趋势)
  • Query Performance(查询性能)
  • InnoDB Metrics(InnoDB引擎指标)
  • Replication Lag(复制延迟)
  • Slow Queries(慢查询统计)

6.3 告警规则示例

# alert-rules.yml
groups:
- name: mysql_alerts
  rules:
  - alert: MySQLTooManyConnections
    expr: mysql_global_status_threads_connected / mysql_global_variables_max_connections * 100 > 80
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MySQL连接数过高"
      description: "当前连接数 {{ $value }}%,超过80%"
      
  - alert: MySQLSlowQueries
    expr: increase(mysql_global_status_slow_queries[5m]) > 10
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "MySQL慢查询增多"
      description: "5分钟内新增 {{ $value }} 个慢查询"
      
  - alert: MySQLReplicationLag
    expr: mysql_slave_lag_seconds > 10
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "MySQL复制延迟"
      description: "从库延迟 {{ $value }} 秒"

七、实战案例:电商秒杀系统

7.1 完整架构图

客户端 → CDN → Nginx → 限流 → 秒杀服务 → Redis集群
                                      ↓
                                   消息队列 → 订单服务 → MySQL集群
                                      ↓
                                   监控告警系统

7.2 核心代码实现

// 秒杀服务完整实现
@Service
public class SeckillSystem {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private RateLimiter rateLimiter;
    
    // 库存扣减Lua脚本
    private static final String DEDUCT_STOCK_SCRIPT = 
        "local stock_key = KEYS[1] " +
        "local stock = tonumber(redis.call('GET', stock_key) or '0') " +
        "if stock <= 0 then return -1 end " +
        "redis.call('DECR', stock_key) " +
        "return stock - 1";
    
    private final DefaultRedisScript<Long> deductScript;
    
    public SeckillSystem() {
        deductScript = new DefaultRedisScript<>();
        deductScript.setScriptText(DEDUCT_STOCK_SCRIPT);
        deductScript.setResultType(Long.class);
    }
    
    /**
     * 秒杀主流程
     */
    @Transactional
    public SeckillResult seckill(Long userId, Long productId) {
        // 1. 限流检查(用户级)
        String userLimitKey = "limit:user:" + userId;
        if (!rateLimiter.tryAcquire(userLimitKey, 5, 60)) {
            return SeckillResult.fail("请求过于频繁");
        }
        
        // 2. 限流检查(IP级)
        String ip = getClientIp();
        String ipLimitKey = "limit:ip:" + ip;
        if (!rateLimiter.tryAcquire(ipLimitKey, 20, 60)) {
            return SeckillResult.fail("IP请求过于频繁");
        }
        
        // 3. 检查是否已购买
        String userKey = "seckill:users:" + productId;
        if (redisTemplate.opsForSet().isMember(userKey, userId)) {
            return SeckillResult.fail("您已参与过秒杀");
        }
        
        // 4. 原子扣减库存
        String stockKey = "seckill:stock:" + productId;
        Long remaining = redisTemplate.execute(deductScript, Collections.singletonList(stockKey));
        
        if (remaining == null || remaining < 0) {
            return SeckillResult.fail("库存不足");
        }
        
        // 5. 记录购买用户
        redisTemplate.opsForSet().add(userKey, userId);
        redisTemplate.expire(userKey, 2, TimeUnit.HOURS);
        
        // 6. 发送消息到队列
        SeckillMessage message = new SeckillMessage();
        message.setUserId(userId);
        message.setProductId(productId);
        message.setStock(remaining);
        message.setTimestamp(System.currentTimeMillis());
        
        rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);
        
        // 7. 返回结果
        String orderId = "SECKILL_" + System.currentTimeMillis() + "_" + userId;
        return SeckillResult.success(orderId);
    }
    
    /**
     * 异步订单处理
     */
    @RabbitListener(queues = "seckill.queue")
    public void processSeckillOrder(SeckillMessage message) {
        try {
            // 1. 检查库存(二次确认)
            String stockKey = "seckill:stock:" + message.getProductId();
            Long stock = (Long) redisTemplate.opsForValue().get(stockKey);
            if (stock == null || stock < 0) {
                // 库存异常,回滚
                redisTemplate.opsForValue().increment(stockKey);
                return;
            }
            
            // 2. 创建订单
            Order order = new Order();
            order.setOrderNo(generateOrderNo());
            order.setUserId(message.getUserId());
            order.setProductId(message.getProductId());
            order.setAmount(new BigDecimal("99.00"));
            order.setStatus("PAID");
            order.setCreateTime(new Date());
            
            orderMapper.insert(order);
            
            // 3. 发送订单成功消息
            rabbitTemplate.convertAndSend("order.exchange", "order.success", order);
            
        } catch (Exception e) {
            log.error("订单处理失败", e);
            // 库存回滚
            String stockKey = "seckill:stock:" + message.getProductId();
            redisTemplate.opsForValue().increment(stockKey);
        }
    }
    
    /**
     * 库存预热
     */
    @PostConstruct
    public void preloadStock() {
        // 从数据库加载库存到Redis
        List<Product> products = productMapper.selectAll();
        for (Product product : products) {
            String key = "seckill:stock:" + product.getId();
            redisTemplate.opsForValue().set(key, product.getStock(), 2, TimeUnit.HOURS);
        }
    }
    
    private String generateOrderNo() {
        return "SECKILL_" + System.currentTimeMillis() + "_" + 
               ThreadLocalRandom.current().nextInt(1000, 9999);
    }
    
    private String getClientIp() {
        // 实际实现从Request中获取
        return "127.0.0.1";
    }
}

7.3 性能测试数据

测试环境:4核8G MySQL,2核4G Redis,单机应用
并发数:1000
测试时长:60秒

结果:
- 总请求数:60,000
- 成功数:58,200
- 失败数:1,800(限流、库存不足)
- 平均响应时间:45ms
- P99响应时间:120ms
- 数据库QPS:约500(异步写入)
- Redis QPS:约2000
- CPU使用率:应用60%,MySQL 40%,Redis 50%

八、总结与最佳实践

8.1 高并发优化 checklist

架构层

  • [ ] 读写分离是否实施
  • [ ] 分库分表是否必要
  • [ ] 连接池配置是否合理
  • [ ] 缓存策略是否完善

数据库层

  • [ ] 核心参数是否优化
  • [ ] 慢查询是否定期清理
  • [ ] 索引是否合理
  • [ ] 事务是否精简

应用层

  • [ ] 批量操作是否使用
  • [ ] 异步处理是否充分
  • [ ] 限流是否到位
  • [ ] 监控是否完善

8.2 常见误区

  1. 过度优化:不要在并发量不高的情况下过度设计
  2. 缓存滥用:缓存不是银弹,不合理的缓存会带来一致性问题
  3. 忽视监控:没有监控的优化是盲目的
  4. 单点瓶颈:任何一层都可能成为瓶颈,需要全局视角

8.3 持续优化建议

  1. 建立性能基线:记录正常情况下的各项指标
  2. 定期压测:模拟真实场景进行压力测试
  3. 代码审查:重点关注数据库相关代码
  4. 知识沉淀:建立团队的优化知识库

通过以上策略的综合应用,可以有效提升MySQL在高并发场景下的性能表现。记住,优化是一个持续的过程,需要根据业务发展和技术演进不断调整和完善。