引言

在当今互联网应用中,高并发场景无处不在。无论是电商秒杀、社交网络还是金融交易系统,MySQL作为最流行的关系型数据库,面临着巨大的性能挑战。当并发请求量激增时,数据库往往成为系统瓶颈,导致响应延迟甚至服务崩溃。本文将深入探讨MySQL高并发处理的完整策略,从架构设计、索引优化到缓存机制,提供一套实战可行的解决方案。

一、高并发场景下的MySQL架构优化

1.1 读写分离架构

读写分离是应对高并发读操作的最基础架构优化。通过将读请求分发到多个从库,写请求集中在主库,可以显著提升系统吞吐量。

实现方案:

-- 主库配置(my.cnf)
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW

-- 从库配置(my.cnf)
[mysqld]
server-id = 2
relay-log = mysql-relay-bin
read-only = 1

-- 创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';

-- 在从库启动复制
CHANGE MASTER TO
MASTER_HOST='master_ip',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=154;

应用层路由策略:

// Spring Boot + ShardingSphere 配置示例
@Configuration
public class DataSourceConfig {
    @Bean
    public DataSource masterDataSource() {
        return DataSourceBuilder.create()
            .url("jdbc:mysql://master:3306/db")
            .username("root")
            .password("password")
            .build();
    }
    
    @Bean
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create()
            .url("jdbc:mysql://slave:3306/db")
            .username("root")
            .password("password")
            .build();
    }
    
    @Bean
    public DataSource routingDataSource() {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        
        RoutingDataSource routingDataSource = new RoutingDataSource();
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        routingDataSource.setTargetDataSources(targetDataSources);
        return routingDataSource;
    }
}

1.2 分库分表策略

当单表数据量超过千万级或单库连接数达到瓶颈时,需要考虑分库分表。

垂直分库示例:

-- 原始单库结构
CREATE TABLE user (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    profile TEXT,
    created_at TIMESTAMP
);

-- 垂直分库后:用户基本信息库
CREATE TABLE user_basic (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    created_at TIMESTAMP
);

-- 用户详情库(独立数据库)
CREATE TABLE user_profile (
    user_id BIGINT PRIMARY KEY,
    profile TEXT,
    last_login TIMESTAMP
);

水平分表策略(按时间分片):

-- 订单表按月分片
CREATE TABLE order_202301 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    status TINYINT,
    created_at TIMESTAMP,
    INDEX idx_user_id (user_id)
);

CREATE TABLE order_202302 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    status TINYINT,
    created_at TIMESTAMP,
    INDEX idx_user_id (user_id)
);

-- 分片路由逻辑(Java示例)
public class OrderShardingService {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyyMM");
    
    public String getTableName(LocalDateTime orderTime) {
        String suffix = orderTime.format(FORMATTER);
        return "order_" + suffix;
    }
    
    public void insertOrder(Order order) {
        String tableName = getTableName(order.getCreatedTime());
        String sql = String.format(
            "INSERT INTO %s (id, user_id, amount, status, created_at) VALUES (?, ?, ?, ?, ?)",
            tableName
        );
        // 执行插入操作
    }
}

1.3 分布式事务处理

在高并发分库分表场景下,分布式事务是必须考虑的问题。

Seata分布式事务示例:

// 1. 引入依赖
// pom.xml
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.5.2</version>
</dependency>

// 2. 业务代码示例
@Service
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private InventoryMapper inventoryMapper;
    
    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
    public void createOrder(Order order) {
        // 1. 扣减库存(库存库)
        inventoryMapper.decreaseStock(order.getProductId(), order.getQuantity());
        
        // 2. 创建订单(订单库)
        orderMapper.insert(order);
        
        // 3. 发送消息(消息队列)
        sendOrderCreatedMessage(order);
    }
}

二、索引设计与优化策略

2.1 索引设计原则

黄金法则:

  1. 最左前缀原则:复合索引必须从左到右使用
  2. 覆盖索引:查询字段尽量包含在索引中
  3. 索引选择性:高选择性的列更适合建索引
  4. 避免冗余索引:定期检查并删除无用索引

索引设计示例:

-- 用户表索引设计
CREATE TABLE user (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    status TINYINT DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    
    -- 主键索引(自动创建)
    -- 唯一索引
    UNIQUE KEY uk_username (username),
    UNIQUE KEY uk_email (email),
    
    -- 联合索引(遵循最左前缀原则)
    INDEX idx_status_created (status, created_at),
    
    -- 覆盖索引
    INDEX idx_user_profile (username, email, status, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 索引使用示例
-- 1. 有效使用联合索引
EXPLAIN SELECT * FROM user WHERE status = 1 AND created_at > '2023-01-01';
-- 2. 无效使用(跳过第一列)
EXPLAIN SELECT * FROM user WHERE created_at > '2023-01-01';
-- 3. 覆盖索引查询
EXPLAIN SELECT username, email FROM user WHERE status = 1;

2.2 索引优化实战

慢查询分析与优化:

-- 1. 开启慢查询日志
SET GLOBAL slow_query_log = 1;
SET GLOBAL long_query_time = 1; -- 超过1秒的查询记录
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';

-- 2. 使用EXPLAIN分析执行计划
EXPLAIN SELECT * FROM user WHERE username LIKE 'john%';
-- 关注type列:ALL > index > range > ref > eq_ref > const > system
-- 关注Extra列:Using filesort, Using temporary需要优化

-- 3. 索引优化案例
-- 原始查询(全表扫描)
SELECT * FROM orders WHERE DATE(created_at) = '2023-01-01';

-- 优化后(使用范围查询)
SELECT * FROM orders 
WHERE created_at >= '2023-01-01 00:00:00' 
  AND created_at < '2023-01-02 00:00:00';

-- 4. 添加时间范围索引
ALTER TABLE orders ADD INDEX idx_created_at (created_at);

2.3 索引维护策略

定期索引维护脚本:

-- 1. 查看索引使用情况
SELECT 
    table_name,
    index_name,
    stat_value,
    stat_description
FROM mysql.innodb_index_stats 
WHERE database_name = 'your_database'
ORDER BY table_name, index_name;

-- 2. 检查冗余索引
SELECT 
    a.table_schema,
    a.table_name,
    a.index_name AS redundant_index,
    b.index_name AS existing_index
FROM 
    statistics a
JOIN 
    statistics b ON a.table_schema = b.table_schema 
    AND a.table_name = b.table_name
    AND a.index_name != b.index_name
WHERE 
    a.seq_in_index = 1 
    AND b.seq_in_index = 1
    AND a.index_name LIKE CONCAT('%', b.index_name, '%');

-- 3. 重建索引(减少碎片)
ALTER TABLE your_table ENGINE = InnoDB; -- 重建表和索引
-- 或者单独重建索引
ALTER TABLE your_table DROP INDEX idx_old, ADD INDEX idx_new (column1, column2);

三、缓存机制与数据一致性

3.1 多级缓存架构

缓存层次设计:

客户端缓存 → CDN → 应用缓存(Redis) → 数据库缓存(Buffer Pool)

Redis缓存示例:

// Spring Boot + Redis 缓存配置
@Configuration
@EnableCaching
public class RedisCacheConfig {
    
    @Bean
    public RedisCacheManager cacheManager(RedisConnectionFactory factory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
            .entryTtl(Duration.ofMinutes(10))
            .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
            .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
        
        return RedisCacheManager.builder(factory)
            .cacheDefaults(config)
            .build();
    }
}

// 缓存服务示例
@Service
public class UserService {
    @Autowired
    private UserMapper userMapper;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 缓存穿透保护
    public User getUserById(Long id) {
        String key = "user:" + id;
        
        // 1. 先查缓存
        User user = (User) redisTemplate.opsForValue().get(key);
        if (user != null) {
            return user;
        }
        
        // 2. 缓存未命中,查数据库
        user = userMapper.selectById(id);
        
        // 3. 缓存空值(防止缓存穿透)
        if (user == null) {
            redisTemplate.opsForValue().set(key, "NULL", 30, TimeUnit.SECONDS);
            return null;
        }
        
        // 4. 写入缓存
        redisTemplate.opsForValue().set(key, user, 10, TimeUnit.MINUTES);
        return user;
    }
    
    // 更新缓存(Cache Aside模式)
    @Transactional
    public void updateUser(User user) {
        // 1. 更新数据库
        userMapper.updateById(user);
        
        // 2. 删除缓存(避免脏数据)
        String key = "user:" + user.getId();
        redisTemplate.delete(key);
    }
}

3.2 缓存一致性策略

分布式锁实现:

@Component
public class RedisDistributedLock {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    private static final String LOCK_PREFIX = "lock:";
    private static final long DEFAULT_EXPIRE_TIME = 30; // 30秒
    
    /**
     * 获取分布式锁
     */
    public boolean tryLock(String key, long expireTime) {
        String lockKey = LOCK_PREFIX + key;
        String value = UUID.randomUUID().toString();
        
        // SETNX + EXPIRE 原子操作
        Boolean result = redisTemplate.opsForValue().setIfAbsent(
            lockKey, 
            value, 
            Duration.ofSeconds(expireTime)
        );
        
        return Boolean.TRUE.equals(result);
    }
    
    /**
     * 释放分布式锁
     */
    public void unlock(String key) {
        String lockKey = LOCK_PREFIX + key;
        String currentValue = redisTemplate.opsForValue().get(lockKey);
        
        // 使用Lua脚本保证原子性
        String luaScript = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "   return redis.call('del', KEYS[1]) " +
            "else " +
            "   return 0 " +
            "end";
        
        RedisScript<Long> script = RedisScript.of(luaScript, Long.class);
        redisTemplate.execute(script, Collections.singletonList(lockKey), currentValue);
    }
}

3.3 缓存雪崩与穿透防护

缓存雪崩防护:

// 1. 随机过期时间
public void setCacheWithRandomTTL(String key, Object value) {
    int baseTTL = 600; // 10分钟
    int randomTTL = baseTTL + new Random().nextInt(300); // 5分钟随机范围
    redisTemplate.opsForValue().set(key, value, randomTTL, TimeUnit.SECONDS);
}

// 2. 热点数据永不过期 + 后台刷新
public class HotDataCache {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    public void refreshHotData(String key, Supplier<Object> dataSupplier) {
        // 定时刷新热点数据
        scheduler.scheduleAtFixedRate(() -> {
            try {
                Object data = dataSupplier.get();
                redisTemplate.opsForValue().set(key, data, 10, TimeUnit.MINUTES);
            } catch (Exception e) {
                log.error("刷新热点数据失败", e);
            }
        }, 0, 5, TimeUnit.MINUTES); // 每5分钟刷新一次
    }
}

四、连接池与线程优化

4.1 连接池配置优化

HikariCP配置示例:

# application.yml
spring:
  datasource:
    hikari:
      # 连接池大小(根据业务调整)
      maximum-pool-size: 20
      minimum-idle: 5
      # 连接超时时间
      connection-timeout: 30000
      # 空闲连接存活时间
      idle-timeout: 600000
      # 连接最大生命周期
      max-lifetime: 1800000
      # 连接测试查询
      connection-test-query: SELECT 1
      # 连接泄漏检测
      leak-detection-threshold: 60000
      # 连接预热
      initialization-fail-timeout: 1

连接池监控:

@Component
public class ConnectionPoolMonitor {
    
    @Autowired
    private DataSource dataSource;
    
    @Scheduled(fixedRate = 60000) // 每分钟监控一次
    public void monitorConnectionPool() {
        if (dataSource instanceof HikariDataSource) {
            HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
            
            HikariPoolMXBean poolMXBean = hikariDataSource.getHikariPoolMXBean();
            
            log.info("连接池状态 - 活跃连接: {}, 空闲连接: {}, 总连接: {}, 等待连接: {}",
                poolMXBean.getActiveConnections(),
                poolMXBean.getIdleConnections(),
                poolMXBean.getTotalConnections(),
                poolMXBean.getThreadsAwaitingConnection()
            );
            
            // 动态调整连接池大小
            if (poolMXBean.getThreadsAwaitingConnection() > 10) {
                log.warn("等待连接数过多,考虑增加连接池大小");
                // 可以通过配置中心动态调整
            }
        }
    }
}

4.2 线程池优化

自定义线程池配置:

@Configuration
public class ThreadPoolConfig {
    
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        
        // 核心线程数:CPU核心数 * 2
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        executor.setCorePoolSize(corePoolSize);
        
        // 最大线程数:核心线程数 * 2
        executor.setMaxPoolSize(corePoolSize * 2);
        
        // 队列容量
        executor.setQueueCapacity(1000);
        
        // 线程名称前缀
        executor.setThreadNamePrefix("async-task-");
        
        // 拒绝策略:调用者运行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        
        // 线程空闲存活时间
        executor.setKeepAliveSeconds(60);
        
        // 初始化
        executor.initialize();
        
        return executor;
    }
}

五、SQL优化与查询改写

5.1 避免全表扫描

优化案例:

-- 1. 避免在WHERE子句中使用函数
-- 错误示例
SELECT * FROM orders WHERE DATE(created_at) = '2023-01-01';

-- 正确示例
SELECT * FROM orders 
WHERE created_at >= '2023-01-01 00:00:00' 
  AND created_at < '2023-01-02 00:00:00';

-- 2. 避免使用SELECT *
-- 错误示例
SELECT * FROM user WHERE id = 1;

-- 正确示例(使用覆盖索引)
SELECT username, email FROM user WHERE id = 1;

-- 3. 避免使用OR连接多个条件
-- 错误示例
SELECT * FROM user WHERE status = 1 OR status = 2;

-- 正确示例
SELECT * FROM user WHERE status IN (1, 2);

5.2 分页优化

深度分页问题解决方案:

-- 1. 传统分页(性能差)
SELECT * FROM orders ORDER BY created_at DESC LIMIT 1000000, 10;

-- 2. 优化方案一:使用覆盖索引
SELECT order_id, created_at 
FROM orders 
WHERE created_at < '2023-01-01' 
ORDER BY created_at DESC 
LIMIT 10;

-- 3. 优化方案二:延迟关联
SELECT * FROM orders o
INNER JOIN (
    SELECT order_id 
    FROM orders 
    WHERE created_at < '2023-01-01' 
    ORDER BY created_at DESC 
    LIMIT 1000000, 10
) t ON o.order_id = t.order_id;

-- 4. 优化方案三:使用游标分页(推荐)
-- 第一页
SELECT * FROM orders 
WHERE created_at >= '2023-01-01' 
ORDER BY created_at DESC 
LIMIT 10;

-- 第二页(使用上一页最后一条记录的created_at)
SELECT * FROM orders 
WHERE created_at < '2023-01-01 12:00:00' 
ORDER BY created_at DESC 
LIMIT 10;

5.3 批量操作优化

批量插入优化:

// 1. 传统批量插入(性能差)
public void batchInsertTraditional(List<User> users) {
    for (User user : users) {
        userMapper.insert(user); // 每次插入都创建连接
    }
}

// 2. 批量插入优化
public void batchInsertOptimized(List<User> users) {
    String sql = "INSERT INTO user (username, email, status) VALUES (?, ?, ?)";
    
    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement(sql)) {
        
        conn.setAutoCommit(false); // 关闭自动提交
        
        for (int i = 0; i < users.size(); i++) {
            User user = users.get(i);
            ps.setString(1, user.getUsername());
            ps.setString(2, user.getEmail());
            ps.setInt(3, user.getStatus());
            ps.addBatch();
            
            // 每1000条提交一次
            if (i % 1000 == 0) {
                ps.executeBatch();
                conn.commit();
            }
        }
        
        // 提交剩余数据
        ps.executeBatch();
        conn.commit();
        
    } catch (SQLException e) {
        log.error("批量插入失败", e);
        throw new RuntimeException(e);
    }
}

六、监控与告警体系

6.1 MySQL性能监控

关键指标监控:

-- 1. 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Threads_running';

-- 2. 查看慢查询数量
SHOW STATUS LIKE 'Slow_queries';

-- 3. 查看InnoDB缓冲池命中率
SHOW STATUS LIKE 'Innodb_buffer_pool_read_requests';
SHOW STATUS LIKE 'Innodb_buffer_pool_reads';
-- 命中率 = (1 - Innodb_buffer_pool_reads / Innodb_buffer_pool_read_requests) * 100%

-- 4. 查看锁等待情况
SELECT * FROM information_schema.INNODB_LOCKS;
SELECT * FROM information_schema.INNODB_LOCK_WAITS;

-- 5. 查看表锁情况
SHOW OPEN TABLES WHERE In_use > 0;

监控脚本示例:

#!/bin/bash
# MySQL监控脚本

MYSQL_HOST="localhost"
MYSQL_USER="monitor"
MYSQL_PASS="password"

# 获取连接数
connections=$(mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW STATUS LIKE 'Threads_connected'" | grep -v "Threads_connected" | awk '{print $2}')

# 获取慢查询数
slow_queries=$(mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW STATUS LIKE 'Slow_queries'" | grep -v "Slow_queries" | awk '{print $2}')

# 获取缓冲池命中率
buffer_pool_reads=$(mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW STATUS LIKE 'Innodb_buffer_pool_reads'" | grep -v "Innodb_buffer_pool_reads" | awk '{print $2}')
buffer_pool_read_requests=$(mysql -h$MYSQL_HOST -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW STATUS LIKE 'Innodb_buffer_pool_read_requests'" | grep -v "Innodb_buffer_pool_read_requests" | awk '{print $2}')

if [ $buffer_pool_read_requests -gt 0 ]; then
    hit_rate=$(echo "scale=2; (1 - $buffer_pool_reads / $buffer_pool_read_requests) * 100" | bc)
else
    hit_rate=0
fi

# 输出监控结果
echo "MySQL监控结果:"
echo "当前连接数: $connections"
echo "慢查询数: $slow_queries"
echo "缓冲池命中率: ${hit_rate}%"

# 告警逻辑
if [ $connections -gt 100 ]; then
    echo "警告:连接数过高!"
fi

if [ $(echo "$hit_rate < 95" | bc) -eq 1 ]; then
    echo "警告:缓冲池命中率过低!"
fi

6.2 慢查询日志分析

使用pt-query-digest分析慢查询:

# 安装Percona Toolkit
sudo apt-get install percona-toolkit

# 分析慢查询日志
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# 生成HTML报告
pt-query-digest --output=html /var/log/mysql/slow.log > slow_report.html

慢查询优化流程:

  1. 收集:开启慢查询日志,收集慢查询
  2. 分析:使用pt-query-digest分析慢查询模式
  3. 优化:添加索引、改写SQL、调整参数
  4. 验证:对比优化前后的执行计划
  5. 监控:持续监控优化效果

七、实战案例:电商秒杀系统

7.1 秒杀系统架构设计

整体架构:

用户请求 → Nginx → 应用服务器 → Redis缓存 → 消息队列 → MySQL

数据库表设计:

-- 秒杀商品表
CREATE TABLE seckill_goods (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    goods_id BIGINT NOT NULL,
    stock INT NOT NULL,
    start_time DATETIME NOT NULL,
    end_time DATETIME NOT NULL,
    version INT DEFAULT 0, -- 乐观锁版本号
    INDEX idx_goods_time (goods_id, start_time)
) ENGINE=InnoDB;

-- 秒杀订单表
CREATE TABLE seckill_order (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_no VARCHAR(64) NOT NULL,
    user_id BIGINT NOT NULL,
    goods_id BIGINT NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    status TINYINT DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_user_goods (user_id, goods_id),
    INDEX idx_order_no (order_no)
) ENGINE=InnoDB;

7.2 秒杀业务逻辑实现

分布式锁 + Redis + 消息队列方案:

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private SeckillGoodsMapper seckillGoodsMapper;
    
    @Autowired
    private SeckillOrderMapper seckillOrderMapper;
    
    @Autowired
    private RedisDistributedLock redisDistributedLock;
    
    // 秒杀接口
    @Transactional
    public SeckillResult seckill(Long userId, Long goodsId) {
        String lockKey = "seckill:" + goodsId;
        
        // 1. 获取分布式锁(防止超卖)
        if (!redisDistributedLock.tryLock(lockKey, 5)) {
            return SeckillResult.fail("系统繁忙,请稍后重试");
        }
        
        try {
            // 2. 检查是否已秒杀
            String orderKey = "seckill_order:" + userId + ":" + goodsId;
            if (redisTemplate.hasKey(orderKey)) {
                return SeckillResult.fail("您已参与过该秒杀");
            }
            
            // 3. 检查秒杀时间
            String goodsKey = "seckill_goods:" + goodsId;
            SeckillGoods goods = (SeckillGoods) redisTemplate.opsForValue().get(goodsKey);
            
            if (goods == null) {
                goods = seckillGoodsMapper.selectById(goodsId);
                if (goods == null) {
                    return SeckillResult.fail("商品不存在");
                }
                redisTemplate.opsForValue().set(goodsKey, goods, 10, TimeUnit.MINUTES);
            }
            
            long now = System.currentTimeMillis();
            if (now < goods.getStartTime().getTime() || now > goods.getEndTime().getTime()) {
                return SeckillResult.fail("秒杀未开始或已结束");
            }
            
            // 4. 检查库存(使用Redis预减库存)
            String stockKey = "seckill_stock:" + goodsId;
            Long stock = redisTemplate.opsForValue().decrement(stockKey);
            
            if (stock == null || stock < 0) {
                // 恢复库存
                redisTemplate.opsForValue().increment(stockKey);
                return SeckillResult.fail("库存不足");
            }
            
            // 5. 发送消息到队列(异步创建订单)
            SeckillMessage message = new SeckillMessage();
            message.setUserId(userId);
            message.setGoodsId(goodsId);
            message.setStock(stock);
            
            rabbitTemplate.convertAndSend("seckill.exchange", "seckill.key", message);
            
            // 6. 记录用户已参与
            redisTemplate.opsForValue().set(orderKey, 1, 30, TimeUnit.MINUTES);
            
            return SeckillResult.success("秒杀成功,订单处理中");
            
        } finally {
            // 7. 释放锁
            redisDistributedLock.unlock(lockKey);
        }
    }
    
    // 消费消息,创建订单
    @RabbitListener(queues = "seckill.queue")
    public void processSeckillMessage(SeckillMessage message) {
        try {
            // 1. 检查数据库库存(防止Redis与数据库不一致)
            SeckillGoods goods = seckillGoodsMapper.selectById(message.getGoodsId());
            if (goods.getStock() <= 0) {
                // 库存不足,恢复Redis库存
                redisTemplate.opsForValue().increment("seckill_stock:" + message.getGoodsId());
                return;
            }
            
            // 2. 使用乐观锁更新数据库库存
            int updated = seckillGoodsMapper.updateStockWithVersion(
                message.getGoodsId(), 
                goods.getVersion()
            );
            
            if (updated == 0) {
                // 更新失败,恢复Redis库存
                redisTemplate.opsForValue().increment("seckill_stock:" + message.getGoodsId());
                return;
            }
            
            // 3. 创建订单
            SeckillOrder order = new SeckillOrder();
            order.setOrderNo(generateOrderNo());
            order.setUserId(message.getUserId());
            order.setGoodsId(message.getGoodsId());
            order.setAmount(goods.getPrice());
            order.setStatus(1);
            
            seckillOrderMapper.insert(order);
            
            // 4. 发送订单创建成功消息
            rabbitTemplate.convertAndSend("order.exchange", "order.key", order);
            
        } catch (Exception e) {
            log.error("处理秒杀消息失败", e);
            // 恢复Redis库存
            redisTemplate.opsForValue().increment("seckill_stock:" + message.getGoodsId());
        }
    }
}

7.3 压力测试与调优

使用JMeter进行压力测试:

<!-- JMeter测试计划配置 -->
<TestPlan>
  <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="秒杀并发测试">
    <stringProp name="ThreadGroup.num_threads">1000</stringProp>
    <stringProp name="ThreadGroup.ramp_time">10</stringProp>
    <stringProp name="ThreadGroup.duration">60</stringProp>
  </ThreadGroup>
  
  <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="秒杀请求">
    <stringProp name="HTTPSampler.domain">localhost</stringProp>
    <stringProp name="HTTPSampler.port">8080</stringProp>
    <stringProp name="HTTPSampler.path">/seckill/do</stringProp>
    <stringProp name="HTTPSampler.method">POST</stringProp>
    <elementProp name="Arguments" elementType="Arguments">
      <collectionProp name="Arguments.arguments">
        <elementProp name="userId" elementType="HTTPArgument">
          <stringProp name="Argument.value">${__Random(1,10000)}</stringProp>
        </elementProp>
        <elementProp name="goodsId" elementType="HTTPArgument">
          <stringProp name="Argument.value">1</stringProp>
        </elementProp>
      </collectionProp>
    </elementProp>
  </HTTPSamplerProxy>
</TestPlan>

性能调优参数:

# MySQL配置优化(my.cnf)
[mysqld]
# 连接相关
max_connections = 1000
max_connect_errors = 10000
wait_timeout = 600
interactive_timeout = 600

# InnoDB缓冲池(根据内存调整,一般为总内存的50-70%)
innodb_buffer_pool_size = 4G
innodb_buffer_pool_instances = 8

# 日志相关
innodb_log_file_size = 512M
innodb_log_buffer_size = 16M
innodb_flush_log_at_trx_commit = 2  # 1:每次提交都刷盘(安全) 2:每秒刷盘(性能)

# 事务相关
innodb_lock_wait_timeout = 50
innodb_rollback_on_timeout = 1

# 查询缓存(MySQL 8.0已移除,5.7及以下版本)
query_cache_type = 0
query_cache_size = 0

# 其他优化
innodb_flush_method = O_DIRECT
innodb_file_per_table = 1
innodb_read_io_threads = 8
innodb_write_io_threads = 8

八、总结与最佳实践

8.1 高并发处理核心原则

  1. 分层设计:从应用层到数据库层逐层优化
  2. 缓存优先:能用缓存解决的就不要直接访问数据库
  3. 异步处理:非核心流程异步化,提升响应速度
  4. 限流降级:保护系统不被突发流量击垮
  5. 监控告警:实时掌握系统状态,快速定位问题

8.2 常见问题排查清单

问题现象 可能原因 解决方案
响应时间慢 1. 慢查询
2. 锁等待
3. 连接池耗尽
1. 优化SQL/加索引
2. 减少事务范围
3. 调整连接池大小
CPU使用率高 1. 复杂计算
2. 大量排序
3. 临时表
1. 优化算法
2. 添加索引避免排序
3. 调整tmp_table_size
内存使用高 1. 缓冲池不足
2. 连接数过多
3. 大查询结果
1. 增加innodb_buffer_pool_size
2. 限制连接数
3. 分页查询
锁竞争激烈 1. 长事务
2. 不合理的索引
3. 高并发更新
1. 缩短事务时间
2. 优化索引
3. 使用乐观锁

8.3 持续优化建议

  1. 定期审查:每月审查慢查询日志和执行计划
  2. 容量规划:根据业务增长预测数据库容量需求
  3. 技术演进:关注MySQL新版本特性(如8.0的直方图、窗口函数)
  4. 团队培训:提升团队数据库优化能力
  5. 工具建设:建立完善的监控和自动化运维工具链

通过以上策略的综合应用,可以有效应对高并发场景下的MySQL性能挑战。记住,没有一劳永逸的解决方案,需要根据业务特点和系统负载持续调优。在实际生产环境中,建议先在测试环境充分验证,再逐步上线,确保系统稳定性。