引言:理解高并发挑战

在现代互联网应用中,高并发访问是常态。电商平台的秒杀活动、社交媒体的热点推送、金融系统的交易处理,都可能瞬间产生海量数据库请求。MySQL作为最流行的关系型数据库,如何在高并发场景下保持高性能和稳定性,是每个后端开发者和DBA必须掌握的核心技能。

高并发带来的主要挑战包括:

  • CPU资源耗尽:大量查询导致CPU使用率飙升
  • I/O瓶颈:磁盘读写成为系统瓶颈
  • 锁竞争:行锁、表锁导致事务等待
  • 连接数耗尽:超出max_connections限制
  • 缓存失效:缓存穿透、雪崩等问题

本文将从索引优化读写分离缓存机制三个层面,系统性地讲解MySQL高并发处理策略,并提供实战代码示例。


第一章:索引优化——高并发的基石

1.1 索引的本质与工作原理

索引是数据库的”目录”,它通过B+树结构将数据有序存储,将随机I/O转换为顺序I/O,大幅提升查询速度。

B+树结构详解

-- 示例:创建用户表
CREATE TABLE users (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    age TINYINT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_username (username),
    INDEX idx_age_email (age, email)
);

B+树特性

  • 所有数据存储在叶子节点
  • 内部节点只存键值
  • 叶子节点通过双向链表连接
  • 查询复杂度稳定为O(log n)

1.2 索引优化实战策略

1.2.1 最左前缀原则

对于复合索引(a, b, c),以下查询能命中索引:

-- ✅ 命中索引
SELECT * FROM table WHERE a = 1;
SELECT * FROM table WHERE a = 1 AND b = 2;
SELECT * FROM table WHERE a = 1 AND b = 2 AND c = 3;

-- ❌ 无法命中索引
SELECT * FROM table WHERE b = 2;
SELECT * FROM table WHERE c = 3;
SELECT * FROM table WHERE b = 2 AND c = 3;

1.2.2 索引覆盖避免回表

-- 假设users表有复合索引 idx_age_email(age, email)

-- ❌ 需要回表(根据索引找到主键,再查整行)
SELECT * FROM users WHERE age = 25;

-- ✅ 索引覆盖(索引包含所有查询字段)
SELECT id, age, email FROM users WHERE age = 25;

-- ✅ 使用覆盖索引优化分页
SELECT id, email FROM users 
WHERE age = 25 
ORDER BY id 
LIMIT 10000, 20;

1.2.3 索引下推(ICP)

MySQL 5.6+ 支持索引下推,在索引遍历阶段就过滤数据:

-- 表结构:idx_age_name(age, name)
-- 查询:WHERE age > 25 AND name LIKE 'A%'

-- 旧版本:先通过索引找到age>25的记录,回表后再过滤name
-- 新版本:直接在索引中过滤age和name条件,减少回表次数
EXPLAIN SELECT * FROM users 
WHERE age > 25 AND name LIKE 'A%';
-- 查看Extra字段是否包含"Using index condition"

1.3 索引设计原则

1.3.1 区分度原则

-- 计算列的区分度(唯一值/总行数)
SELECT 
    COUNT(DISTINCT status) / COUNT(*) AS selectivity,
    COUNT(DISTINCT user_id) / COUNT(*) AS user_selectivity
FROM orders;

-- 结果:
-- selectivity > 0.3:适合建索引
-- selectivity < 0.01:不适合建索引

1.3.2 避免冗余索引

-- ❌ 冗余索引
ALTER TABLE users ADD INDEX idx_username (username);
ALTER TABLE users ADD INDEX idx_username_age (username, age); -- idx_username是冗余的

-- ✅ 删除冗余
DROP INDEX idx_username ON users;

1.3.3 索引维护

-- 查看索引使用情况
SELECT 
    table_name,
    index_name,
    rows_read,
    rows_selected
FROM sys.schema_index_statistics 
WHERE table_schema = 'your_database';

-- 未使用的索引(可能需要删除)
SELECT * FROM sys.schema_unused_indexes;

-- 重建索引(解决索引碎片)
ALTER TABLE users ENGINE=InnoDB; -- 在线DDL
-- 或
OPTIMIZE TABLE users; -- 锁表,慎用

1.4 索引失效的常见场景

1.4.1 函数操作导致失效

-- ❌ 索引失效
SELECT * FROM users WHERE DATE(created_at) = '2024-01-01';

-- ✅ 改写为范围查询
SELECT * FROM users 
WHERE created_at >= '2024-01-01 00:00:00' 
  AND created_at < '2024-01-02 00:00:00';

1.4.2 隐式类型转换

-- 假设phone字段是VARCHAR类型,但查询时用数字
-- ❌ 索引失效(MySQL会将字段转为数字)
SELECT * FROM users WHERE phone = 13800138000;

-- ✅ 使用正确类型
SELECT * FROM users WHERE phone = '13800138000';

1.4.3 OR条件导致失效

-- ❌ 索引失效(除非所有OR条件都有索引)
SELECT * FROM users WHERE age = 25 OR age = 30;

-- ✅ 改写为IN
SELECT * FROM users WHERE age IN (25, 30);

-- ✅ 或使用UNION ALL
SELECT * FROM users WHERE age = 25
UNION ALL
SELECT * FROM users WHERE age = 30;

第二章:读写分离架构设计

2.1 读写分离原理

读写分离的核心思想:写操作走主库,读操作走从库,通过MySQL主从复制实现数据同步。

主从复制架构

        应用层
           ↓
    读写分离中间件
    /      |      \
主库(M)  从库(S)  从库(S)
  ↓        ↓        ↓
binlog → IO线程 → SQL线程 → relay log

2.2 MySQL主从复制配置

2.2.1 主库配置(Master)

# my.cnf 或 my.ini
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW  # 推荐ROW格式
expire_logs_days = 7
binlog_cache_size = 4M
max_binlog_cache_size = 1G
-- 创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'StrongPassword123!';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;

-- 查看主库状态
SHOW MASTER STATUS;
-- 记录 File 和 Position 值

2.2.2 从库配置(Slave)

# my.cnf
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
log_bin = mysql-bin
read_only = 1  # 防止误写从库
-- 配置从库连接主库
CHANGE MASTER TO
MASTER_HOST='主库IP',
MASTER_USER='repl',
MASTER_PASSWORD='StrongPassword123!',
MASTER_LOG_FILE='mysql-bin.000001',  -- 来自主库SHOW MASTER STATUS
MASTER_LOG_POS=1234;                -- 来自主库SHOW MASTER STATUS

-- 启动复制
START SLAVE;

-- 查看复制状态
SHOW SLAVE STATUS\G
-- 关键指标:
-- Slave_IO_Running: Yes
-- Slave_SQL_Running: Yes
-- Seconds_Behind_Master: 0

2.3 应用层读写分离实现

2.3.1 Spring Boot动态数据源配置

@Configuration
public class DataSourceConfig {
    
    // 主数据源(写)
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    // 从数据源(读)
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    // 动态数据源
    @Bean
    public DataSource dynamicDataSource() {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DataSourceType.MASTER, masterDataSource());
        targetDataSources.put(DataSourceType.SLAVE, slaveDataSource());
        
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        dynamicDataSource.setTargetDataSources(targetDataSources);
        dynamicDataSource.setDefaultTargetDataSource(masterDataSource());
        return dynamicDataSource;
    }
}

2.3.2 自定义注解实现路由

// 定义数据源类型枚举
public enum DataSourceType {
    MASTER, SLAVE
}

// 自定义注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DataSource {
    DataSourceType value() default DataSourceType.SLAVE;
}

// AOP切面
@Aspect
@Component
public class DataSourceAspect {
    
    @Before("@annotation(dataSource)")
    public void beforeSwitchDataSource(JoinPoint point, DataSource dataSource) {
        DataSourceContextHolder.setDataSource(dataSource.value());
    }
    
    @After("@annotation(dataSource)")
    public void afterSwitchDataSource(JoinPoint point, DataSource dataSource) {
        DataSourceContextHolder.clearDataSource();
    }
}

// ThreadLocal上下文
public class DataSourceContextHolder {
    private static final ThreadLocal<DataSourceType> CONTEXT = new ThreadLocal<>();
    
    public static void setDataSource(DataSourceType type) {
        CONTEXT.set(type);
    }
    
    public static DataSourceType getDataSource() {
        return CONTEXT.get() == null ? DataSourceType.SLAVE : CONTEXT.get();
    }
    
    public static void clearDataSource() {
        CONTEXT.remove();
    }
}

2.3.3 业务层使用示例

@Service
public class UserService {
    
    @Autowired
    private UserMapper userMapper;
    
    // 写操作走主库
    @DataSource(DataSourceType.MASTER)
    public void createUser(User user) {
        userMapper.insert(user);
    }
    
    // 读操作走从库
    @DataSource(DataSourceType.SLAVE)
    public User getUserById(Long id) {
        return userMapper.selectById(id);
    }
    
    // 复杂查询走从库
    @DataSource(DataSourceType.SLAVE)
    public List<User> searchUsers(String keyword) {
        return userMapper.search(keyword);
    }
}

2.4 读写分离的高级策略

2.4.1 主从延迟处理

// 方案1:强制走主库(关键业务)
@DataSource(DataSourceType.MASTER)
public Order getOrderAfterCreate(Long orderId) {
    // 刚创建的订单,强制从主库读取
    return orderMapper.selectById(orderId);
}

// 方案2:半同步复制
-- 在主库配置
plugin-load = "rpl_semi_sync_master=semisync_master.so"
rpl_semi_sync_master_enabled = 1
rpl_semi_sync_master_timeout = 500  -- 500ms超时

-- 在从库配置
plugin-load = "rpl_semi_sync_slave=semisync_slave.so"
rpl_semi_sync_slave_enabled = 1

2.4.2 多从库负载均衡

// 轮询策略
public class RoundRobinSlaveDataSource implements DataSourceRouter {
    private final List<DataSource> slaves;
    private final AtomicInteger counter = new AtomicInteger(0);
    
    @Override
    public DataSource getDataSource() {
        int index = counter.getAndIncrement() % slaves.size();
        return slaves.get(index);
    }
}

// 权重策略
public class WeightedSlaveDataSource implements DataSourceRouter {
    private final Map<DataSource, Integer> weightMap;
    
    @Override
    public DataSource getDataSource() {
        int totalWeight = weightMap.values().stream().mapToInt(Integer::intValue).sum();
        int random = ThreadLocalRandom.current().nextInt(totalWeight);
        
        int current = 0;
        for (Map.Entry<DataSource, Integer> entry : weightMap.entrySet()) {
            current += entry.getValue();
            if (random < current) {
                return entry.getKey();
            }
        }
        return null;
    }
}

2.5 读写分离中间件

2.5.1 ShardingSphere-JDBC

# application.yml
spring:
  shardingsphere:
    datasource:
      names: master, slave0, slave1
      master:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://master:3306/db
        username: root
        password: password
      slave0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave0:3306/db
        username: root
        password: password
      slave1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave1:3306/db
        username: root
        password: password
    
    rules:
      readwrite-splitting:
        data-sources:
          ds0:
            type: Static
            props:
              write-data-source-name: master
              read-data-source-names: slave0,slave1
              load-balancer-name: round_robin
        load-balancers:
          round_robin:
            type: ROUND_ROBIN

第三章:缓存机制——性能倍增器

3.1 缓存架构设计

3.1.1 多级缓存架构

        浏览器缓存
           ↓
    CDN缓存(静态资源)
           ↓
    应用层缓存(Caffeine/Guava)
           ↓
    分布式缓存(Redis)
           ↓
    数据库缓存(Buffer Pool)
           ↓
    磁盘

3.2 Redis缓存实战

3.2.1 缓存数据模型设计

// 用户信息缓存
@Component
public class UserCache {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String USER_KEY_PREFIX = "user:";
    private static final long USER_TTL = 3600; // 1小时
    
    // 缓存穿透保护(布隆过滤器)
    @Autowired
    private BloomFilter<Long> userBloomFilter;
    
    public User getUser(Long userId) {
        String key = USER_KEY_PREFIX + userId;
        
        // 1. 先查布隆过滤器
        if (!userBloomFilter.mightContain(userId)) {
            return null; // 肯定不存在
        }
        
        // 2. 查Redis缓存
        User user = (User) redisTemplate.opsForValue().get(key);
        if (user != null) {
            return user;
        }
        
        // 3. 缓存未命中,查数据库
        user = userMapper.selectById(userId);
        
        if (user != null) {
            // 4. 写入缓存
            redisTemplate.opsForValue().set(key, user, USER_TTL, TimeUnit.SECONDS);
        } else {
            // 5. 缓存空值(防止缓存穿透)
            redisTemplate.opsForValue().set(key, new User(), 60, TimeUnit.SECONDS);
        }
        
        return user;
    }
    
    public void deleteUser(Long userId) {
        String key = USER_KEY_PREFIX + userId;
        redisTemplate.delete(key);
        // 更新布隆过滤器(需要重建)
    }
}

3.2.2 缓存穿透、击穿、雪崩防护

@Component
public class CacheProtectionService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 1. 缓存穿透:查询不存在的数据
    public Object queryWithPenetrationProtection(String key, Callable<Object> dbQuery) {
        // 使用空值缓存
        Object value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            return value instanceof NullObject ? null : value;
        }
        
        synchronized (key.intern()) { // 分布式锁简化版
            value = redisTemplate.opsForValue().get(key);
            if (value != null) {
                return value instanceof NullObject ? null : value;
            }
            
            try {
                value = dbQuery.call();
                if (value == null) {
                    redisTemplate.opsForValue().set(key, new NullObject(), 60, TimeUnit.SECONDS);
                    return null;
                } else {
                    redisTemplate.opsForValue().set(key, value, 3600, TimeUnit.SECONDS);
                    return value;
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    // 2. 缓存击穿:热点key过期瞬间大量请求
    public Object queryWithBreakdownProtection(String key, Callable<Object> dbQuery) {
        String lockKey = "lock:" + key;
        
        while (true) {
            // 尝试获取分布式锁(SET NX PX)
            Boolean locked = redisTemplate.opsForValue().setIfAbsent(
                lockKey, 
                "1", 
                10, 
                TimeUnit.SECONDS
            );
            
            if (Boolean.TRUE.equals(locked)) {
                try {
                    Object value = redisTemplate.opsForValue().get(key);
                    if (value == null) {
                        value = dbQuery.call();
                        redisTemplate.opsForValue().set(key, value, 3600, TimeUnit.SECONDS);
                    }
                    return value;
                } finally {
                    redisTemplate.delete(lockKey);
                }
            } else {
                // 短暂等待后重试
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return null;
    }
    
    // 3. 缓存雪崩:大量key同时过期
    public void setWithAvalancheProtection(String key, Object value, long ttl) {
        // 添加随机值(基础TTL + 随机时间)
        long randomTtl = ttl + ThreadLocalRandom.current().nextInt(300); // 0-5分钟随机
        redisTemplate.opsForValue().set(key, value, randomTtl, TimeUnit.SECONDS);
    }
    
    // 空值标记类
    static class NullObject implements Serializable {}
}

3.2.3 Redis集群配置

# Redis集群配置(application.yml)
spring:
  redis:
    cluster:
      nodes:
        - 192.168.1.101:6379
        - 192.168.1.102:6379
        - 192.168.1.103:6379
        - 192.1168.1.104:6379
        - 192.168.1.105:6379
        - 192.168.1.106:6379
      max-redirects: 3
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
        max-wait: 1000ms
    timeout: 2000ms

3.3 应用层本地缓存

3.3.1 Caffeine缓存实现

@Configuration
public class LocalCacheConfig {
    
    @Bean
    public Cache<Long, User> userCache() {
        return Caffeine.newBuilder()
            .maximumSize(10_000) // 最大条目
            .expireAfterWrite(10, TimeUnit.MINUTES) // 写入后过期
            .expireAfterAccess(5, TimeUnit.MINUTES)  // 访问后过期
            .recordStats() // 开启统计
            .build();
    }
}

@Service
public class UserServiceWithLocalCache {
    
    @Autowired
    private Cache<Long, User> userCache;
    
    @Autowired
    private UserMapper userMapper;
    
    public User getUser(Long userId) {
        // 本地缓存查询
        User user = userCache.getIfPresent(userId);
        if (user != null) {
            return user;
        }
        
        // 本地缓存未命中,查Redis
        user = redisTemplate.opsForValue().get("user:" + userId);
        if (user != null) {
            userCache.put(userId, user); // 回填本地缓存
            return user;
        }
        
        // Redis未命中,查数据库
        user = userMapper.selectById(userId);
        if (user != null) {
            redisTemplate.opsForValue().set("user:" + userId, user, 3600, TimeUnit.SECONDS);
            userCache.put(userId, user);
        }
        
        return user;
    }
    
    // 缓存更新
    public void updateUser(User user) {
        userMapper.updateById(user);
        // 删除缓存(先删Redis,再删本地缓存)
        redisTemplate.delete("user:" + user.getId());
        userCache.invalidate(user.getId());
    }
}

3.4 缓存更新策略

3.4.1 Cache Aside模式

// 读操作
public Object read(String key) {
    // 1. 先查缓存
    Object value = cache.get(key);
    if (value != null) {
        return value;
    }
    
    // 2. 缓存未命中,查数据库
    value = db.query(key);
    
    // 3. 写入缓存
    if (value != null) {
        cache.put(key, value);
    }
    
    return value;
}

// 写操作(先更新数据库,再删除缓存)
public void write(String key, Object value) {
    // 1. 更新数据库
    db.update(key, value);
    
    // 2. 删除缓存(让下次读操作重新加载)
    cache.delete(key);
}

3.4.2 延迟双删策略(解决主从延迟)

public void updateUserWithDelayDelete(User user) {
    // 1. 删除缓存
    redisTemplate.delete("user:" + user.getId());
    
    // 2. 更新数据库
    userMapper.updateById(user);
    
    // 3. 延迟再次删除(防止主从延迟导致脏数据)
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.schedule(() -> {
        redisTemplate.delete("user:" + user.getId());
    }, 500, TimeUnit.MILLISECONDS);
}

3.4.3 读写并发冲突处理

// 使用分布式锁保证缓存一致性
public void updateUserWithLock(User user) {
    String lockKey = "update:user:" + user.getId();
    
    try {
        // 获取分布式锁
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(
            lockKey, "1", 10, TimeUnit.SECONDS
        );
        
        if (Boolean.TRUE.equals(locked)) {
            // 1. 更新数据库
            userMapper.updateById(user);
            
            // 2. 删除缓存
            redisTemplate.delete("user:" + user.getId());
        }
    } finally {
        redisTemplate.delete(lockKey);
    }
}

3.5 缓存预热与监控

3.5.1 缓存预热

@Component
public class CacheWarmupService {
    
    @EventListener(ApplicationReadyEvent.class)
    public void warmup() {
        // 系统启动时预热热点数据
        List<Long> hotUserIds = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        
        for (Long userId : hotUserIds) {
            User user = userMapper.selectById(userId);
            if (user != null) {
                redisTemplate.opsForValue().set(
                    "user:" + userId, 
                    user, 
                    3600, 
                    TimeUnit.SECONDS
                );
            }
        }
    }
}

3.5.2 缓存监控

@Component
public class CacheMonitor {
    
    @Autowired
    private Cache<Long, User> userCache;
    
    @Scheduled(fixedRate = 60000) // 每分钟打印统计
    public void logStats() {
        CacheStats stats = userCache.stats();
        log.info("本地缓存统计 - 命中率: {}, 命中次数: {}, 未命中次数: {}", 
            stats.hitRate(),
            stats.hitCount(),
            stats.missCount()
        );
        
        // Redis监控
        Long hitCount = redisTemplate.execute(
            (RedisCallback<Long>) connection -> 
                connection.dbStats().get("keyspace_hits")
        );
        log.info("Redis命中次数: {}", hitCount);
    }
}

3.6 缓存设计最佳实践

  1. 缓存粒度:缓存对象而非字段,减少网络开销
  2. 过期时间:根据业务特点设置,避免同时过期
  3. 热点识别:使用Redis的OBJECT FREQ命令识别热点key
  4. 多级缓存:本地缓存 + 分布式缓存组合
  5. 缓存降级:缓存失效时直接查数据库,保证可用性
  6. 监控告警:监控命中率、响应时间、内存使用率

第四章:综合实战案例——秒杀系统设计

4.1 架构设计

用户请求 → Nginx → 应用服务 → 缓存层 → 数据库层
                ↓
            消息队列(削峰)
                ↓
            异步写入数据库

4.2 核心代码实现

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private StockMapper stockMapper;
    
    private static final String STOCK_KEY = "seckill:stock:";
    private static final String ORDER_KEY = "seckill:order:";
    
    /**
     * 秒杀流程
     */
    @Transactional
    public SeckillResult seckill(Long userId, Long goodsId) {
        String stockKey = STOCK_KEY + goodsId;
        
        // 1. 预减库存(Redis原子操作)
        Long stock = redisTemplate.opsForValue().decrement(stockKey);
        if (stock < 0) {
            redisTemplate.opsForValue().increment(stockKey); // 回滚
            return SeckillResult.fail("库存不足");
        }
        
        // 2. 判断是否已秒杀(防止重复下单)
        String orderKey = ORDER_KEY + userId + ":" + goodsId;
        Boolean exists = redisTemplate.hasKey(orderKey);
        if (Boolean.TRUE.equals(exists)) {
            return SeckillResult.fail("请勿重复下单");
        }
        
        // 3. 发送消息到队列(异步下单)
        SeckillMessage message = new SeckillMessage(userId, goodsId);
        rabbitTemplate.convertAndSend("seckill.exchange", "seckill.key", message);
        
        // 4. 标记已下单
        redisTemplate.opsForValue().set(orderKey, "1", 30, TimeUnit.MINUTES);
        
        return SeckillResult.success("秒杀成功,正在处理订单");
    }
    
    /**
     * 异步处理订单(消费者)
     */
    @RabbitListener(queues = "seckill.queue")
    public void processSeckillMessage(SeckillMessage message) {
        try {
            // 1. 扣减数据库库存
            int affected = stockMapper.decreaseStock(message.getGoodsId());
            if (affected == 0) {
                log.error("数据库库存不足,goodsId: {}", message.getGoodsId());
                return;
            }
            
            // 2. 创建订单
            Order order = new Order();
            order.setUserId(message.getUserId());
            order.setGoodsId(message.getGoodsId());
            order.setStatus(1);
            orderMapper.insert(order);
            
            log.info("订单创建成功,orderId: {}", order.getId());
        } catch (Exception e) {
            log.error("处理秒杀消息失败", e);
            // 库存回滚
            redisTemplate.opsForValue().increment(STOCK_KEY + message.getGoodsId());
        }
    }
}

4.3 关键优化点

  1. 库存预减:Redis原子操作,避免数据库直接抗压
  2. 消息队列削峰:将瞬时流量平滑写入数据库
  3. 防重复下单:Redis标记已操作状态
  4. 库存回滚:异常时Redis库存补偿
  5. 缓存预热:启动时加载库存到Redis

第五章:监控与调优

5.1 MySQL性能监控

-- 1. 查看慢查询
SHOW VARIABLES LIKE 'slow_query%';
SHOW VARIABLES LIKE 'long_query_time';

-- 2. 查看当前连接
SHOW PROCESSLIST;

-- 3. 查看InnoDB状态
SHOW ENGINE INNODB STATUS\G

-- 4. 查看索引使用情况
SELECT * FROM sys.schema_unused_indexes;

-- 5. 查看锁等待
SELECT * FROM sys.innodb_lock_waits;

5.2 慢查询分析工具

# 使用pt-query-digest分析慢查询日志
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# 使用Percona Toolkit
pt-index-usage /var/log/mysql/slow.log --host=localhost

5.3 性能调优参数

# my.cnf 关键参数
[mysqld]
# 连接相关
max_connections = 1000
max_user_connections = 950
wait_timeout = 600

# InnoDB缓冲池(建议内存的50-70%)
innodb_buffer_pool_size = 8G
innodb_buffer_pool_instances = 8

# 日志相关
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M

# 刷新策略
innodb_flush_log_at_trx_commit = 2  # 性能优先用2,数据安全用1
sync_binlog = 1000                  # 性能优先用1000,安全用1

# 并发相关
innodb_thread_concurrency = 16
innodb_read_io_threads = 8
innodb_write_io_threads = 8

# 查询缓存(MySQL 8.0已移除)
# query_cache_type = 0
# query_cache_size = 0

5.4 性能测试

// 使用JMeter进行压测
// 配置线程组:100线程,循环1000次
// 监控指标:
// - TPS
// - 平均响应时间
// - 错误率
// - 数据库连接池使用率
// - CPU/Memory使用率

总结

MySQL高并发处理是一个系统工程,需要从多个层面综合优化:

  1. 索引优化:是基础,直接影响查询效率
  2. 读写分离:分担主库压力,提升读性能
  3. 缓存机制:减少数据库访问,倍增性能
  4. 架构设计:消息队列、分库分表等高级策略
  5. 监控调优:持续优化,形成闭环

记住:没有银弹,需要根据业务特点、数据规模、并发量等因素,选择合适的组合策略。建议从索引优化开始,逐步引入读写分离和缓存,最后考虑分库分表等复杂架构。


附录:常用命令速查

# MySQL状态查看
mysqladmin -u root -p extended-status
mysqladmin -u root -p processlist

# Redis监控
redis-cli --stat
redis-cli --bigkeys

# 系统监控
iostat -x 1
vmstat 1
top -H -p $(pgrep -f mysqld)

通过本文的系统学习和实践,相信你已经掌握了MySQL高并发处理的核心技能。在实际应用中,务必结合监控数据持续优化,才能构建真正高性能的数据库系统。