在当今互联网时代,高并发场景已成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易高峰,系统都可能面临每秒数万甚至数十万的请求冲击。MySQL作为最流行的关系型数据库之一,在高并发环境下常常成为性能瓶颈。本文将深入探讨MySQL应对百万级请求的策略,从架构设计、数据库优化到应用层优化,提供一套完整的解决方案。

一、理解高并发挑战的本质

1.1 什么是高并发?

高并发是指系统在短时间内接收到大量请求,这些请求可能同时或几乎同时到达系统。对于MySQL而言,高并发主要体现在:

  • 连接数激增:大量客户端同时建立数据库连接
  • 查询请求爆发:短时间内产生海量的SQL查询
  • 写入压力:大量数据插入、更新操作
  • 锁竞争:事务间的锁等待导致性能下降

1.2 百万级请求的典型场景

  • 电商秒杀:100万用户同时抢购1000件商品
  • 社交媒体:热点事件引发的瞬间流量洪峰
  • 金融交易:股票交易高峰期的订单处理
  • 物联网:百万设备同时上报数据

二、架构层面的优化策略

2.1 读写分离架构

读写分离是应对高并发的基础策略,通过将读操作和写操作分离到不同的数据库实例,显著降低主库压力。

实现方案

-- 主库配置(写操作)
-- my.cnf 配置
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW

-- 从库配置(读操作)
[mysqld]
server-id=2
relay-log=mysql-relay-bin
read-only=1

应用层路由示例(Java)

public class DataSourceRouter {
    private static final ThreadLocal<String> dataSourceType = new ThreadLocal<>();
    
    public static void setMaster() {
        dataSourceType.set("master");
    }
    
    public static void setSlave() {
        dataSourceType.set("slave");
    }
    
    public static String getDataSourceType() {
        return dataSourceType.get() != null ? dataSourceType.get() : "slave";
    }
    
    // 在AOP中根据方法注解选择数据源
    @Aspect
    @Component
    public class DataSourceAspect {
        @Before("@annotation(master)")
        public void before(JoinPoint joinPoint, Master master) {
            DataSourceRouter.setMaster();
        }
        
        @Before("@annotation(slave)")
        public void before(JoinPoint joinPoint, Slave slave) {
            DataSourceRouter.setSlave();
        }
    }
}

2.2 分库分表策略

当单表数据量超过千万级或单库连接数达到瓶颈时,需要考虑分库分表。

垂直分库示例

-- 原始单库结构
CREATE DATABASE ecommerce;
USE ecommerce;
CREATE TABLE users (id BIGINT, name VARCHAR(50), email VARCHAR(100));
CREATE TABLE orders (id BIGINT, user_id BIGINT, amount DECIMAL(10,2));
CREATE TABLE products (id BIGINT, name VARCHAR(100), price DECIMAL(10,2));

-- 垂直分库后
CREATE DATABASE user_db;
CREATE DATABASE order_db;
CREATE DATABASE product_db;

-- user_db
CREATE TABLE users (id BIGINT, name VARCHAR(50), email VARCHAR(100));

-- order_db
CREATE TABLE orders (id BIGINT, user_id BIGINT, amount DECIMAL(10,2));

-- product_db
CREATE TABLE products (id BIGINT, name VARCHAR(100), price DECIMAL(10,2));

水平分表示例(按用户ID哈希)

-- 创建分表规则
CREATE TABLE orders_0 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    create_time DATETIME,
    INDEX idx_user_id (user_id)
) ENGINE=InnoDB;

CREATE TABLE orders_1 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    create_time DATETIME,
    INDEX idx_user_id (user_id)
) ENGINE=InnoDB;

-- 分表路由逻辑(Java示例)
public class TableSharding {
    private static final int TABLE_COUNT = 2;
    
    public static String getTableName(long userId) {
        int index = (int) (userId % TABLE_COUNT);
        return "orders_" + index;
    }
    
    // 动态SQL构建
    public String buildQuery(long userId, String baseQuery) {
        String tableName = getTableName(userId);
        return baseQuery.replace("orders", tableName);
    }
}

2.3 缓存层引入

引入Redis等缓存系统,减少直接访问MySQL的次数。

缓存策略示例

@Service
public class ProductService {
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String PRODUCT_CACHE_KEY = "product:";
    private static final long CACHE_TTL = 300; // 5分钟
    
    public Product getProductById(Long id) {
        String cacheKey = PRODUCT_CACHE_KEY + id;
        
        // 1. 先从缓存获取
        Product product = (Product) redisTemplate.opsForValue().get(cacheKey);
        if (product != null) {
            return product;
        }
        
        // 2. 缓存未命中,查询数据库
        product = productMapper.selectById(id);
        if (product != null) {
            // 3. 写入缓存
            redisTemplate.opsForValue().set(cacheKey, product, CACHE_TTL, TimeUnit.SECONDS);
        }
        
        return product;
    }
    
    // 更新时删除缓存
    public void updateProduct(Product product) {
        productMapper.updateById(product);
        String cacheKey = PRODUCT_CACHE_KEY + product.getId();
        redisTemplate.delete(cacheKey);
    }
}

三、数据库层面的优化

3.1 索引优化策略

3.1.1 索引设计原则

  • 最左前缀原则:复合索引必须从左到右使用
  • 覆盖索引:查询字段都在索引中,避免回表
  • 避免索引失效:注意函数、类型转换、模糊查询等场景

索引优化示例

-- 原始查询(性能差)
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';

-- 优化方案1:使用范围查询
SELECT * FROM orders 
WHERE create_time >= '2024-01-01 00:00:00' 
  AND create_time < '2024-01-02 00:00:00';

-- 优化方案2:添加索引
ALTER TABLE orders ADD INDEX idx_create_time (create_time);

-- 复合索引示例
-- 查询:WHERE user_id = ? AND status = ? AND create_time > ?
ALTER TABLE orders ADD INDEX idx_user_status_time (user_id, status, create_time);

-- 覆盖索引示例
-- 查询只需要id和user_id
SELECT id, user_id FROM orders WHERE user_id = 123;
-- 添加覆盖索引
ALTER TABLE orders ADD INDEX idx_user_id_cover (user_id, id);

3.1.2 索引监控与维护

-- 查看索引使用情况
SELECT 
    table_name,
    index_name,
    stat_value,
    stat_description
FROM mysql.innodb_index_stats 
WHERE database_name = 'your_database'
  AND table_name = 'orders';

-- 查看慢查询日志
SHOW VARIABLES LIKE 'slow_query_log%';
SHOW VARIABLES LIKE 'long_query_time';

-- 分析表的索引使用情况
EXPLAIN SELECT * FROM orders WHERE user_id = 123 AND status = 'PAID';

3.2 查询优化

3.2.1 避免全表扫描

-- 错误示例:导致全表扫描
SELECT * FROM users WHERE name LIKE '%张%';

-- 优化方案1:使用前缀匹配
SELECT * FROM users WHERE name LIKE '张%';

-- 优化方案2:使用全文索引
ALTER TABLE users ADD FULLTEXT INDEX ft_name (name);
SELECT * FROM users WHERE MATCH(name) AGAINST('张' IN BOOLEAN MODE);

-- 优化方案3:使用Elasticsearch等外部搜索引擎

3.2.2 分页优化

-- 传统分页(性能差,越往后越慢)
SELECT * FROM orders ORDER BY id LIMIT 1000000, 20;

-- 优化方案1:使用覆盖索引
SELECT * FROM orders 
WHERE id > (SELECT id FROM orders ORDER BY id LIMIT 1000000, 1)
ORDER BY id LIMIT 20;

-- 优化方案2:使用延迟关联
SELECT o.* FROM orders o
INNER JOIN (
    SELECT id FROM orders ORDER BY id LIMIT 1000000, 20
) t ON o.id = t.id;

-- 优化方案3:使用游标分页(适合移动端)
SELECT * FROM orders 
WHERE id > ?  -- 上一页最后一条记录的id
ORDER BY id 
LIMIT 20;

3.3 事务与锁优化

3.3.1 事务隔离级别选择

-- 查看当前隔离级别
SELECT @@transaction_isolation;

-- 设置隔离级别(会话级)
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 设置隔离级别(全局级)
SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;

不同隔离级别的适用场景

  • READ UNCOMMITTED:几乎不用,脏读风险高
  • READ COMMITTED:大多数场景,平衡性能与一致性
  • REPEATABLE READ:MySQL默认,适合需要一致读的场景
  • SERIALIZABLE:最高隔离级别,性能最差,慎用

3.3.2 锁优化策略

-- 查看锁信息
SHOW ENGINE INNODB STATUS;

-- 查看当前锁等待
SELECT * FROM information_schema.INNODB_LOCKS;
SELECT * FROM information_schema.INNODB_LOCK_WAITS;

-- 优化长事务
-- 1. 将大事务拆分为小事务
BEGIN;
-- 操作1
COMMIT;

BEGIN;
-- 操作2
COMMIT;

-- 2. 避免在事务中执行耗时操作
-- 错误示例
BEGIN;
UPDATE orders SET status = 'PAID' WHERE id = 1;
-- 调用外部API(耗时)
callExternalAPI();
COMMIT;

-- 正确示例
UPDATE orders SET status = 'PAID' WHERE id = 1;
callExternalAPI(); -- 事务外执行

3.4 参数调优

3.4.1 关键参数配置

# my.cnf 关键配置
[mysqld]
# 连接相关
max_connections = 2000
max_connect_errors = 100000
thread_cache_size = 100

# InnoDB缓冲池(建议设置为物理内存的50%-70%)
innodb_buffer_pool_size = 16G
innodb_buffer_pool_instances = 8

# 日志相关
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M

# 事务相关
innodb_flush_log_at_trx_commit = 1  # 1:每次提交都刷盘,最安全
# innodb_flush_log_at_trx_commit = 2  # 2:每秒刷盘,性能更好但可能丢失1秒数据

# 并发相关
innodb_thread_concurrency = 16
innodb_read_io_threads = 8
innodb_write_io_threads = 8

# 查询缓存(MySQL 8.0已移除)
# query_cache_type = 0
# query_cache_size = 0

# 临时表
tmp_table_size = 256M
max_heap_table_size = 256M

# 排序缓冲区
sort_buffer_size = 4M
join_buffer_size = 4M

# 慢查询
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1
log_queries_not_using_indexes = 1

3.4.2 动态参数调整

-- 查看当前参数值
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';

-- 动态调整(无需重启)
SET GLOBAL innodb_buffer_pool_size = 17179869184; -- 16GB

-- 查看运行状态
SHOW GLOBAL STATUS LIKE 'Threads_connected';
SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_read_requests';
SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_reads';

四、应用层优化策略

4.1 连接池优化

4.1.1 连接池配置(HikariCP示例)

@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.hikari")
    public HikariDataSource dataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        
        // 基本配置
        dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/ecommerce");
        dataSource.setUsername("root");
        dataSource.setPassword("password");
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        
        // 连接池配置
        dataSource.setMaximumPoolSize(50);      // 最大连接数
        dataSource.setMinimumIdle(10);          // 最小空闲连接
        dataSource.setConnectionTimeout(30000); // 连接超时30秒
        dataSource.setIdleTimeout(600000);      // 空闲超时10分钟
        dataSource.setMaxLifetime(1800000);     // 连接最大存活时间30分钟
        dataSource.setConnectionTestQuery("SELECT 1");
        
        // 性能优化
        dataSource.setPoolName("HikariCP-1");
        dataSource.setLeakDetectionThreshold(60000); // 泄漏检测60秒
        
        return dataSource;
    }
}

4.1.2 连接池监控

@Component
public class ConnectionPoolMonitor {
    
    @Scheduled(fixedRate = 60000) // 每分钟执行一次
    public void monitorConnectionPool() {
        HikariDataSource dataSource = (HikariDataSource) dataSource();
        
        HikariPoolMXBean poolMXBean = dataSource.getHikariPoolMXBean();
        
        System.out.println("=== 连接池监控 ===");
        System.out.println("活跃连接数: " + poolMXBean.getActiveConnections());
        System.out.println("空闲连接数: " + poolMXBean.getIdleConnections());
        System.out.println("总连接数: " + poolMXBean.getTotalConnections());
        System.out.println("等待连接的线程数: " + poolMXBean.getThreadsAwaitingConnection());
        
        // 告警逻辑
        if (poolMXBean.getThreadsAwaitingConnection() > 10) {
            // 发送告警
            sendAlert("数据库连接池等待线程数过高: " + poolMXBean.getThreadsAwaitingConnection());
        }
    }
}

4.2 批量操作优化

4.2.1 批量插入优化

// 错误示例:逐条插入
public void insertOrders(List<Order> orders) {
    for (Order order : orders) {
        orderMapper.insert(order); // 每次都提交事务
    }
}

// 正确示例:批量插入
public void insertOrdersBatch(List<Order> orders) {
    // 1. 使用批量插入
    orderMapper.insertBatch(orders);
    
    // 2. 或者使用JDBC批量
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    public void insertOrdersJdbcBatch(List<Order> orders) {
        String sql = "INSERT INTO orders (id, user_id, amount, status) VALUES (?, ?, ?, ?)";
        
        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                Order order = orders.get(i);
                ps.setLong(1, order.getId());
                ps.setLong(2, order.getUserId());
                ps.setBigDecimal(3, order.getAmount());
                ps.setString(4, order.getStatus());
            }
            
            @Override
            public int getBatchSize() {
                return orders.size();
            }
        });
    }
}

// MyBatis XML配置
<insert id="insertBatch" parameterType="list">
    INSERT INTO orders (id, user_id, amount, status) VALUES
    <foreach collection="list" item="order" separator=",">
        (#{order.id}, #{order.userId}, #{order.amount}, #{order.status})
    </foreach>
</insert>

4.2.2 批量更新优化

// 使用ON DUPLICATE KEY UPDATE
public void batchUpdateWithUpsert(List<Order> orders) {
    String sql = """
        INSERT INTO orders (id, user_id, amount, status, update_time)
        VALUES (?, ?, ?, ?, NOW())
        ON DUPLICATE KEY UPDATE
            amount = VALUES(amount),
            status = VALUES(status),
            update_time = NOW()
        """;
    
    jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement ps, int i) throws SQLException {
            Order order = orders.get(i);
            ps.setLong(1, order.getId());
            ps.setLong(2, order.getUserId());
            ps.setBigDecimal(3, order.getAmount());
            ps.setString(4, order.getStatus());
        }
        
        @Override
        public int getBatchSize() {
            return orders.size();
        }
    });
}

4.3 异步处理与消息队列

4.3.1 使用消息队列削峰

// 生产者:接收请求,发送到消息队列
@RestController
public class OrderController {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostMapping("/order")
    public ResponseEntity<String> createOrder(@RequestBody OrderRequest request) {
        // 1. 参数校验
        if (!validateRequest(request)) {
            return ResponseEntity.badRequest().body("参数错误");
        }
        
        // 2. 生成订单ID
        String orderId = generateOrderId();
        
        // 3. 发送到消息队列(异步处理)
        OrderMessage message = new OrderMessage(orderId, request);
        rabbitTemplate.convertAndSend("order.exchange", "order.create", message);
        
        // 4. 立即返回,不等待数据库操作
        return ResponseEntity.ok("订单已提交,ID: " + orderId);
    }
}

// 消费者:处理消息,写入数据库
@Component
@RabbitListener(queues = "order.queue")
public class OrderConsumer {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @RabbitHandler
    public void processOrder(OrderMessage message) {
        try {
            // 1. 业务逻辑处理
            Order order = convertToOrder(message);
            
            // 2. 数据库操作
            orderMapper.insert(order);
            
            // 3. 发送成功事件
            sendOrderCreatedEvent(order);
            
        } catch (Exception e) {
            // 4. 异常处理,重试或死信队列
            log.error("处理订单失败: {}", message.getOrderId(), e);
            throw new RuntimeException("处理失败", e);
        }
    }
}

4.3.2 延迟队列处理

// 使用RabbitMQ的延迟队列
@Configuration
public class DelayQueueConfig {
    
    @Bean
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 60000); // 60秒延迟
        args.put("x-dead-letter-exchange", "order.exchange");
        args.put("x-dead-letter-routing-key", "order.delay.process");
        return new Queue("order.delay.queue", true, false, false, args);
    }
    
    @Bean
    public Exchange delayExchange() {
        return new DirectExchange("order.delay.exchange");
    }
    
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with("order.delay.key");
    }
}

// 使用延迟队列处理订单状态更新
public void scheduleOrderStatusUpdate(String orderId, long delaySeconds) {
    OrderDelayMessage message = new OrderDelayMessage(orderId, "PAID");
    
    rabbitTemplate.convertAndSend(
        "order.delay.exchange",
        "order.delay.key",
        message,
        msg -> {
            msg.getMessageProperties().setExpiration(String.valueOf(delaySeconds * 1000));
            return msg;
        }
    );
}

五、监控与告警体系

5.1 MySQL性能监控

5.1.1 关键指标监控

-- 1. 连接数监控
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Threads_running';

-- 2. 查询性能监控
SHOW STATUS LIKE 'Slow_queries';
SHOW STATUS LIKE 'Questions';

-- 3. InnoDB缓冲池命中率
SELECT 
    (1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 AS buffer_hit_rate
FROM information_schema.GLOBAL_STATUS 
WHERE VARIABLE_NAME IN ('Innodb_buffer_pool_reads', 'Innodb_buffer_pool_read_requests');

-- 4. 锁等待监控
SELECT 
    r.trx_id waiting_trx_id,
    r.trx_mysql_thread_id waiting_thread,
    r.trx_query waiting_query,
    b.trx_id blocking_trx_id,
    b.trx_mysql_thread_id blocking_thread,
    b.trx_query blocking_query
FROM information_schema.INNODB_LOCK_WAITS w
INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.requesting_trx_id;

5.1.2 慢查询分析

-- 启用慢查询日志
SET GLOBAL slow_query_log = 1;
SET GLOBAL long_query_time = 1;
SET GLOBAL log_queries_not_using_indexes = 1;

-- 分析慢查询日志
-- 使用mysqldumpslow工具
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

-- 或者使用pt-query-digest
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

5.2 应用层监控

5.2.1 监控指标收集

@Component
public class MetricsCollector {
    
    private final MeterRegistry meterRegistry;
    
    public MetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    // 记录数据库操作耗时
    public <T> T recordDbOperation(String operation, Supplier<T> supplier) {
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            return supplier.get();
        } finally {
            sample.stop(Timer.builder("db.operation.duration")
                    .tag("operation", operation)
                    .register(meterRegistry));
        }
    }
    
    // 记录连接池状态
    @Scheduled(fixedRate = 30000)
    public void recordConnectionPoolMetrics() {
        HikariDataSource dataSource = (HikariDataSource) dataSource();
        HikariPoolMXBean poolMXBean = dataSource.getHikariPoolMXBean();
        
        Gauge.builder("db.connections.active", poolMXBean, HikariPoolMXBean::getActiveConnections)
                .register(meterRegistry);
        
        Gauge.builder("db.connections.idle", poolMXBean, HikariPoolMXBean::getIdleConnections)
                .register(meterRegistry);
        
        Gauge.builder("db.connections.waiting", poolMXBean, HikariPoolMXBean::getThreadsAwaitingConnection)
                .register(meterRegistry);
    }
}

5.2.2 告警规则配置

# Prometheus告警规则示例
groups:
  - name: mysql_alerts
    rules:
      - alert: MySQLHighConnections
        expr: mysql_global_status_threads_connected > 1500
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MySQL连接数过高"
          description: "MySQL当前连接数为{{ $value }},超过阈值1500"
      
      - alert: MySQLSlowQueries
        expr: rate(mysql_global_status_slow_queries[5m]) > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "MySQL慢查询过多"
          description: "过去5分钟慢查询数为{{ $value }}/秒"
      
      - alert: MySQLBufferPoolHitRateLow
        expr: (1 - mysql_global_status_innodb_buffer_pool_reads / mysql_global_status_innodb_buffer_pool_read_requests) * 100 < 95
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "MySQL缓冲池命中率过低"
          description: "当前命中率为{{ $value }}%,低于95%"

六、实战案例:秒杀系统设计

6.1 系统架构设计

用户请求 → Nginx负载均衡 → 应用服务器集群 → Redis缓存 → MySQL数据库
                     ↓
                消息队列(削峰)
                     ↓
                异步处理服务

6.2 核心代码实现

6.2.1 秒杀商品库存管理

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    private static final String STOCK_KEY = "seckill:stock:";
    private static final String STOCK_LOCK_KEY = "seckill:lock:";
    
    /**
     * 秒杀下单
     */
    public SeckillResult seckill(Long userId, Long productId) {
        // 1. 参数校验
        if (userId == null || productId == null) {
            return SeckillResult.fail("参数错误");
        }
        
        // 2. 检查是否已秒杀过
        String userKey = "seckill:user:" + userId + ":" + productId;
        if (redisTemplate.hasKey(userKey)) {
            return SeckillResult.fail("您已参与过该秒杀");
        }
        
        // 3. 获取分布式锁
        String lockKey = STOCK_LOCK_KEY + productId;
        String lockValue = UUID.randomUUID().toString();
        boolean locked = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS);
        
        if (!locked) {
            return SeckillResult.fail("系统繁忙,请稍后重试");
        }
        
        try {
            // 4. 检查库存(Redis缓存)
            String stockKey = STOCK_KEY + productId;
            Long stock = redisTemplate.opsForValue().increment(stockKey, -1);
            
            if (stock == null || stock < 0) {
                // 库存不足,恢复库存
                redisTemplate.opsForValue().increment(stockKey, 1);
                return SeckillResult.fail("库存不足");
            }
            
            // 5. 标记用户已秒杀
            redisTemplate.opsForValue().set(userKey, "1", 1, TimeUnit.HOURS);
            
            // 6. 发送消息到队列,异步创建订单
            SeckillMessage message = new SeckillMessage(userId, productId, stock);
            rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);
            
            return SeckillResult.success("秒杀成功,订单处理中");
            
        } finally {
            // 7. 释放锁
            String currentLockValue = (String) redisTemplate.opsForValue().get(lockKey);
            if (lockValue.equals(currentLockValue)) {
                redisTemplate.delete(lockKey);
            }
        }
    }
    
    /**
     * 异步创建订单
     */
    @RabbitListener(queues = "seckill.queue")
    public void createOrder(SeckillMessage message) {
        try {
            // 1. 检查库存(数据库)
            Product product = productMapper.selectById(message.getProductId());
            if (product == null || product.getStock() <= 0) {
                // 库存不足,回滚Redis库存
                redisTemplate.opsForValue().increment(STOCK_KEY + message.getProductId(), 1);
                return;
            }
            
            // 2. 扣减数据库库存
            int updated = productMapper.decrementStock(message.getProductId());
            if (updated == 0) {
                // 库存不足,回滚Redis库存
                redisTemplate.opsForValue().increment(STOCK_KEY + message.getProductId(), 1);
                return;
            }
            
            // 3. 创建订单
            Order order = new Order();
            order.setId(generateOrderId());
            order.setUserId(message.getUserId());
            order.setProductId(message.getProductId());
            order.setAmount(product.getPrice());
            order.setStatus("PAID");
            order.setCreateTime(new Date());
            
            orderMapper.insert(order);
            
            // 4. 发送订单创建成功事件
            sendOrderCreatedEvent(order);
            
        } catch (Exception e) {
            log.error("创建订单失败", e);
            // 异常处理,记录日志,人工干预
        }
    }
}

6.2.2 库存预热与缓存

@Component
public class StockPreheatService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private ProductMapper productMapper;
    
    /**
     * 秒杀前预热库存到Redis
     */
    @Scheduled(cron = "0 0 9 * * ?") // 每天9点执行
    public void preheatStock() {
        // 1. 查询秒杀商品
        List<Product> products = productMapper.selectSeckillProducts();
        
        // 2. 批量预热到Redis
        Map<String, Long> stockMap = new HashMap<>();
        for (Product product : products) {
            stockMap.put(STOCK_KEY + product.getId(), product.getStock());
        }
        
        // 3. 使用pipeline批量设置
        redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                for (Map.Entry<String, Long> entry : stockMap.entrySet()) {
                    connection.set(entry.getKey().getBytes(), 
                                 String.valueOf(entry.getValue()).getBytes());
                }
                return null;
            }
        });
        
        log.info("库存预热完成,共预热{}个商品", products.size());
    }
}

6.3 压力测试与优化

# 使用JMeter进行压力测试
# 1. 创建测试计划
# 2. 配置线程组:1000个线程,循环100次
# 3. 添加HTTP请求:POST /seckill
# 4. 添加监听器:查看结果树、聚合报告

# 使用ab进行简单测试
ab -n 10000 -c 1000 -p post-data.txt -T "application/json" http://localhost:8080/seckill

# 使用sysbench进行数据库压力测试
sysbench oltp_read_write --table-size=1000000 --threads=100 --time=60 prepare
sysbench oltp_read_write --table-size=1000000 --threads=100 --time=60 run

七、故障排查与应急处理

7.1 常见故障场景

7.1.1 连接数爆满

-- 1. 查看当前连接
SHOW PROCESSLIST;

-- 2. 查看连接来源
SELECT 
    SUBSTRING_INDEX(host, ':', 1) as ip,
    COUNT(*) as connections
FROM information_schema.PROCESSLIST
GROUP BY ip
ORDER BY connections DESC;

-- 3. 杀死异常连接
-- 找到异常连接的ID
SELECT id FROM information_schema.PROCESSLIST 
WHERE time > 300 AND command != 'Sleep';

-- 杀死连接
KILL [process_id];

-- 4. 临时调整连接数
SET GLOBAL max_connections = 2000;

7.1.2 死锁处理

-- 1. 查看死锁信息
SHOW ENGINE INNODB STATUS;

-- 2. 查看当前锁信息
SELECT * FROM information_schema.INNODB_LOCKS;
SELECT * FROM information_schema.INNODB_LOCK_WAITS;

-- 3. 分析死锁原因
-- 查看事务状态
SELECT * FROM information_schema.INNODB_TRX;

-- 4. 优化SQL,避免死锁
-- 示例:按相同顺序访问表
-- 错误:事务A先锁表1再锁表2,事务B先锁表2再锁表1
-- 正确:所有事务都按表1→表2的顺序加锁

7.2 应急处理流程

7.2.1 系统降级策略

@Service
public class DegradationService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 降级处理:当数据库压力过大时,返回缓存数据
     */
    public Product getProductWithDegradation(Long id) {
        // 1. 尝试从数据库获取
        try {
            Product product = productMapper.selectById(id);
            if (product != null) {
                // 更新缓存
                redisTemplate.opsForValue().set("product:" + id, product, 300, TimeUnit.SECONDS);
                return product;
            }
        } catch (Exception e) {
            log.warn("数据库查询失败,降级到缓存", e);
        }
        
        // 2. 降级到缓存
        Product cachedProduct = (Product) redisTemplate.opsForValue().get("product:" + id);
        if (cachedProduct != null) {
            return cachedProduct;
        }
        
        // 3. 返回默认数据
        return getDefaultProduct();
    }
    
    /**
     * 限流降级
     */
    @RateLimiter(name = "dbQuery", fallbackMethod = "fallback")
    public Product getProductWithRateLimit(Long id) {
        return productMapper.selectById(id);
    }
    
    public Product fallback(Long id, Throwable t) {
        log.warn("触发限流降级", t);
        return getDefaultProduct();
    }
}

7.2.2 熔断器模式

@Service
public class CircuitBreakerService {
    
    @Autowired
    private ProductMapper productMapper;
    
    private final CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("productService");
    
    public Product getProduct(Long id) {
        return circuitBreaker.run(
            () -> productMapper.selectById(id),
            throwable -> {
                log.error("数据库查询失败,触发熔断", throwable);
                return getDefaultProduct();
            }
        );
    }
}

八、总结与最佳实践

8.1 百万级请求应对策略总结

策略类别 具体措施 适用场景 预期效果
架构优化 读写分离、分库分表、引入缓存 数据量大、读写比例高 提升10-100倍性能
数据库优化 索引优化、查询优化、事务优化 任何场景 提升2-10倍性能
应用层优化 连接池优化、批量操作、异步处理 高并发写入场景 提升5-20倍性能
监控告警 性能监控、慢查询分析、实时告警 生产环境 快速发现问题
应急处理 降级、限流、熔断 故障场景 保证系统可用性

8.2 实施路线图

  1. 第一阶段(1-2周):基础优化

    • 优化索引和慢查询
    • 调整数据库参数
    • 配置连接池
  2. 第二阶段(2-4周):架构升级

    • 实施读写分离
    • 引入Redis缓存
    • 部署消息队列
  3. 第三阶段(4-8周):高级优化

    • 分库分表设计
    • 监控体系搭建
    • 应急预案制定
  4. 第四阶段(持续):持续优化

    • 定期性能分析
    • 压力测试
    • 架构演进

8.3 关键注意事项

  1. 不要过早优化:先测量,再优化
  2. 监控先行:没有监控的优化是盲目的
  3. 渐进式改进:每次只改变一个变量
  4. 保持简单:复杂的系统更难维护和优化
  5. 文档化:记录所有优化决策和效果

8.4 推荐工具与资源

  • 监控工具:Prometheus + Grafana、Percona Monitoring and Management
  • 性能分析:Percona Toolkit、pt-query-digest、MySQL Workbench
  • 压力测试:JMeter、sysbench、ab
  • 学习资源
    • 《高性能MySQL》
    • MySQL官方文档
    • Percona博客
    • 阿里云数据库最佳实践

通过以上策略的综合应用,MySQL系统可以稳定应对百万级请求的挑战,避免系统崩溃。关键在于理解业务场景,选择合适的优化策略,并建立完善的监控和应急体系。记住,没有银弹,只有最适合当前场景的解决方案。