引言

在当今互联网应用中,高并发场景已成为常态。无论是电商秒杀、社交网络还是金融交易系统,都面临着海量用户同时访问数据库的挑战。MySQL作为最流行的开源关系型数据库,在高并发环境下容易出现性能瓶颈,如连接数耗尽、锁竞争、I/O瓶颈等问题。本文将从架构优化、数据库设计、缓存机制等多个维度,系统性地探讨MySQL高并发处理策略,并提供实战代码示例。

一、架构层面的优化策略

1.1 读写分离架构

读写分离是提升MySQL并发能力的基础架构优化。通过将读操作和写操作分离到不同的数据库实例,可以显著减轻主库压力。

实现方式:

  • 主库(Master):处理所有写操作(INSERT/UPDATE/DELETE)
  • 从库(Slave):处理所有读操作(SELECT)
  • 中间件:使用ProxySQL、MyCat或ShardingSphere等中间件进行路由

代码示例(使用Spring Boot + MyBatis实现读写分离):

// 1. 配置多数据源
@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    public DataSource routingDataSource() {
        DynamicDataSource routingDataSource = new DynamicDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        return routingDataSource;
    }
}

// 2. 自定义路由数据源
public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSourceType();
    }
}

// 3. 数据源上下文管理器
public class DataSourceContextHolder {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
    public static void setDataSourceType(String dataSourceType) {
        contextHolder.set(dataSourceType);
    }
    
    public static String getDataSourceType() {
        return contextHolder.get() != null ? contextHolder.get() : "master";
    }
    
    public static void clearDataSourceType() {
        contextHolder.remove();
    }
}

// 4. AOP切面实现读写分离
@Aspect
@Component
public class DataSourceAspect {
    
    @Before("execution(* com.example.service.*.*(..))")
    public void before(JoinPoint joinPoint) {
        String methodName = joinPoint.getSignature().getName();
        if (methodName.startsWith("get") || methodName.startsWith("list") || 
            methodName.startsWith("query") || methodName.startsWith("find")) {
            DataSourceContextHolder.setDataSourceType("slave");
        } else {
            DataSourceContextHolder.setDataSourceType("master");
        }
    }
    
    @After("execution(* com.example.service.*.*(..))")
    public void after() {
        DataSourceContextHolder.clearDataSourceType();
    }
}

注意事项:

  • 主从同步延迟可能导致数据不一致,对于强一致性要求的场景需谨慎使用
  • 可以结合半同步复制降低延迟
  • 读写分离中间件可以提供更灵活的路由策略

1.2 分库分表策略

当单表数据量超过千万级别时,需要考虑分库分表。分库分表分为垂直拆分和水平拆分。

垂直拆分:

  • 按业务模块拆分:用户表、订单表、商品表分到不同数据库
  • 按列拆分:将大字段(如文章内容)拆分到单独的表

水平拆分:

  • 按范围拆分:按时间范围(如按月分表)
  • 按哈希拆分:按用户ID取模分表

代码示例(使用ShardingSphere实现分表):

# sharding.yaml 配置文件
dataSources:
  ds_0: !!com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/db_0
    username: root
    password: root
  ds_1: !!com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/db_1
    username: root
    password: root

shardingRule:
  tables:
    order:
      actualDataNodes: ds_${0..1}.order_${0..3}
      tableStrategy:
        standard:
          shardingColumn: order_id
          preciseAlgorithmClassName: com.example.OrderTableShardingAlgorithm
  defaultDatabaseStrategy:
    standard:
      shardingColumn: user_id
      preciseAlgorithmClassName: com.example.DatabaseShardingAlgorithm
// 分表算法实现
public class OrderTableShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
    
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        Long orderId = shardingValue.getValue();
        // 根据订单ID取模分表
        int tableIndex = (int) (orderId % 4);
        String tableName = "order_" + tableIndex;
        
        // 检查目标表是否存在
        if (availableTargetNames.contains(tableName)) {
            return tableName;
        }
        throw new IllegalArgumentException("无法找到分表:" + tableName);
    }
}

// 分库算法实现
public class DatabaseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
    
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        Long userId = shardingValue.getValue();
        // 根据用户ID取模分库
        int dbIndex = (int) (userId % 2);
        String dbName = "ds_" + dbIndex;
        
        if (availableTargetNames.contains(dbName)) {
            return dbName;
        }
        throw new IllegalArgumentException("无法找到分库:" + dbName);
    }
}

分库分表后的查询优化:

  • 避免跨库JOIN,尽量在业务层组装数据
  • 使用全局表(如字典表)进行冗余存储
  • 分布式事务处理(使用Seata等框架)

1.3 连接池优化

连接池是数据库访问的入口,合理的配置能显著提升并发能力。

常用连接池对比:

  • HikariCP:性能最佳,推荐使用
  • Druid:功能丰富,监控完善
  • C3P0:老牌连接池,性能一般

HikariCP配置示例:

spring:
  datasource:
    hikari:
      # 连接池大小
      maximum-pool-size: 20
      minimum-idle: 5
      # 连接超时
      connection-timeout: 30000
      # 空闲连接存活时间
      idle-timeout: 600000
      # 连接最大生命周期
      max-lifetime: 1800000
      # 连接测试查询
      connection-test-query: SELECT 1
      # 连接预热
      initialization-fail-timeout: 1
      # 连接泄漏检测
      leak-detection-threshold: 60000

连接池大小计算公式:

连接数 = (核心数 * 2) + 有效磁盘数

对于高并发读场景,可以适当增加连接数;对于写密集型场景,需要控制连接数避免锁竞争。

二、数据库设计优化

2.1 索引优化策略

索引是提升查询性能的关键。在高并发场景下,索引设计需要特别谨慎。

索引设计原则:

  • 覆盖索引:查询字段都在索引中,避免回表
  • 最左前缀原则:复合索引的使用顺序
  • 避免冗余索引和重复索引

代码示例(索引优化实战):

-- 1. 原始查询(性能差)
SELECT user_id, username, email 
FROM users 
WHERE age > 25 AND city = 'Beijing' 
ORDER BY create_time DESC 
LIMIT 10;

-- 2. 优化后的索引设计
-- 创建复合索引(注意顺序:等值查询在前,范围查询在后)
CREATE INDEX idx_age_city_create_time ON users(age, city, create_time);

-- 3. 覆盖索引示例
-- 查询只需要索引字段,无需回表
SELECT user_id, age, city 
FROM users 
WHERE age > 25 AND city = 'Beijing';

-- 4. 索引提示(强制使用特定索引)
SELECT /*+ INDEX(users idx_age_city_create_time) */ * 
FROM users 
WHERE age > 25 AND city = 'Beijing';

-- 5. 索引下推优化(MySQL 5.6+)
-- 索引下推可以在存储引擎层过滤数据,减少回表次数
EXPLAIN SELECT * FROM users WHERE age > 25 AND city = 'Beijing';
-- 查看Extra列是否包含"Using index condition"

索引监控与维护:

-- 查看索引使用情况
SELECT * FROM sys.schema_unused_indexes;

-- 查看索引大小
SELECT 
    table_name,
    index_name,
    ROUND(index_length / 1024 / 1024, 2) AS index_size_mb
FROM information_schema.tables
WHERE table_schema = 'your_database';

-- 重建索引(避免在业务高峰期执行)
ALTER TABLE users DROP INDEX idx_old, ADD INDEX idx_new(column1, column2);

2.2 表结构设计优化

数据类型选择:

  • 使用最小的数据类型:TINYINT代替INT,VARCHAR(100)代替VARCHAR(255)
  • 避免使用TEXT/BLOB类型,大字段单独存储
  • 使用ENUM代替字符串类型

表结构优化示例:

-- 优化前(数据类型过大)
CREATE TABLE user_info (
    id INT PRIMARY KEY,
    username VARCHAR(255),
    age INT,
    status TINYINT,
    create_time DATETIME,
    description TEXT
);

-- 优化后(使用合适的数据类型)
CREATE TABLE user_info (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(64) NOT NULL,
    age TINYINT UNSIGNED,
    status TINYINT DEFAULT 0,
    create_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
    description VARCHAR(1000),
    INDEX idx_username (username),
    INDEX idx_age_status (age, status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 大字段单独存储
CREATE TABLE user_description (
    user_id BIGINT PRIMARY KEY,
    description TEXT,
    FOREIGN KEY (user_id) REFERENCES user_info(id)
);

2.3 事务优化

高并发场景下,事务设计直接影响系统性能。

事务优化策略:

  • 缩短事务时间:减少锁持有时间
  • 合理设置隔离级别:根据业务需求选择
  • 避免长事务

代码示例(事务优化):

// 1. 优化前(长事务)
@Transactional
public void processOrder(Long orderId) {
    // 查询订单
    Order order = orderMapper.selectById(orderId);
    
    // 模拟耗时操作(如调用外部API)
    Thread.sleep(5000); // 5秒
    
    // 更新订单状态
    order.setStatus(2);
    orderMapper.updateById(order);
}

// 2. 优化后(拆分事务)
public void processOrder(Long orderId) {
    // 1. 查询订单(非事务操作)
    Order order = orderMapper.selectById(orderId);
    
    // 2. 调用外部API(非事务操作)
    callExternalAPI(order);
    
    // 3. 更新订单状态(短事务)
    updateOrderStatus(orderId);
}

@Transactional
public void updateOrderStatus(Long orderId) {
    Order order = orderMapper.selectById(orderId);
    order.setStatus(2);
    orderMapper.updateById(order);
}

// 3. 使用事务传播行为控制
@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    // 外层事务,内层独立事务
    @Transactional(propagation = Propagation.REQUIRED)
    public void createOrderWithPayment(Order order) {
        // 创建订单(主事务)
        orderMapper.insert(order);
        
        // 调用支付服务(独立事务,即使支付失败也不影响订单创建)
        processPayment(order);
    }
    
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void processPayment(Order order) {
        // 支付逻辑
        paymentService.pay(order);
    }
}

三、缓存机制实战

3.1 多级缓存架构

在高并发场景下,缓存是减轻数据库压力的最有效手段。

缓存架构设计:

客户端 → CDN → 应用缓存(Redis) → 数据库缓存(MySQL Buffer Pool)

代码示例(Spring Boot + Redis缓存):

// 1. 配置Redis缓存
@Configuration
@EnableCaching
public class RedisCacheConfig {
    
    @Bean
    public RedisCacheManager cacheManager(RedisConnectionFactory factory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
            .entryTtl(Duration.ofMinutes(10))
            .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
            .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
        
        return RedisCacheManager.builder(factory)
            .cacheDefaults(config)
            .build();
    }
}

// 2. 缓存注解使用
@Service
public class UserService {
    
    @Autowired
    private UserMapper userMapper;
    
    // 缓存查询结果
    @Cacheable(value = "users", key = "#userId")
    public User getUserById(Long userId) {
        return userMapper.selectById(userId);
    }
    
    // 更新缓存
    @CachePut(value = "users", key = "#user.id")
    public User updateUser(User user) {
        userMapper.updateById(user);
        return user;
    }
    
    // 删除缓存
    @CacheEvict(value = "users", key = "#userId")
    public void deleteUser(Long userId) {
        userMapper.deleteById(userId);
    }
    
    // 复杂缓存逻辑
    @Cacheable(value = "users", key = "#userId", unless = "#result == null")
    public User getUserWithCache(Long userId) {
        // 先查缓存
        User user = redisTemplate.opsForValue().get("user:" + userId);
        if (user != null) {
            return user;
        }
        
        // 缓存未命中,查数据库
        user = userMapper.selectById(userId);
        
        // 写入缓存
        if (user != null) {
            redisTemplate.opsForValue().set("user:" + userId, user, Duration.ofMinutes(10));
        }
        
        return user;
    }
}

3.2 缓存穿透、击穿、雪崩防护

缓存穿透防护:

// 布隆过滤器防止缓存穿透
@Component
public class BloomFilterCache {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private BloomFilter<String> bloomFilter;
    
    @PostConstruct
    public void init() {
        // 初始化布隆过滤器
        bloomFilter = BloomFilter.create(
            Funnels.stringFunnel(Charset.defaultCharset()),
            1000000, // 预期元素数量
            0.01    // 误判率
        );
    }
    
    public User getUser(Long userId) {
        String key = "user:" + userId;
        
        // 1. 布隆过滤器检查
        if (!bloomFilter.mightContain(key)) {
            // 不存在,直接返回null
            return null;
        }
        
        // 2. 查询缓存
        User user = (User) redisTemplate.opsForValue().get(key);
        if (user != null) {
            return user;
        }
        
        // 3. 查询数据库
        user = userMapper.selectById(userId);
        
        // 4. 更新缓存和布隆过滤器
        if (user != null) {
            redisTemplate.opsForValue().set(key, user, Duration.ofMinutes(10));
            bloomFilter.put(key);
        } else {
            // 缓存空值,防止重复查询
            redisTemplate.opsForValue().set(key, null, Duration.ofMinutes(1));
        }
        
        return user;
    }
}

缓存击穿防护:

// 使用分布式锁防止缓存击穿
@Service
public class CacheBreakdownService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private UserMapper userMapper;
    
    private static final String LOCK_PREFIX = "lock:";
    private static final String CACHE_PREFIX = "user:";
    
    public User getUserWithLock(Long userId) {
        String cacheKey = CACHE_PREFIX + userId;
        String lockKey = LOCK_PREFIX + userId;
        
        // 1. 查询缓存
        User user = (User) redisTemplate.opsForValue().get(cacheKey);
        if (user != null) {
            return user;
        }
        
        // 2. 获取分布式锁
        Boolean lockAcquired = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, "1", Duration.ofSeconds(30));
        
        if (Boolean.TRUE.equals(lockAcquired)) {
            try {
                // 3. 双重检查(防止锁竞争)
                user = (User) redisTemplate.opsForValue().get(cacheKey);
                if (user != null) {
                    return user;
                }
                
                // 4. 查询数据库
                user = userMapper.selectById(userId);
                
                // 5. 更新缓存
                if (user != null) {
                    redisTemplate.opsForValue().set(cacheKey, user, Duration.ofMinutes(10));
                } else {
                    // 缓存空值
                    redisTemplate.opsForValue().set(cacheKey, null, Duration.ofMinutes(1));
                }
                
                return user;
            } finally {
                // 6. 释放锁
                redisTemplate.delete(lockKey);
            }
        } else {
            // 等待并重试
            try {
                Thread.sleep(100);
                return getUserWithLock(userId);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
}

缓存雪崩防护:

// 缓存过期时间随机化
@Service
public class CacheAvalancheService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    public void setCacheWithRandomTTL(String key, Object value) {
        // 基础过期时间
        int baseTTL = 600; // 10分钟
        
        // 随机因子(±30%)
        Random random = new Random();
        int randomFactor = random.nextInt(600) - 300; // -300 ~ 300秒
        
        // 最终过期时间
        int finalTTL = baseTTL + randomFactor;
        
        redisTemplate.opsForValue().set(key, value, Duration.ofSeconds(finalTTL));
    }
    
    // 缓存预热
    @PostConstruct
    public void cacheWarmUp() {
        // 定时任务预热热点数据
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            // 预热热点数据
            List<Long> hotUserIds = getHotUserIds();
            for (Long userId : hotUserIds) {
                User user = userMapper.selectById(userId);
                if (user != null) {
                    setCacheWithRandomTTL("user:" + userId, user);
                }
            }
        }, 0, 5, TimeUnit.MINUTES);
    }
}

3.3 缓存与数据库一致性

最终一致性方案:

// 1. 更新数据库后异步更新缓存
@Service
public class CacheUpdateService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private UserMapper userMapper;
    
    // 更新数据库
    @Transactional
    public void updateUser(User user) {
        userMapper.updateById(user);
        
        // 异步更新缓存(使用消息队列或线程池)
        CompletableFuture.runAsync(() -> {
            try {
                // 延迟更新,避免缓存穿透
                Thread.sleep(1000);
                redisTemplate.delete("user:" + user.getId());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}

// 2. 使用Canal监听数据库变更
@Component
public class CanalListener {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 监听数据库binlog变更
    @CanalEventListener
    public void handleUpdate(CanalEntry.Event event) {
        if (event.getTableName().equals("user")) {
            // 解析变更数据
            List<CanalEntry.RowData> rowDatas = event.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatas) {
                // 获取变更后的数据
                Map<String, String> afterData = parseRowData(rowData.getAfterColumnsList());
                String userId = afterData.get("id");
                
                // 更新缓存
                User user = userMapper.selectById(Long.parseLong(userId));
                if (user != null) {
                    redisTemplate.opsForValue().set("user:" + userId, user, Duration.ofMinutes(10));
                }
            }
        }
    }
}

四、SQL优化与执行计划

4.1 慢查询分析

慢查询日志配置:

-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1; -- 超过1秒的查询记录

-- 查看慢查询统计
SHOW VARIABLES LIKE 'slow_query%';
SHOW VARIABLES LIKE 'long_query_time';

-- 分析慢查询日志
mysqldumpslow /var/log/mysql/slow.log

使用Percona Toolkit分析:

# 安装Percona Toolkit
sudo apt-get install percona-toolkit

# 分析慢查询日志
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# 分析特定时间段的慢查询
pt-query-digest --since='2024-01-01 00:00:00' --until='2024-01-02 00:00:00' /var/log/mysql/slow.log

4.2 执行计划分析

EXPLAIN命令详解:

-- 基本使用
EXPLAIN SELECT * FROM users WHERE age > 25 AND city = 'Beijing';

-- 查看详细执行计划
EXPLAIN FORMAT=JSON SELECT * FROM users WHERE age > 25 AND city = 'Beijing';

-- 查看优化器建议
EXPLAIN ANALYZE SELECT * FROM users WHERE age > 25 AND city = 'Beijing';

执行计划关键字段解读:

  • type:访问类型(ALL > index > range > ref > eq_ref > const > system)
  • key:实际使用的索引
  • rows:预估扫描行数
  • Extra:额外信息(Using index, Using where, Using filesort等)

优化示例:

-- 优化前(全表扫描)
EXPLAIN SELECT * FROM orders WHERE status = 'pending' AND create_time > '2024-01-01';

-- 优化后(使用索引)
CREATE INDEX idx_status_create_time ON orders(status, create_time);
EXPLAIN SELECT * FROM orders WHERE status = 'pending' AND create_time > '2024-01-01';

-- 查看索引使用情况
SHOW INDEX FROM orders;

4.3 SQL重写技巧

避免使用SELECT *:

-- 优化前
SELECT * FROM users WHERE id = 1;

-- 优化后(只查询需要的字段)
SELECT id, username, email FROM users WHERE id = 1;

使用JOIN替代子查询:

-- 优化前(子查询)
SELECT * FROM orders 
WHERE user_id IN (SELECT id FROM users WHERE status = 'active');

-- 优化后(JOIN)
SELECT o.* FROM orders o
INNER JOIN users u ON o.user_id = u.id
WHERE u.status = 'active';

使用UNION ALL替代UNION:

-- 优化前(去重,性能差)
SELECT * FROM orders WHERE status = 'pending'
UNION
SELECT * FROM orders WHERE status = 'processing';

-- 优化后(不去重,性能好)
SELECT * FROM orders WHERE status = 'pending'
UNION ALL
SELECT * FROM orders WHERE status = 'processing';

五、监控与调优

5.1 监控指标

关键监控指标:

  • QPS/TPS:每秒查询/事务数
  • 连接数:当前连接数 vs 最大连接数
  • 缓存命中率:InnoDB Buffer Pool命中率
  • 锁等待:锁等待时间
  • 慢查询数:慢查询数量

监控SQL示例:

-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';

-- 查看最大连接数
SHOW VARIABLES LIKE 'max_connections';

-- 查看InnoDB Buffer Pool命中率
SELECT 
    (1 - (SELECT variable_value FROM information_schema.global_status 
          WHERE variable_name = 'Innodb_buffer_pool_reads') / 
          (SELECT variable_value FROM information_schema.global_status 
           WHERE variable_name = 'Innodb_buffer_pool_read_requests')) * 100 
    AS buffer_pool_hit_rate;

-- 查看锁等待
SELECT * FROM information_schema.innodb_locks;
SELECT * FROM information_schema.innodb_lock_waits;

5.2 性能调优工具

Percona Monitoring and Management (PMM):

# 安装PMM客户端
sudo apt-get install pmm2-client

# 连接到PMM服务器
pmm-admin config --server-url=http://pmm-server:80 --server-insecure-tls

# 添加MySQL监控
pmm-admin add mysql --username=pmm --password=pmm --query-source=perfschema

MySQL Workbench性能分析:

  • 使用Performance Dashboard查看实时性能
  • 使用Explain Plan可视化执行计划
  • 使用Schema Inspector分析表结构

5.3 自动化调优

使用MySQL 8.0的自动索引建议:

-- 开启索引建议(MySQL 8.0+)
SET GLOBAL innodb_adaptive_hash_index = ON;

-- 查看索引建议
SELECT * FROM sys.schema_unused_indexes;
SELECT * FROM sys.schema_redundant_indexes;

使用pt-archiver进行数据归档:

# 归档旧数据,减少表大小
pt-archiver --source h=localhost,u=root,p=password,D=test,t=orders \
            --dest h=localhost,u=root,p=password,D=test_archive,t=orders_archive \
            --where "create_time < '2023-01-01'" \
            --bulk-delete --bulk-insert --commit-each

六、实战案例:电商秒杀系统

6.1 架构设计

用户请求 → Nginx → 应用服务器 → Redis集群 → MySQL主库
                    ↓
                消息队列 → 异步处理 → MySQL从库

6.2 核心代码实现

// 秒杀服务
@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // 秒杀商品库存key
    private static final String STOCK_KEY = "seckill:stock:";
    // 秒杀订单key
    private static final String ORDER_KEY = "seckill:order:";
    
    /**
     * 秒杀下单
     */
    @Transactional
    public SeckillResult seckill(Long userId, Long productId) {
        String stockKey = STOCK_KEY + productId;
        String orderKey = ORDER_KEY + userId + ":" + productId;
        
        // 1. 检查是否已秒杀
        if (Boolean.TRUE.equals(redisTemplate.hasKey(orderKey))) {
            return SeckillResult.fail("已秒杀成功");
        }
        
        // 2. 预减库存(使用Redis原子操作)
        Long stock = redisTemplate.opsForValue().decrement(stockKey);
        if (stock == null || stock < 0) {
            // 库存不足,恢复库存
            redisTemplate.opsForValue().increment(stockKey);
            return SeckillResult.fail("库存不足");
        }
        
        // 3. 生成秒杀订单(异步)
        SeckillOrder order = new SeckillOrder();
        order.setUserId(userId);
        order.setProductId(productId);
        order.setStatus(0); // 待支付
        order.setCreateTime(new Date());
        
        // 4. 发送消息到MQ,异步创建正式订单
        rabbitTemplate.convertAndSend("seckill.order", order);
        
        // 5. 记录已秒杀
        redisTemplate.opsForValue().set(orderKey, order, Duration.ofMinutes(30));
        
        return SeckillResult.success("秒杀成功,订单处理中");
    }
    
    /**
     * 异步创建订单(消费者)
     */
    @RabbitListener(queues = "seckill.order")
    public void createOrder(SeckillOrder order) {
        try {
            // 1. 检查库存(数据库)
            Product product = productMapper.selectById(order.getProductId());
            if (product.getStock() <= 0) {
                // 库存不足,回滚Redis库存
                redisTemplate.opsForValue().increment(STOCK_KEY + order.getProductId());
                return;
            }
            
            // 2. 扣减数据库库存
            int update = productMapper.decreaseStock(order.getProductId());
            if (update == 0) {
                // 库存不足,回滚Redis库存
                redisTemplate.opsForValue().increment(STOCK_KEY + order.getProductId());
                return;
            }
            
            // 3. 创建订单
            orderMapper.insert(order);
            
            // 4. 发送支付通知
            sendPaymentNotification(order);
            
        } catch (Exception e) {
            // 异常处理,回滚库存
            redisTemplate.opsForValue().increment(STOCK_KEY + order.getProductId());
            log.error("创建订单失败", e);
        }
    }
}

6.3 数据库表设计

-- 秒杀商品表
CREATE TABLE seckill_product (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255) NOT NULL,
    price DECIMAL(10,2) NOT NULL,
    stock INT NOT NULL,
    start_time DATETIME NOT NULL,
    end_time DATETIME NOT NULL,
    version INT DEFAULT 0, -- 乐观锁版本号
    INDEX idx_time (start_time, end_time)
) ENGINE=InnoDB;

-- 秒杀订单表
CREATE TABLE seckill_order (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    status TINYINT NOT NULL DEFAULT 0,
    create_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
    update_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
    INDEX idx_user_product (user_id, product_id),
    INDEX idx_status (status)
) ENGINE=InnoDB;

-- 库存流水表(用于对账)
CREATE TABLE stock_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    product_id BIGINT NOT NULL,
    change_amount INT NOT NULL, -- 正数增加,负数减少
    type TINYINT NOT NULL, -- 1:秒杀扣减,2:回滚,3:手动调整
    order_id BIGINT,
    create_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
    INDEX idx_product_time (product_id, create_time)
) ENGINE=InnoDB;

七、总结

MySQL高并发处理是一个系统工程,需要从架构、设计、缓存、监控等多个维度综合考虑。关键要点包括:

  1. 架构层面:读写分离、分库分表、连接池优化
  2. 数据库设计:合理的索引、表结构、事务设计
  3. 缓存机制:多级缓存、防护策略、一致性保证
  4. SQL优化:执行计划分析、慢查询优化
  5. 监控调优:实时监控、自动化调优

在实际应用中,需要根据业务特点和性能瓶颈选择合适的策略。建议先通过监控工具定位瓶颈,再针对性地优化,避免过度设计。同时,保持架构的可扩展性,为未来的业务增长预留空间。

最后,高并发优化是一个持续的过程,需要不断地监控、分析和调整,才能达到最佳的性能表现。