引言:理解高并发场景下的数据库挑战

在现代互联网应用中,高并发访问已经成为常态。无论是电商平台的秒杀活动,还是社交媒体的热点事件,数据库都面临着前所未有的压力。MySQL作为最流行的关系型数据库,在高并发场景下容易出现性能瓶颈、死锁甚至崩溃。本文将从索引优化、查询优化、事务控制、架构设计等多个维度,详细探讨如何构建高可用的MySQL系统,避免数据库崩溃与死锁问题。

一、索引优化:高并发的第一道防线

1.1 索引的基本原理与选择策略

索引是MySQL性能优化的核心。在高并发场景下,合理的索引可以将查询性能提升几个数量级。理解B+树索引的结构是优化的基础。

主键索引与二级索引的区别

  • 主键索引(聚簇索引):叶子节点存储整行数据
  • 二级索引(非聚簇索引):叶子节点存储主键值
-- 创建用户表示例
CREATE TABLE users (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    status TINYINT DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_username (username),
    INDEX idx_email (email),
    INDEX idx_status_created (status, created_at)
) ENGINE=InnoDB;

-- 查看索引使用情况
SHOW INDEX FROM users;

1.2 覆盖索引与回表优化

在高并发查询中,回表操作是性能杀手。覆盖索引可以避免回表,直接从索引中获取所需数据。

覆盖索引示例:

-- 假设需要查询用户ID和用户名
SELECT id, username FROM users WHERE username = 'john_doe';

-- 如果只有idx_username索引,需要回表获取id
-- 优化方案:创建复合索引
ALTER TABLE users ADD INDEX idx_username_id (username, id);

-- 或者查询时只查询索引包含的列
SELECT username FROM users WHERE username = 'john_doe'; -- 使用idx_username即可

1.3 索引失效的常见场景与避免方法

在高并发环境下,索引失效会导致全表扫描,瞬间拖垮数据库。

常见索引失效场景:

  1. 函数操作WHERE YEAR(created_at) = 2024
  2. 隐式类型转换WHERE phone = 13800138000(phone是varchar)
  3. 前缀模糊查询WHERE username LIKE '%john'
  4. OR条件WHERE status = 1 OR age > 18(部分版本优化)
  5. 范围查询后的索引失效WHERE age > 18 AND score > 90

优化方案:

-- 错误示例
SELECT * FROM users WHERE YEAR(created_at) = 2024;

-- 正确示例:使用范围查询
SELECT * FROM users WHERE created_at >= '2024-01-01' AND created_at < '2025-01-01';

-- 错误示例:隐式类型转换
SELECT * FROM orders WHERE order_no = 123456; -- order_no是varchar

-- 正确示例
SELECT * FROM orders WHERE order_no = '123456';

1.4 高并发下的索引维护成本

索引不是越多越好。每个索引都会增加写操作的开销(INSERT/UPDATE/DELETE需要维护索引)。

索引维护成本分析:

  • 单个索引的写操作成本约为原数据的10-15%
  • 5个索引的表,写操作成本增加50-75%
  • 高并发写场景下,过多索引会导致严重的性能问题

优化建议:

-- 监控索引使用频率
SELECT * FROM sys.schema_unused_indexes;

-- 删除未使用的索引
DROP INDEX idx_unused ON users;

-- 对于低频查询,考虑使用联合索引替代多个单列索引
-- 原:INDEX idx_a, INDEX idx_b
-- 优化:INDEX idx_a_b (a, b)

二、查询优化:减少数据库负载

2.1 避免SELECT * 和大字段查询

在高并发场景下,网络传输和内存消耗也是重要考虑因素。

优化示例:

-- 错误:查询大字段
SELECT * FROM articles WHERE id = 123; -- 包含content大文本字段

-- 正确:按需查询
SELECT id, title, summary, author FROM articles WHERE id = 123;

-- 错误:查询不需要的列
SELECT * FROM users WHERE id = 1;

-- 正确:只查询需要的列
SELECT id, username, email FROM users WHERE id = 1;

2.2 分页查询优化

传统LIMIT分页在深度分页时性能极差。

问题示例:

-- 当page=1000时,性能极差
SELECT * FROM orders ORDER BY id LIMIT 1000000, 20;

优化方案1:延迟关联

-- 使用子查询先定位主键,再关联获取数据
SELECT o.* FROM orders o
INNER JOIN (
    SELECT id FROM orders ORDER BY id LIMIT 1000000, 20
) AS tmp ON o.id = tmp.id;

优化方案2:位置记录法

-- 记录上一页最后一条记录的id
-- 第一页
SELECT * FROM orders WHERE id > 0 ORDER BY id LIMIT 20;

-- 第二页(假设上一页最后id=100)
SELECT * FROM orders WHERE id > 100 ORDER BY id LIMIT 20;

2.3 JOIN优化策略

JOIN是高并发场景下的性能瓶颈,需要谨慎使用。

JOIN优化原则:

  1. 小表驱动大表
  2. ON字段必须有索引
  3. 避免笛卡尔积

优化示例:

-- 错误:大表驱动小表
SELECT u.*, o.*
FROM large_table l
JOIN small_table s ON l.id = s.large_id
WHERE l.status = 1;

-- 正确:小表驱动大表
SELECT s.*, l.*
FROM small_table s
JOIN large_table l ON s.large_id = l.id
WHERE l.status = 1;

-- 确保JOIN字段有索引
ALTER TABLE small_table ADD INDEX idx_large_id (large_id);
ALTER TABLE large_table ADD INDEX idx_id (id);

2.4 批量操作与减少交互

高并发下,减少数据库交互次数至关重要。

批量插入优化:

-- 错误:逐条插入
INSERT INTO logs (user_id, action, created_at) VALUES (1, 'login', NOW());
INSERT INTO logs (user_id, action, created_at) VALUES (2, 'login', NOW());
INSERT INTO logs (user_id, action, created_at) VALUES (3, 'login', NOW());

-- 正确:批量插入
INSERT INTO logs (user_id, action, created_at) VALUES
(1, 'login', NOW()),
(2, 'login', NOW()),
(3, 'login', NOW());

-- 批量更新
UPDATE users SET status = 2 WHERE id IN (1, 2, 3, 4, 5);

三、事务与锁机制:避免死锁的核心

3.1 MySQL锁机制详解

理解MySQL的锁机制是避免死锁的关键。

锁的类型:

  • 共享锁(S锁):读锁,多个事务可以同时持有
  • 排他锁(X锁):写锁,只有一个事务可以持有
  • 意向锁:表级锁,表示事务将在某行加S锁或X锁

锁的粒度:

  • 行锁:锁定单行数据
  • 间隙锁:锁定索引范围
  • 临键锁:行锁+间隙锁(InnoDB默认)

3.2 死锁的产生与检测

死锁的四个必要条件:

  1. 互斥条件:资源不能被共享
  2. 请求与保持条件:已获得资源的进程可以请求新资源
  3. 不剥夺条件:已分配的资源不能被强制剥夺
  4. 循环等待条件:进程之间形成环形等待链

死锁示例:

-- 事务A
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- 等待事务B释放id=2的锁

-- 事务B
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 2;
-- 等待事务A释放id=1的锁

-- 结果:死锁!

3.3 避免死锁的最佳实践

1. 固定加锁顺序

-- 错误:不同事务加锁顺序不一致
-- 事务A:UPDATE table SET ... WHERE id IN (1, 2, 3)
-- 事务B:UPDATE table SET ... WHERE id IN (3, 2, 1)

-- 正确:统一按主键排序
-- 所有事务都按id升序处理
UPDATE accounts SET balance = balance - 100 WHERE id IN (1, 2, 3) ORDER BY id;

2. 使用SELECT FOR UPDATE显式加锁

-- 事务A
START TRANSACTION;
-- 先查询并锁定记录
SELECT * FROM inventory WHERE product_id = 100 FOR UPDATE;
-- 执行更新
UPDATE inventory SET stock = stock - 1 WHERE product_id = 100;
COMMIT;

-- 事务B
START TRANSACTION;
-- 同样先锁定记录(会等待事务A释放)
SELECT * FROM inventory WHERE product_id = 100 FOR UPDATE;
UPDATE inventory SET stock = stock - 1 WHERE product_id = 100;
COMMIT;

3. 优化事务粒度

-- 错误:大事务
START TRANSACTION;
UPDATE users SET status = 2 WHERE id IN (SELECT id FROM users WHERE age > 18);
UPDATE orders SET status = 'processed' WHERE user_id IN (SELECT id FROM users WHERE age > 18);
-- 可能锁住大量行,增加死锁概率

-- 正确:小事务
-- 分批处理
START TRANSACTION;
UPDATE users SET status = 2 WHERE id BETWEEN 1 AND 1000;
COMMIT;

START TRANSACTION;
UPDATE orders SET status = 'processed' WHERE user_id BETWEEN 1 AND 1000;
COMMIT;

4. 设置合理的超时时间

-- 设置锁等待超时(默认50秒)
SET SESSION innodb_lock_wait_timeout = 10;

-- 设置事务超时
SET SESSION max_execution_time = 5000;

3.4 死锁监控与诊断

查看死锁日志:

-- 开启死锁监控
SET GLOBAL innodb_print_all_deadlocks = ON;

-- 查看最近死锁信息
SHOW ENGINE INNODB STATUS\G

死锁信息分析:

LATEST DETECTED DEADLOCK
2024-01-15 10:30:45 0x7f0a1b2c3d4e
*** (1) TRANSACTION:
TRANSACTION 12345, ACTIVE 0 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 3 lock struct(s), heap size 1136, 2 row lock(s)
MySQL thread id 12345, OS thread handle 0x7f0a1b2c3d4e, query id 100
UPDATE accounts SET balance = balance - 100 WHERE id = 1

*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 0 page no 5 n bits 72 index PRIMARY of table `test`.`accounts` trx id 12345 lock_mode X locks rec but not gap waiting

*** (2) TRANSACTION:
TRANSACTION 12346, ACTIVE 0 sec starting index read
mysql tables in use 1, locked 1
3 lock struct(s), heap size 1136, 2 row lock(s)
MySQL thread id 12346, OS thread handle 0x7f0a1b2c3d4f, query id 101
UPDATE accounts SET balance = balance - 100 WHERE id = 2

*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 0 page no 5 n bits 72 index PRIMARY of table `test`.`accounts` trx id 12346 lock_mode X locks rec but not gap

*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 0 page no 5 n bits 72 index PRIMARY of table `test`.`accounts` trx id 12346 lock_mode X locks rec but not gap waiting

*** WE ROLL BACK TRANSACTION (2)

四、架构优化:从单库到分布式

4.1 读写分离架构

读写分离是提升并发能力的基础架构优化。

架构图:

应用层
  ↓
负载均衡
  ↓
主库(写) ←→ 从库(读)
  ↓
数据同步(binlog)

实现方案:

// Spring Boot配置多数据源
@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    public DataSource routingDataSource() {
        DynamicDataSource routingDataSource = new DynamicDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        return routingDataSource;
    }
}

// 动态数据源路由
public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSourceType();
    }
}

// 使用示例
@Service
public class UserService {
    
    @Autowired
    private UserRepository userRepository;
    
    // 写操作使用主库
    @Transactional
    public User createUser(User user) {
        DataSourceContextHolder.setDataSource("master");
        return userRepository.save(user);
    }
    
    // 读操作使用从库
    public User getUser(Long id) {
        DataSourceContextHolder.setDataSource("slave");
        return userRepository.findById(id).orElse(null);
    }
}

4.2 分库分表策略

当单库无法支撑时,需要分库分表。

分片策略:

  1. 垂直分库:按业务模块拆分
  2. 水平分表:按数据特征拆分

分片键选择原则:

  • 数据分布均匀
  • 查询命中率高
  • 避免跨片查询

分库分表示例:

-- 用户表按user_id分16个表
-- user_0 到 user_15

-- 分片函数(Java实现)
public class ShardingUtil {
    private static final int SHARD_COUNT = 16;
    
    public static String getTableName(Long userId) {
        int shardIndex = (int) (userId % SHARD_COUNT);
        return "user_" + shardIndex;
    }
    
    public static String getDatabaseName(Long userId) {
        int dbIndex = (int) (userId % 8); // 8个数据库
        return "db_" + dbIndex;
    }
}

// 使用示例
public class UserRepository {
    
    public void saveUser(User user) {
        String dbName = ShardingUtil.getDatabaseName(user.getId());
        String tableName = ShardingUtil.getTableName(user.getId());
        
        // 动态切换数据源和表名
        DataSourceContextHolder.setDataSource(dbName);
        String sql = String.format("INSERT INTO %s (id, username, email) VALUES (?, ?, ?)", 
                                   tableName);
        jdbcTemplate.update(sql, user.getId(), user.getUsername(), user.getEmail());
    }
}

4.3 分布式事务解决方案

分库分表后,事务成为挑战。

解决方案:

1. 两阶段提交(2PC)

// 伪代码示例
@Transactional
public void transfer(Long fromId, Long toId, BigDecimal amount) {
    try {
        // 阶段1:准备
        boolean fromSuccess = prepareDeduct(fromId, amount);
        boolean toSuccess = prepareAdd(toId, amount);
        
        if (fromSuccess && toSuccess) {
            // 阶段2:提交
            commitDeduct(fromId);
            commitAdd(toId);
        } else {
            // 回滚
            rollbackDeduct(fromId);
            rollbackAdd(toId);
        }
    } catch (Exception e) {
        // 超时处理
        handleTimeout();
    }
}

2. TCC模式(Try-Confirm-Cancel)

// Try阶段:资源检查和预留
public boolean tryDeduct(Long userId, BigDecimal amount) {
    String sql = "UPDATE account SET frozen = frozen + ? WHERE id = ? AND balance >= ?";
    return jdbcTemplate.update(sql, amount, userId, amount) > 0;
}

// Confirm阶段:确认扣款
public boolean confirmDeduct(Long userId, BigDecimal amount) {
    String sql = "UPDATE account SET balance = balance - ?, frozen = frozen - ? WHERE id = ?";
    return jdbcTemplate.update(sql, amount, amount, userId) > 0;
}

// Cancel阶段:取消预留
public boolean cancelDeduct(Long userId, BigDecimal amount) {
    String sql = "UPDATE account SET frozen = frozen - ? WHERE id = ?";
    return jdbcTemplate.update(sql, amount, userId) > 0;
}

3. 消息队列最终一致性

// 发送方
@Transactional
public void transfer(Long fromId, Long toId, BigDecimal amount) {
    // 1. 扣减余额(本地事务)
    deductBalance(fromId, amount);
    
    // 2. 发送消息(同事务)
    sendTransferMessage(fromId, toId, amount);
}

// 接收方
@RocketMQMessageListener
public void handleTransferMessage(TransferMessage message) {
    try {
        // 幂等性检查
        if (isProcessed(message.getId())) {
            return;
        }
        
        // 增加余额
        addBalance(message.getToId(), message.getAmount());
        
        // 记录处理
        markProcessed(message.getId());
    } catch (Exception e) {
        // 重试或死信队列
        throw new RuntimeException(e);
    }
}

4.4 缓存策略:减轻数据库压力

缓存设计原则:

  1. 缓存穿透:查询不存在的数据
  2. 缓存击穿:热点key过期
  3. 缓存雪崩:大量key同时过期

解决方案:

// 缓存穿透解决方案:布隆过滤器
public class BloomFilterUtil {
    private static final int EXPECTED_INSERTIONS = 1000000;
    private static final double FALSE_POSITIVE_RATE = 0.01;
    
    private BloomFilter<Long> bloomFilter;
    
    public BloomFilterUtil() {
        this.bloomFilter = BloomFilter.create(
            Funnels.longFunnel(), 
            EXPECTED_INSERTIONS, 
            FALSE_POSITIVE_RATE
        );
    }
    
    public boolean mightContain(Long id) {
        return bloomFilter.mightContain(id);
    }
    
    public void put(Long id) {
        bloomFilter.put(id);
    }
}

// 缓存击穿解决方案:互斥锁
public User getUserWithCache(Long id) {
    String key = "user:" + id;
    
    // 1. 先查缓存
    User user = redisTemplate.opsForValue().get(key);
    if (user != null) {
        return user;
    }
    
    // 2. 获取分布式锁
    String lockKey = "lock:" + key;
    Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
    
    if (Boolean.TRUE.equals(locked)) {
        try {
            // 双重检查
            user = redisTemplate.opsForValue().get(key);
            if (user != null) {
                return user;
            }
            
            // 查询数据库
            user = userRepository.findById(id).orElse(null);
            
            // 写入缓存
            if (user != null) {
                redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
            } else {
                // 缓存空值,防止穿透
                redisTemplate.opsForValue().set(key, new User(), 5, TimeUnit.MINUTES);
            }
            return user;
        } finally {
            redisTemplate.delete(lockKey);
        }
    } else {
        // 等待并重试
        try {
            Thread.sleep(50);
            return getUserWithCache(id);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

// 缓存雪崩解决方案:随机过期时间
public void setWithRandomExpire(String key, Object value, int baseExpireSeconds) {
    // 随机时间:基础时间 + 随机偏移(0-300秒)
    int randomExpire = baseExpireSeconds + new Random().nextInt(300);
    redisTemplate.opsForValue().set(key, value, randomExpire, TimeUnit.SECONDS);
}

五、高并发配置调优

5.1 MySQL核心参数配置

连接相关参数:

[mysqld]
# 最大连接数
max_connections = 2000

# 每个连接的最大请求数
max_connections_per_user = 1000

# 连接超时时间
connect_timeout = 10

# 交互式超时时间
interactive_timeout = 300

# 非交互式超时时间
wait_timeout = 300

InnoDB引擎参数:

# 缓冲池大小(建议物理内存的50-70%)
innodb_buffer_pool_size = 16G

# 缓冲池实例数(建议与CPU核数相同)
innodb_buffer_pool_instances = 8

# 日志文件大小
innodb_log_file_size = 2G

# 刷新日志策略
innodb_flush_log_at_trx_commit = 1  # 1=每次提交刷盘,0=每秒刷盘

# 最大并发线程数
innodb_thread_concurrency = 32

# 启用死锁检测
innodb_deadlock_detect = ON

# 锁等待超时
innodb_lock_wait_timeout = 50

5.2 操作系统参数调优

文件描述符限制:

# 查看当前限制
ulimit -n

# 临时修改
ulimit -n 65535

# 永久修改(/etc/security/limits.conf)
* soft nofile 65535
* hard nofile 65535

TCP参数优化:

# 增加端口范围
sysctl -w net.ipv4.ip_local_port_range="1024 65535"

# 增大TCP队列
sysctl -w net.core.somaxconn=4096

# 开启TIME_WAIT重用
sysctl -w net.ipv4.tcp_tw_reuse=1

# 减少TIME_WAIT超时
sysctl -w net.ipv4.tcp_fin_timeout=30

5.3 监控与告警

关键监控指标:

  1. QPS/TPS:每秒查询/事务数
  2. 连接数:当前连接数 vs 最大连接数
  3. 慢查询:执行时间超过阈值的查询
  4. 锁等待:锁等待时间和数量
  5. 缓存命中率:InnoDB缓冲池命中率

监控SQL示例:

-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';

-- 查看慢查询数量
SHOW STATUS LIKE 'Slow_queries';

-- 查看InnoDB缓冲池命中率
SELECT 
    (1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 AS hit_rate
FROM performance_schema.global_status 
WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads';

-- 查看锁等待
SELECT * FROM performance_schema.data_lock_waits;

-- 查看当前运行的事务
SELECT * FROM information_schema.INNODB_TRX;

六、实战案例:秒杀系统设计

6.1 秒杀业务特点

  • 瞬时高并发:百万级QPS
  • 库存有限:超卖问题严重
  • 逻辑简单:减库存+下单

6.2 架构设计

分层架构:

客户端 → CDN → Nginx → 应用服务 → 缓存 → 消息队列 → 数据库

核心代码实现:

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    private static final String STOCK_KEY = "seckill:stock:";
    private static final String ORDER_KEY = "seckill:order:";
    
    /**
     * 秒杀流程
     */
    public SeckillResult seckill(Long userId, Long productId) {
        // 1. 参数校验
        if (userId == null || productId == null) {
            return SeckillResult.error("参数错误");
        }
        
        // 2. 预扣减库存(Redis)
        String stockKey = STOCK_KEY + productId;
        Long stock = redisTemplate.opsForValue().decrement(stockKey);
        
        if (stock == null || stock < 0) {
            redisTemplate.opsForValue().increment(stockKey); // 回滚
            return SeckillResult.error("库存不足");
        }
        
        // 3. 检查是否已购买(防重复)
        String orderKey = ORDER_KEY + productId + ":" + userId;
        Boolean existed = redisTemplate.hasKey(orderKey);
        if (Boolean.TRUE.equals(existed)) {
            return SeckillResult.error("您已参与过秒杀");
        }
        
        // 4. 发送消息到MQ(异步下单)
        SeckillMessage message = new SeckillMessage();
        message.setUserId(userId);
        message.setProductId(productId);
        message.setTimestamp(System.currentTimeMillis());
        
        rocketMQTemplate.sendOneWay("seckill-topic", message);
        
        // 5. 标记已参与
        redisTemplate.opsForValue().set(orderKey, 1, 30, TimeUnit.MINUTES);
        
        return SeckillResult.success("秒杀成功,正在处理订单");
    }
    
    /**
     * MQ消费端:真正创建订单
     */
    @RocketMQMessageListener(topic = "seckill-topic", consumerGroup = "seckill-group")
    public void createOrder(SeckillMessage message) {
        try {
            // 1. 幂等性检查
            if (orderRepository.existsByUserIdAndProductId(message.getUserId(), message.getProductId())) {
                return;
            }
            
            // 2. 创建订单
            Order order = new Order();
            order.setUserId(message.getUserId());
            order.setProductId(message.getProductId());
            order.setStatus(OrderStatus.PENDING);
            order.setCreateTime(new Date());
            
            orderRepository.save(order);
            
            // 3. 记录日志
            log.info("秒杀订单创建成功: {}", order.getId());
            
        } catch (Exception e) {
            log.error("创建订单失败", e);
            // 失败补偿:恢复库存
            String stockKey = STOCK_KEY + message.getProductId();
            redisTemplate.opsForValue().increment(stockKey);
        }
    }
}

6.3 数据库表设计

-- 秒杀商品表
CREATE TABLE seckill_products (
    id BIGINT PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    original_price DECIMAL(10,2) NOT NULL,
    seckill_price DECIMAL(10,2) NOT NULL,
    stock INT NOT NULL,
    start_time DATETIME NOT NULL,
    end_time DATETIME NOT NULL,
    version INT DEFAULT 0, -- 乐观锁
    INDEX idx_time (start_time, end_time)
) ENGINE=InnoDB;

-- 秒杀订单表(分表)
CREATE TABLE seckill_orders_0 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    status TINYINT NOT NULL,
    create_time DATETIME NOT NULL,
    INDEX idx_user (user_id),
    INDEX idx_product (product_id)
) ENGINE=InnoDB;

-- 库存流水表(用于对账)
CREATE TABLE stock_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    product_id BIGINT NOT NULL,
    change_amount INT NOT NULL,
    type TINYINT NOT NULL, -- 1:扣减, 2:回滚
    ref_id VARCHAR(100), -- 关联业务ID
    create_time DATETIME NOT NULL,
    INDEX idx_product (product_id)
) ENGINE=InnoDB;

6.4 防刷与限流

限流实现:

@Component
public class RateLimiter {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 令牌桶算法
     */
    public boolean tryAcquire(String key, int permits, int maxPermits, int refillRate) {
        String luaScript = 
            "local current = redis.call('GET', KEYS[1]) " +
            "if current == false then " +
            "  current = tonumber(ARGV[1]) " +
            "else " +
            "  current = tonumber(current) " +
            "end " +
            "local capacity = tonumber(ARGV[1]) " +
            "local rate = tonumber(ARGV[2]) " +
            "local now = tonumber(ARGV[3]) " +
            "local lastRefill = tonumber(redis.call('GET', KEYS[1] .. ':last') or now) " +
            "local elapsed = now - lastRefill " +
            "local refill = math.floor(elapsed * rate / 1000) " +
            "current = math.min(capacity, current + refill) " +
            "if current >= tonumber(ARGV[4]) then " +
            "  current = current - tonumber(ARGV[4]) " +
            "  redis.call('SET', KEYS[1], current) " +
            "  redis.call('SET', KEYS[1] .. ':last', now) " +
            "  redis.call('EXPIRE', KEYS[1], 60) " +
            "  return 1 " +
            "else " +
            "  return 0 " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(luaScript, Long.class),
            Collections.singletonList(key),
            maxPermits, refillRate, System.currentTimeMillis(), permits
        );
        
        return result != null && result == 1;
    }
}

// 使用示例
@RestController
public class SeckillController {
    
    @Autowired
    private RateLimiter rateLimiter;
    
    @Autowired
    private SeckillService seckillService;
    
    @PostMapping("/seckill")
    public SeckillResult seckill(@RequestBody SeckillRequest request) {
        // 限流:每个用户每秒最多3次请求
        String rateKey = "rate_limit:user:" + request.getUserId();
        if (!rateLimiter.tryAcquire(rateKey, 1, 3, 3)) {
            return SeckillResult.error("请求过于频繁,请稍后再试");
        }
        
        // 业务处理
        return seckillService.seckill(request.getUserId(), request.getProductId());
    }
}

七、总结与最佳实践清单

7.1 索引优化检查清单

  • [ ] 所有查询都使用索引
  • [ ] 避免索引失效的写法
  • [ ] 定期清理无用索引
  • [ ] 复合索引遵循最左前缀原则
  • [ ] 高频查询使用覆盖索引

7.2 查询优化检查清单

  • [ ] 禁止SELECT *
  • [ ] 分页使用延迟关联或位置记录
  • [ ] 大表JOIN谨慎使用
  • [ ] 批量操作替代循环单条
  • [ ] 合理使用EXISTS和IN

7.3 事务与锁检查清单

  • [ ] 事务粒度最小化
  • [ ] 固定加锁顺序
  • [ ] 使用SELECT FOR UPDATE
  • [ ] 设置合理超时时间
  • [ ] 监控死锁日志

7.4 架构优化检查清单

  • [ ] 实现读写分离
  • [ ] 合理使用缓存
  • [ ] 消息队列解耦
  • [ ] 分库分表规划
  • [ ] 监控告警体系

7.5 配置调优检查清单

  • [ ] 缓冲池大小合理
  • [ ] 连接数配置恰当
  • [ ] 日志文件大小合适
  • [ ] 操作系统参数优化
  • [ ] 监控指标全覆盖

结语

MySQL高并发优化是一个系统工程,需要从索引、查询、事务、架构等多个维度综合考虑。没有银弹,只有最适合业务场景的方案。建议从监控入手,找到性能瓶颈,然后针对性优化。记住:先测量,再优化

在实际生产环境中,还需要结合业务特点进行定制化优化。希望本文能为您的数据库优化之路提供有价值的参考。