引言:理解高并发环境下的MySQL挑战

在现代互联网应用中,高并发场景已经成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易高峰期,MySQL数据库都面临着前所未有的挑战。高并发环境下,数据库可能面临连接数耗尽、CPU飙升、磁盘I/O瓶颈、锁竞争激烈等问题,导致响应时间延长甚至服务不可用。

高并发处理的核心目标是:在保证数据一致性和完整性的前提下,最大化数据库的吞吐量和响应速度。这需要从架构设计、配置优化、SQL调优、硬件资源等多个维度综合考虑。本文将深入探讨MySQL在高并发环境下的处理策略,提供可落地的优化方案。

一、连接层优化:从源头控制并发压力

1.1 合理配置连接数参数

MySQL的连接处理是高并发的第一道关卡。关键参数包括max_connectionsback_logthread_cache_size

-- 查看当前连接数配置
SHOW VARIABLES LIKE 'max_connections';
SHOW VARIABLES LIKE 'thread_cache_size';
SHOW STATUS LIKE 'Threads_connected';

-- 动态调整连接数(无需重启)
SET GLOBAL max_connections = 2000;
SET GLOBAL thread_cache_size = 100;

配置建议

  • max_connections:默认值151,高并发场景建议设置为CPU核心数×2×256,但不超过操作系统文件描述符限制。通常设置为1000-5000。
  • back_log:请求连接队列长度,建议设置为max_connections的1/4到1/2,例如500-1000。
  • thread_cache_size:线程缓存,避免频繁创建销毁线程。建议设置为100-200。

监控指标

-- 查看连接拒绝情况
SHOW STATUS LIKE 'Aborted_connects';

-- 查看慢连接
SHOW STATUS LIKE 'Slow_launch_threads';

1.2 连接池技术应用

应用层使用连接池是高并发的标配。以Java为例,使用HikariCP连接池:

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class DataSourceConfig {
    public static HikariDataSource createDataSource() {
        HikariConfig config = new H1kariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/mydb?useSSL=false&serverTimezone=UTC");
        config.setUsername("root");
        config.setPassword("password");
        
        // 核心连接池参数
        config.setMaximumPoolSize(50);        // 最大连接数
        config.setMinimumIdle(10);            // 最小空闲连接
        config.setConnectionTimeout(30000);   // 连接超时30秒
        config.setIdleTimeout(600000);        // 空闲超时10分钟
        config.setMaxLifetime(1800000);       // 连接最大存活30分钟
        config.setLeakDetectionThreshold(60000); // 泄漏检测60秒
        
        // 性能优化
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("useLocalSessionState", "true");
        config.addDataSourceProperty("rewriteBatchedStatements", "true");
        config.addDataSourceProperty("cacheResultSetMetadata", "true");
        config.addDataSourceProperty("cacheServerConfiguration", "true");
        config.addDataSourceProperty("elideSetAutoCommits", "true");
        config.addDataSourceProperty("maintainTimeStats", "false");
        
        return new HikariDataSource(config);
    }
}

关键配置说明

  • maximumPoolSize:根据应用服务器CPU和内存配置,通常每个连接消耗2-10MB内存。
  • rewriteBatchedStatements=true:启用批量SQL重写,大幅提升批量插入性能。
  • useServerPrepStmts=true:启用服务器端预编译语句,减少SQL解析开销。

1.3 读写分离架构

读写分离是分散并发压力的有效手段。主库处理写操作,多个从库处理读操作。

-- 主库配置(my.cnf)
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7

-- 从库配置(my.cnf)
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
log_bin = mysql-bin
read_only = 1
super_read_only = 1

Java代码实现读写分离

public class RoutingDataSource extends AbstractRoutingDataSource {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    public static void setMaster() {
        contextHolder.set("master");
    }

    public static void setSlave() {
        contextHolder.set("slave");
    }

    public static void clear() {
        contextHolder.remove();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return contextHolder.get();
    }
}

// 使用AOP进行读写分离
@Aspect
@Component
public class DataSourceAspect {
    
    @Before("execution(* com.example.service.*.*(..))")
    public void before(JoinPoint joinPoint) {
        String methodName = joinPoint.getSignature().getName();
        if (methodName.startsWith("get") || methodName.startsWith("list") || methodName.startsWith("count")) {
            RoutingDataSource.setSlave();
        } else {
            RoutingDataSource.setMaster();
        }
    }
    
    @After("execution(* com.example.service.*.*(..))")
    public void after() {
        RoutingDataSource.clear();
    }
}

二、存储引擎优化:InnoDB深度调优

2.1 内存参数配置

InnoDB的性能很大程度上取决于内存配置。关键参数包括innodb_buffer_pool_sizeinnodb_log_file_sizeinnodb_flush_log_at_trx_commit

-- 查看当前配置
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'innodb_log_file_size';
SHOW VARIABLES LIKE 'innodb_flush_log_at_trx_commit';

-- 动态调整(MySQL 5.7+)
SET GLOBAL innodb_buffer_pool_size = 4*1024*1024*1024; -- 4GB

配置建议

  • innodb_buffer_pool_size核心参数,建议设置为系统内存的50%-75%。例如,32GB内存的服务器可设置为24GB。
  • innodb_buffer_pool_instances:缓冲池实例数,减少并发竞争。建议每个实例至少1GB,最大64个。例如24GB可设置为8-12个。
  • innodb_log_file_size:重做日志文件大小,建议1-2GB。太小会导致频繁checkpoint,太大会增加恢复时间。
  • innodb_flush_log_at_trx_commitACID保证级别
    • 1(默认):每次事务提交都写入磁盘,最安全但最慢。
    • 0:每秒写入磁盘,性能最高但可能丢失1秒数据。
    • 2:每次提交写入OS缓存,每秒刷盘,折中方案。

my.cnf完整配置示例

[mysqld]
# 内存配置
innodb_buffer_pool_size = 24G
innodb_buffer_pool_instances = 12
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_at_shutdown = ON

# 日志配置
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M
innodb_flush_log_at_trx_commit = 2
innodb_flush_method = O_DIRECT

# IO配置
innodb_io_capacity = 2000
innodb_io_capacity_max = 4000
innodb_flush_neighbors = 0

# 并发控制
innodb_thread_concurrency = 0
innodb_read_io_threads = 8
innodb_write_io_threads = 8

2.2 事务与锁优化

高并发下锁竞争是性能杀手。优化策略包括:

1. 减少事务粒度

-- 错误示例:大事务
START TRANSACTION;
UPDATE orders SET status = 'processing' WHERE user_id = 1001;
UPDATE order_items SET quantity = 5 WHERE order_id = 1001;
UPDATE inventory SET stock = stock - 5 WHERE product_id = 2001;
-- ... 更多操作
COMMIT;

-- 正确示例:小事务
START TRANSACTION;
UPDATE orders SET status = 'processing' WHERE order_id = 1001 AND user_id = 1001;
COMMIT;

START TRANSACTION;
UPDATE order_items SET quantity = 5 WHERE order_id = 1001;
COMMIT;

2. 乐观锁替代悲观锁

-- 悲观锁(性能差)
SELECT * FROM products WHERE id = 1 FOR UPDATE;
UPDATE products SET stock = stock - 1 WHERE id = 1;

-- 乐观锁(推荐)
ALTER TABLE products ADD COLUMN version INT DEFAULT 0;

-- 更新时检查版本
UPDATE products 
SET stock = stock - 1, version = version + 1 
WHERE id = 1 AND version = 5; -- 假设当前版本是5

-- 检查影响行数,如果为0说明被其他事务修改,需要重试

3. 死锁检测与处理

-- 查看死锁信息
SHOW ENGINE INNODB STATUS\G

-- 开启死锁监控
SET GLOBAL innodb_print_all_deadlocks = ON;

-- 应用层重试机制(Java示例)
public <T> T executeWithRetry(Function<Void, T> operation, int maxRetries) {
    int attempt = 0;
    while (attempt < maxRetries) {
        try {
            return operation.apply(null);
        } catch (DeadlockException e) {
            attempt++;
            if (attempt >= maxRetries) throw e;
            // 指数退避
            try {
                Thread.sleep((long) (Math.random() * Math.pow(2, attempt) * 100));
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }
    throw new RuntimeException("Max retries exceeded");
}

2.3 索引优化策略

索引是查询性能的灵魂。高并发场景下,索引不当会导致全表扫描,引发灾难。

1. 覆盖索引(Covering Index)

-- 查询:只访问索引就能返回结果
SELECT user_id, order_id, amount FROM orders 
WHERE user_id = 1001 AND status = 'paid';

-- 优化:创建覆盖索引
CREATE INDEX idx_user_status_amount ON orders(user_id, status, amount);

2. 前缀索引

-- 对长字符串列使用前缀索引
SELECT COUNT(*) FROM users WHERE email = 'test@example.com';

-- 优化:只索引前20个字符(根据选择性调整)
CREATE INDEX idx_email_prefix ON users(email(20));

3. 索引下推(ICP): MySQL 5.6+ 支持索引下推,自动优化:

-- 查询条件
SELECT * FROM users WHERE age > 25 AND name LIKE 'A%';

-- 索引:idx_age_name(age, name)
-- MySQL会先在索引层过滤age,再过滤name,减少回表次数

4. 索引监控与清理

-- 查看未使用的索引(MySQL 8.0+)
SELECT * FROM sys.schema_unused_indexes;

-- 查看冗余索引
SELECT * FROM sys.schema_redundant_indexes;

-- 查看索引使用情况
SHOW INDEX FROM orders;

三、查询优化:从SQL到执行计划

3.1 执行计划分析

使用EXPLAIN分析查询性能,是SQL优化的第一步。

-- 基础用法
EXPLAIN SELECT * FROM orders WHERE user_id = 1001;

-- 格式化输出
EXPLAIN FORMAT=JSON SELECT * FROM orders WHERE user_id = 1001;

-- 查看实际执行计划(包括统计信息)
EXPLAIN ANALYZE SELECT * FROM orders WHERE user_id = 1001;

执行计划关键字段解读

  • type:访问类型,性能从好到坏:system > const > eq_ref > ref > range > index > ALL
  • key:实际使用的索引
  • rows:预估扫描行数
  • Extra:额外信息,如Using index(覆盖索引)、Using filesort(文件排序)

优化示例

-- 问题SQL:全表扫描
EXPLAIN SELECT * FROM orders WHERE DATE(created_at) = '2024-01-01';
-- type: ALL, rows: 1000000

-- 优化1:使用日期范围
EXPLAIN SELECT * FROM orders 
WHERE created_at >= '2024-01-01 00:00:00' 
  AND created_at < '2024-01-02 00:00:00';
-- type: range, rows: 5000

-- 优化2:添加索引
CREATE INDEX idx_created_at ON orders(created_at);
-- type: range, rows: 5000, Extra: Using index condition

3.2 避免常见陷阱

1. 隐式类型转换

-- phone字段是VARCHAR类型
SELECT * FROM users WHERE phone = 13800138000; -- 隐式转换,索引失效
SELECT * FROM users WHERE phone = '13800138000'; -- 正确

2. OR条件优化

-- 问题SQL
SELECT * FROM orders WHERE user_id = 1001 OR amount > 1000;

-- 优化1:UNION ALL
(SELECT * FROM orders WHERE user_id = 1001)
UNION ALL
(SELECT * FROM orders WHERE amount > 1000 AND user_id != 1001);

-- 优化2:使用IN(如果值不多)
SELECT * FROM orders WHERE user_id IN (1001, 1002, 1003);

3. 分页优化

-- 问题SQL:深度分页性能差
SELECT * FROM orders WHERE status = 'paid' ORDER BY id LIMIT 1000000, 20;

-- 优化1:延迟关联
SELECT o.* FROM orders o
INNER JOIN (SELECT id FROM orders WHERE status = 'paid' ORDER BY id LIMIT 1000000, 20) t
ON o.id = t.id;

-- 优化2:书签记录位置
SELECT * FROM orders 
WHERE status = 'paid' AND id > 1000000 
ORDER BY id LIMIT 20;

3.3 批量操作与预编译

批量插入优化

// 错误方式:逐条插入
for (Order order : orders) {
    jdbcTemplate.update("INSERT INTO orders (user_id, amount) VALUES (?, ?)", 
                        order.getUserId(), order.getAmount());
}

// 正确方式:批量插入
jdbcTemplate.batchUpdate("INSERT INTO orders (user_id, amount) VALUES (?, ?)", 
    new BatchPreparedStatementSetter() {
        @Override
        public void setValues(PreparedStatement ps, int i) throws SQLException {
            Order order = orders.get(i);
            ps.setLong(1, order.getUserId());
            ps.setBigDecimal(2, order.getAmount());
        }
        
        @Override
        public int getBatchSize() {
            return orders.size();
        }
    });

// 使用rewriteBatchedStatements=true后,性能提升10-100倍

预编译语句缓存

// 在连接URL中启用
String url = "jdbc:mysql://localhost:3306/mydb?" +
             "useServerPrepStmts=true&" +
             "cachePrepStmts=true&" +
             "prepStmtCacheSize=250&" +
             "prepStmtCacheSqlLimit=2048";

四、架构扩展:从单机到分布式

4.1 分库分表(Sharding)

当单表数据量超过千万级,需要分库分表。

垂直分表

-- 原表
CREATE TABLE orders (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    status VARCHAR(20),
    -- 20+个字段...
    description TEXT
);

-- 拆分后
CREATE TABLE orders_base (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    status VARCHAR(20)
);

CREATE TABLE orders_desc (
    order_id BIGINT PRIMARY KEY,
    description TEXT
);

水平分表(按用户ID哈希)

-- 分表规则:user_id % 4
CREATE TABLE orders_0 LIKE orders;
CREATE TABLE orders_1 LIKE orders;
CREATE TABLE orders_2 LIKE orders;
CREATE TABLE orders_3 LIKE orders;

-- 应用层路由
public class ShardingRouter {
    public String getTableName(Long userId) {
        int index = (int) (userId % 4);
        return "orders_" + index;
    }
}

// 查询时动态表名
String tableName = shardingRouter.getTableName(userId);
String sql = "SELECT * FROM " + tableName + " WHERE user_id = ?";

使用ShardingSphere(推荐)

# sharding.yaml
dataSources:
  ds_0: jdbc:mysql://localhost:3306/db0
  ds_1: jdbc:mysql://localhost:3306/db1

shardingRule:
  tables:
    orders:
      actualDataNodes: ds_${0..1}.orders_${0..3}
      tableStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: orders_${user_id % 4}
      databaseStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: ds_${user_id % 2}
  bindingTables:
    - orders,order_items

4.2 缓存策略

Redis缓存热点数据

@Service
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String ORDER_CACHE_PREFIX = "order:";
    private static final long CACHE_TTL = 300; // 5分钟

    public Order getOrder(Long orderId) {
        String cacheKey = ORDER_CACHE_PREFIX + orderId;
        
        // 1. 先查缓存
        Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
        if (order != null) {
            return order;
        }
        
        // 2. 缓存未命中,查数据库
        order = orderMapper.selectById(orderId);
        if (order != null) {
            // 3. 写入缓存
            redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
        }
        
        return order;
    }

    @CacheEvict(value = "orders", key = "#orderId")
    public void updateOrder(Long orderId, Order order) {
        orderMapper.updateById(order);
        // 删除缓存
        redisTemplate.delete(ORDER_CACHE_PREFIX + orderId);
    }
}

缓存穿透与雪崩防护

// 布隆过滤器防止缓存穿透
public class BloomFilterService {
    private BloomFilter<Long> bloomFilter = BloomFilter.create(
        Funnels.longFunnel(), 
        1000000, // 预期元素数量
        0.01     // 误判率
    );

    public Order getOrder(Long orderId) {
        // 先检查布隆过滤器
        if (!bloomFilter.mightContain(orderId)) {
            return null; // 肯定不存在
        }
        
        // 正常查缓存和数据库
        return getOrderFromCache(orderId);
    }
    
    public void addOrderId(Long orderId) {
        bloomFilter.put(orderId);
    }
}

// 缓存预热防止雪崩
@PostConstruct
public void warmUpCache() {
    List<Long> hotOrderIds = orderMapper.getHotOrderIds();
    for (Long id : hotOrderIds) {
        Order order = orderMapper.selectById(id);
        redisTemplate.opsForValue().set("order:" + id, order, 600, TimeUnit.SECONDS);
    }
}

4.3 异步处理与消息队列

削峰填谷

// Controller层:快速接收请求,返回受理成功
@PostMapping("/order")
public ResponseEntity<String> createOrder(@RequestBody OrderRequest request) {
    // 参数校验
    validateRequest(request);
    
    // 发送到消息队列
    messageQueue.send("order-topic", request);
    
    // 立即返回,不等待数据库写入
    return ResponseEntity.accepted().body("订单已受理,订单号:" + request.getOrderId());
}

// 消费者端:异步处理
@KafkaListener(topics = "order-topic")
public void processOrder(OrderRequest request) {
    try {
        // 1. 扣减库存(悲观锁/乐观锁)
        int updated = inventoryMapper.decreaseStock(request.getProductId(), request.getQuantity());
        if (updated == 0) {
            // 库存不足,发送失败消息
            messageQueue.send("order-fail-topic", request);
            return;
        }
        
        // 2. 创建订单
        Order order = createOrderFromRequest(request);
        orderMapper.insert(order);
        
        // 3. 发送成功消息
        messageQueue.send("order-success-topic", order);
        
    } catch (Exception e) {
        // 记录日志,发送重试消息
        log.error("订单处理失败: {}", request, e);
        messageQueue.send("order-retry-topic", request);
    }
}

五、监控与诊断:持续优化的基础

5.1 性能监控指标

慢查询日志

-- 开启慢查询日志
SET GLOBAL slow_query_log = ON;
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1; -- 超过1秒记录
SET GLOBAL log_queries_not_using_indexes = ON;

-- 查看慢查询统计
SELECT COUNT(*), AVG(query_time) FROM mysql.slow_log WHERE start_time > NOW() - INTERVAL 1 DAY;

实时性能视图

-- 查看当前正在执行的查询
SELECT * FROM information_schema.processlist WHERE command != 'Sleep';

-- 查看InnoDB状态
SHOW ENGINE INNODB STATUS\G

-- 查看锁等待
SELECT * FROM information_schema.innodb_lock_waits;
SELECT * FROM information_schema.innodb_locks;
SELECT * FROM information_schema.innodb_trx;

5.2 使用Performance Schema

MySQL 5.6+ 提供了强大的性能监控:

-- 启用性能模式
UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' 
WHERE NAME LIKE 'statement/%';

-- 查看最耗时的SQL
SELECT DIGEST_TEXT, COUNT_STAR, AVG_TIMER_WAIT/1000000000000 AS avg_seconds
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC LIMIT 10;

-- 查看表IO
SELECT TABLE_SCHEMA, TABLE_NAME, SUM_READS, SUM_WRITE
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_READS + SUM_WRITE DESC LIMIT 10;

5.3 第三方监控工具

Prometheus + Grafana监控

# mysqld_exporter配置
data_source_name: root:password@(localhost:3306)/
collect.global_status: true
collect.info_schema.innodb_metrics: true
collect.auto_increment.columns: true
collect.info_schema.processlist: true
collect.binlog_size: true
collect.info_schema.tablestats: true
collect.global_variables: true
collect.info_schema.query_response_time: true
collect.info_schema.userstats: true
collect.info_schema.tables: true
collect.perf_schema.tablelocks: true
collect.perf_schema.file_events: true
collect.perf_schema.indexiowaits: true
collect.perf_schema.tableiowaits: true
collect.slave_status: true
collect.slave_hosts: true

关键监控指标

  • QPS/TPS
  • 慢查询数量
  • 连接数使用率
  • InnoDB缓冲池命中率
  • 锁等待时间
  • 磁盘I/O利用率

六、高级优化技巧

6.1 分区表(Partitioning)

对于超大表,分区可以提升查询性能和管理效率。

-- 按日期范围分区
CREATE TABLE logs (
    id BIGINT AUTO_INCREMENT,
    log_time DATETIME NOT NULL,
    message TEXT,
    PRIMARY KEY (id, log_time)
) PARTITION BY RANGE COLUMNS(log_time) (
    PARTITION p202401 VALUES LESS THAN ('2024-02-01'),
    PARTITION p202402 VALUES LESS THAN ('2024-03-01'),
    PARTITION p202403 VALUES LESS THAN ('2024-04-01'),
    PARTITION p_max VALUES LESS THAN (MAXVALUE)
);

-- 查询时自动分区裁剪
SELECT * FROM logs WHERE log_time >= '2024-03-01'; -- 只扫描p202403和p_max

6.2 全文索引

对于文本搜索场景,使用全文索引替代LIKE。

-- 创建全文索引
ALTER TABLE articles ADD FULLTEXT INDEX ft_title_content (title, content);

-- 自然语言搜索
SELECT * FROM articles 
WHERE MATCH(title, content) AGAINST('database optimization' IN NATURAL LANGUAGE MODE);

-- 布尔模式
SELECT * FROM articles 
WHERE MATCH(title, content) AGAINST('+database -optimization' IN BOOLEAN MODE);

6.3 虚拟列与函数索引

MySQL 5.7+ 支持虚拟列,可以创建基于表达式的索引。

-- 创建虚拟列
ALTER TABLE orders ADD COLUMN order_year YEAR AS (YEAR(created_at)) VIRTUAL;

-- 在虚拟列上创建索引
CREATE INDEX idx_order_year ON orders(order_year);

-- 查询优化
SELECT * FROM orders WHERE order_year = 2024; -- 使用索引

七、实战案例:秒杀系统优化

7.1 问题分析

秒杀场景特点:

  • 瞬时并发:QPS可达10万+
  • 库存有限:竞争激烈
  • 读多写少:大部分请求是查询

7.2 优化方案

1. 前端拦截

// 限流:按钮置灰,防止重复提交
let isSubmitting = false;
function seckill() {
    if (isSubmitting) return;
    isSubmitting = true;
    
    // 倒计时控制
    if (Date.now() < seckillStartTime) {
        alert("未到开始时间");
        return;
    }
    
    // 提交请求
    fetch('/api/seckill', { method: 'POST' })
        .then(() => { isSubmitting = false; })
        .catch(() => { isSubmitting = false; });
}

2. Redis预扣库存

@Service
public class SeckillService {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    private static final String STOCK_KEY = "seckill:stock:";
    private static final String ORDER_KEY = "seckill:orders:";
    
    public boolean seckill(Long userId, Long productId) {
        String stockKey = STOCK_KEY + productId;
        String orderKey = ORDER_KEY + productId + ":" + userId;
        
        // 1. 检查是否已购买
        if (redisTemplate.hasKey(orderKey)) {
            return false; // 已购买
        }
        
        // 2. 原子扣减库存(Lua脚本保证原子性)
        String luaScript = 
            "if redis.call('get', KEYS[1]) >= ARGV[1] then " +
            "   redis.call('decrby', KEYS[1], ARGV[1]); " +
            "   return 1; " +
            "else " +
            "   return 0; " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(luaScript, Long.class),
            Collections.singletonList(stockKey),
            "1"
        );
        
        if (result == 0) {
            return false; // 库存不足
        }
        
        // 3. 记录购买标记(5分钟过期)
        redisTemplate.opsForValue().set(orderKey, "1", 300, TimeUnit.SECONDS);
        
        // 4. 发送消息到MQ异步创建订单
        messageQueue.send("seckill-order", new SeckillRequest(userId, productId));
        
        return true;
    }
}

3. 数据库最终一致性

@KafkaListener(topics = "seckill-order")
public void createOrder(SeckillRequest request) {
    try {
        // 1. 检查数据库库存(防止超卖)
        Integer stock = inventoryMapper.selectStock(request.getProductId());
        if (stock <= 0) {
            // 回滚Redis库存
            redisTemplate.opsForValue().increment("seckill:stock:" + request.getProductId(), 1);
            return;
        }
        
        // 2. 创建订单(使用乐观锁)
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setProductId(request.getProductId());
        order.setStatus("CREATED");
        
        int inserted = orderMapper.insert(order);
        if (inserted > 0) {
            // 3. 扣减数据库库存
            int updated = inventoryMapper.decreaseStock(request.getProductId(), 1);
            if (updated == 0) {
                // 库存不足,回滚
                throw new RuntimeException("库存不足");
            }
        }
    } catch (Exception e) {
        // 记录失败,人工补偿
        log.error("秒杀订单创建失败", e);
    }
}

4. 数据库层优化

-- 库存表使用InnoDB,开启行级锁
CREATE TABLE inventory (
    product_id BIGINT PRIMARY KEY,
    stock INT NOT NULL,
    version INT DEFAULT 0, -- 乐观锁版本
    INDEX idx_product (product_id)
) ENGINE=InnoDB;

-- 扣减库存SQL(乐观锁)
UPDATE inventory 
SET stock = stock - 1, version = version + 1 
WHERE product_id = ? AND version = ? AND stock > 0;

八、总结与最佳实践

8.1 优化优先级

  1. SQL与索引优化:成本最低,效果最明显
  2. 配置优化:调整MySQL参数,释放硬件潜力
  3. 缓存策略:减少数据库访问
  4. 架构扩展:读写分离、分库分表
  5. 异步处理:削峰填谷,提升吞吐量

8.2 持续优化原则

  • 监控驱动:基于数据做优化,而不是猜测
  • 渐进式:每次只改一个变量,观察效果
  • 压测验证:使用sysbench、JMeter等工具验证优化效果
  • 回滚预案:任何变更都要有回滚方案

8.3 高并发黄金法则

  1. 能不查库就不查:缓存、静态化、CDN
  2. 能异步就不同步:消息队列、任务调度
  3. 能内存计算就不磁盘:Redis、本地缓存
  4. 能并行就不串行:线程池、批量处理
  5. 能限流就不硬抗:熔断、降级、限流

通过以上策略的综合运用,MySQL完全可以应对百万级QPS的高并发挑战。记住,优化是一个持续的过程,需要根据业务发展和技术演进不断调整和完善。