In today's Internet applications, high concurrency is the norm. Whether the workload is an e-commerce flash sale, a social network, or an online payment system, MySQL, the most popular open-source relational database, routinely faces enormous concurrent load. This article works through practical strategies for handling high concurrency in MySQL across three layers: database optimization, application-layer optimization, and architecture upgrades, with detailed code examples and configuration notes.

1. Database-Layer Optimization: Squeezing Every Drop of Performance out of MySQL

1.1 Index Optimization: The Foundation of High Concurrency

Indexes are the foundation of database performance. Poor index design forces full table scans, which cripple throughput under high concurrency.

1.1.1 Index Design Principles

  • Covering indexes: include every column the query needs in the index itself, avoiding lookups back to the clustered index
  • Leftmost-prefix rule: a composite index is only usable for columns matched contiguously from its left edge
  • Avoid redundant indexes: periodically audit for and drop duplicate indexes

1.1.2 Case Study: Optimizing Indexes on an Orders Table

Suppose we have an orders table with the following columns:

CREATE TABLE orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    order_no VARCHAR(64) NOT NULL,
    status TINYINT NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    create_time DATETIME NOT NULL,
    update_time DATETIME NOT NULL,
    INDEX idx_user_id (user_id),
    INDEX idx_order_no (order_no),
    INDEX idx_status_create_time (status, create_time)
);

Problem scenario: a user lists their own orders, sorted by creation time, newest first.

-- Original query (may use idx_user_id, but every row needs a lookup back to the clustered index)
SELECT * FROM orders WHERE user_id = 12345 ORDER BY create_time DESC;

-- Optimized query (can be served entirely by a covering index)
SELECT order_no, status, amount, create_time 
FROM orders 
WHERE user_id = 12345 
ORDER BY create_time DESC;

Index optimization plan:

-- Create a covering index
ALTER TABLE orders ADD INDEX idx_user_create_cover (user_id, create_time, order_no, status, amount);

Verify that the index is used:

EXPLAIN SELECT order_no, status, amount, create_time 
FROM orders 
WHERE user_id = 12345 
ORDER BY create_time DESC;

In the output, the key column should show idx_user_create_cover and the Extra column should show Using index, confirming that the query is served by the covering index.

1.2 Query Optimization: Avoiding Slow Queries

1.2.1 Configuring the Slow Query Log

# my.cnf configuration
[mysqld]
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1  # log queries that take longer than 1 second
log_queries_not_using_indexes = 1  # log queries that do not use an index

1.2.2 Analyzing Queries with EXPLAIN

-- Example: analyze a complex multi-join query
EXPLAIN 
SELECT o.order_no, u.username, p.product_name 
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN order_items oi ON o.id = oi.order_id
JOIN products p ON oi.product_id = p.id
WHERE o.status = 1 
  AND o.create_time >= '2024-01-01'
  AND u.region = '北京'
ORDER BY o.amount DESC
LIMIT 100;

How to read the key columns:

  • type: access type, from worst to best: ALL → index → range → ref → eq_ref → const → system
  • rows: estimated number of rows scanned
  • Extra: additional details (Using filesort, Using temporary, and so on)
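
For the covering-index query from section 1.1.2, a healthy plan would read roughly like this (hypothetical output for illustration): type=ref, key=idx_user_create_cover, a small rows estimate, and Extra=Using index. Seeing type=ALL or Using filesort instead means the index is not being used as intended.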

1.2.3 Avoiding Common Performance Pitfalls

Pitfall 1: SELECT *

-- Not recommended
SELECT * FROM orders WHERE user_id = 12345;

-- Recommended (name the columns explicitly)
SELECT id, order_no, status, amount FROM orders WHERE user_id = 12345;

Pitfall 2: Functions on indexed columns in the WHERE clause

-- Not recommended (the index cannot be used)
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';

-- Recommended (rewrite as a range predicate)
SELECT * FROM orders 
WHERE create_time >= '2024-01-01 00:00:00' 
  AND create_time < '2024-01-02 00:00:00';

Pitfall 3: Implicit type conversion

-- Not recommended (order_no is VARCHAR; passing a number forces a full table scan)
SELECT * FROM orders WHERE order_no = 123456789;

-- Recommended (use the matching type)
SELECT * FROM orders WHERE order_no = '123456789';

1.3 Table Structure Optimization

1.3.1 Choosing Data Types

-- Not recommended (oversized data types)
CREATE TABLE user_info (
    id BIGINT PRIMARY KEY,
    age INT,  -- TINYINT is enough for an age (0-255)
    status TINYINT,  -- TINYINT is enough for a status
    price DECIMAL(20,10)  -- DECIMAL(10,2) is enough for a price
);

-- Recommended (right-sized data types)
CREATE TABLE user_info (
    id BIGINT PRIMARY KEY,
    age TINYINT UNSIGNED,  -- 0-255
    status TINYINT,
    price DECIMAL(10,2)
);

1.3.2 Partitioned Tables

For tables with very large data volumes, partitioning can improve query performance:

-- Partition by time range
CREATE TABLE logs (
    id BIGINT AUTO_INCREMENT,
    log_time DATETIME NOT NULL,
    content TEXT,
    PRIMARY KEY (id, log_time)
) PARTITION BY RANGE (YEAR(log_time) * 100 + MONTH(log_time)) (
    PARTITION p202401 VALUES LESS THAN (202402),
    PARTITION p202402 VALUES LESS THAN (202403),
    PARTITION p202403 VALUES LESS THAN (202404),
    PARTITION p_max VALUES LESS THAN MAXVALUE
);

-- Queries are automatically pruned to the matching partitions
SELECT * FROM logs WHERE log_time >= '2024-01-01' AND log_time < '2024-02-01';

1.4 Configuration Tuning

1.4.1 Key InnoDB Parameters

# my.cnf example (adjust to the server's memory)
[mysqld]
# Memory
innodb_buffer_pool_size = 16G  # typically 70-80% of total RAM on a dedicated DB server
innodb_buffer_pool_instances = 8  # number of buffer pool instances, usually 4-8

# Logging
innodb_log_file_size = 2G  # redo log file size
innodb_log_buffer_size = 64M  # redo log buffer size
innodb_flush_log_at_trx_commit = 1  # 1: flush on every commit (safest); 2: flush once per second (faster)

# Concurrency
innodb_thread_concurrency = 32  # InnoDB concurrent thread limit
innodb_read_io_threads = 8  # read I/O threads
innodb_write_io_threads = 8  # write I/O threads

# Locking
innodb_lock_wait_timeout = 50  # lock wait timeout (seconds)
innodb_rollback_on_timeout = 0  # whether to roll back the whole transaction on timeout

# Connections
max_connections = 1000  # maximum connections
thread_cache_size = 100  # thread cache size

1.4.2 Connection Pooling

Use a connection pool to avoid the overhead of creating a connection on every request:

// HikariCP connection pool example
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/mydb");
config.setUsername("root");
config.setPassword("password");
config.setDriverClassName("com.mysql.cj.jdbc.Driver");

// Pool sizing
config.setMaximumPoolSize(100);  // maximum pool size
config.setMinimumIdle(20);       // minimum idle connections
config.setConnectionTimeout(30000);  // connection timeout (ms)
config.setIdleTimeout(600000);   // idle connection timeout
config.setMaxLifetime(1800000);  // maximum connection lifetime
config.setLeakDetectionThreshold(2000);  // connection leak detection threshold

// Behavior
config.setReadOnly(false);       // read-only flag
config.setTransactionIsolation("TRANSACTION_READ_COMMITTED");
config.setConnectionTestQuery("SELECT 1");  // unnecessary with JDBC4-compliant drivers, but harmless

HikariDataSource dataSource = new HikariDataSource(config);
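
Once the pool is built, connections are borrowed and returned through the standard JDBC API. A minimal usage sketch (plain java.sql; the table and columns are from the orders schema in section 1.1.2):

// try-with-resources returns the connection to the pool automatically
try (Connection conn = dataSource.getConnection();
     PreparedStatement ps = conn.prepareStatement(
             "SELECT order_no, status, amount FROM orders WHERE user_id = ?")) {
    ps.setLong(1, 12345L);
    try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            System.out.println(rs.getString("order_no"));
        }
    }
}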

2. Application-Layer Optimization: Taking Pressure off the Database

2.1 Caching Strategy

2.1.1 Multi-Level Cache Architecture

User request → local cache → distributed cache → database
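
A minimal sketch of that two-level lookup, assuming Caffeine for the in-process tier and the RedisTemplate used elsewhere in this article for the distributed tier (the class name and TTL values are illustrative, not a fixed recipe):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.data.redis.core.RedisTemplate;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class TwoLevelCache {
    // Local tier: small and short-lived; absorbs the hottest keys
    private final Cache<String, Object> local = Caffeine.newBuilder()
            .maximumSize(10_000)
            .expireAfterWrite(Duration.ofSeconds(30))
            .build();
    
    private final RedisTemplate<String, Object> redis;
    
    public TwoLevelCache(RedisTemplate<String, Object> redis) {
        this.redis = redis;
    }
    
    public Object get(String key, Function<String, Object> dbLoader) {
        // 1. Local cache
        Object value = local.getIfPresent(key);
        if (value != null) return value;
        
        // 2. Distributed cache; backfill the local tier on a hit
        value = redis.opsForValue().get(key);
        if (value != null) {
            local.put(key, value);
            return value;
        }
        
        // 3. Database, then backfill both tiers
        value = dbLoader.apply(key);
        if (value != null) {
            redis.opsForValue().set(key, value, 300, TimeUnit.SECONDS);
            local.put(key, value);
        }
        return value;
    }
}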

2.1.2 Redis Caching in Practice

// Spring Boot + Redis caching example
@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // Cache settings
    private static final String ORDER_CACHE_PREFIX = "order:";
    private static final long CACHE_TTL = 300; // 5 minutes
    
    /**
     * Fetch order details (cache-aside)
     */
    public Order getOrderWithCache(Long orderId) {
        String cacheKey = ORDER_CACHE_PREFIX + orderId;
        
        // 1. Try the cache first
        Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
        if (order != null) {
            return order;
        }
        
        // 2. Cache miss: query the database
        order = orderMapper.selectById(orderId);
        if (order != null) {
            // 3. Populate the cache
            redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
        }
        
        return order;
    }
    
    /**
     * Update an order (and refresh its cache entry)
     */
    @Transactional
    public void updateOrder(Order order) {
        // 1. Update the database
        orderMapper.updateById(order);
        
        // 2. Delete the cache entry (or update it in place)
        String cacheKey = ORDER_CACHE_PREFIX + order.getId();
        redisTemplate.delete(cacheKey);
        
        // 3. Asynchronously repopulate the cache (narrows the window in which
        //    concurrent reads all miss and hit the database)
        CompletableFuture.runAsync(() -> {
            Order freshOrder = orderMapper.selectById(order.getId());
            if (freshOrder != null) {
                redisTemplate.opsForValue().set(cacheKey, freshOrder, CACHE_TTL, TimeUnit.SECONDS);
            }
        });
    }
}

2.1.3 Cache Penetration, Breakdown, and Avalanche

Cache penetration (repeated queries for data that does not exist):

// Bloom filter + cached null sentinel
public Order getOrderWithBloomFilter(Long orderId) {
    String cacheKey = ORDER_CACHE_PREFIX + orderId;
    
    // 1. Check the Bloom filter (rejects IDs that definitely do not exist)
    if (!bloomFilter.mightContain(orderId)) {
        return null;
    }
    
    // 2. Check the cache; a cached "NULL" sentinel is a confirmed miss
    Object cached = redisTemplate.opsForValue().get(cacheKey);
    if ("NULL".equals(cached)) {
        return null;
    }
    if (cached != null) {
        return (Order) cached;
    }
    
    // 3. Query the database
    Order order = orderMapper.selectById(orderId);
    
    // 4. Cache the result (misses included)
    if (order != null) {
        redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
    } else {
        // cache the null sentinel with a short TTL
        redisTemplate.opsForValue().set(cacheKey, "NULL", 60, TimeUnit.SECONDS);
    }
    
    return order;
}

Cache breakdown (a hot key expires and requests stampede the database):

// Use a distributed lock
public Order getOrderWithLock(Long orderId) {
    String cacheKey = ORDER_CACHE_PREFIX + orderId;
    String lockKey = "lock:" + orderId;
    
    // 1. Try the cache
    Order order = (Order) redisTemplate.opsForValue().get(cacheKey);
    if (order != null) {
        return order;
    }
    
    // 2. Acquire a distributed lock
    RLock lock = redissonClient.getLock(lockKey);
    try {
        // try to lock: wait up to 3 seconds, hold the lock for at most 10 seconds
        if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {
            // double-check the cache after acquiring the lock
            order = (Order) redisTemplate.opsForValue().get(cacheKey);
            if (order != null) {
                return order;
            }
            
            // query the database and repopulate the cache
            order = orderMapper.selectById(orderId);
            if (order != null) {
                redisTemplate.opsForValue().set(cacheKey, order, CACHE_TTL, TimeUnit.SECONDS);
            }
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
    
    return order;
}

Cache avalanche (many keys expiring at the same moment):

// Randomize the TTL
public void setCacheWithRandomTTL(String key, Object value) {
    // base TTL plus a random offset so keys do not all expire together
    long ttl = CACHE_TTL + ThreadLocalRandom.current().nextLong(0, 300);
    redisTemplate.opsForValue().set(key, value, ttl, TimeUnit.SECONDS);
}

// Keep hot data without a TTL and refresh it in the background
public void updateHotData(Long orderId) {
    String cacheKey = ORDER_CACHE_PREFIX + orderId;
    
    // 1. Reload the latest data from the database
    Order order = orderMapper.selectById(orderId);
    
    // 2. Asynchronously refresh the cache (no TTL)
    CompletableFuture.runAsync(() -> {
        redisTemplate.opsForValue().set(cacheKey, order);
    });
}

2.2 Read/Write Splitting

2.2.1 Master-Slave Replication Setup

# Master configuration (my.cnf)
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7

# Slave configuration (my.cnf)
[mysqld]
server-id = 2
relay-log = mysql-relay-bin
read_only = 1  # the slave is read-only
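
Replication is asynchronous, so a slave can serve stale data shortly after a write. A hedged sketch of a lag probe the application can use to fall back to the master when a slave is too far behind (SHOW SLAVE STATUS and its Seconds_Behind_Master column are standard MySQL; the threshold policy is an assumption):

// Returns true when the slave's replication lag is within the given bound.
// A NULL Seconds_Behind_Master means replication is broken: treat as stale.
public boolean slaveIsFresh(DataSource slaveDataSource, long maxLagSeconds) throws SQLException {
    try (Connection conn = slaveDataSource.getConnection();
         Statement st = conn.createStatement();
         ResultSet rs = st.executeQuery("SHOW SLAVE STATUS")) {
        if (!rs.next()) {
            return false; // the server is not configured as a slave
        }
        long lag = rs.getLong("Seconds_Behind_Master");
        if (rs.wasNull()) {
            return false;
        }
        return lag <= maxLagSeconds;
    }
}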

2.2.2 Read/Write Splitting in the Application Layer

// Spring Boot read/write splitting with a custom routing DataSource
@Configuration
public class DataSourceConfig {
    
    @Bean
    public DataSource masterDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://master:3306/mydb");
        config.setUsername("root");
        config.setPassword("password");
        return new HikariDataSource(config);
    }
    
    @Bean
    public DataSource slaveDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://slave:3306/mydb");
        config.setUsername("root");
        config.setPassword("password");
        return new HikariDataSource(config);
    }
    
    @Bean
    public DataSource routingDataSource() {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        
        RoutingDataSource routingDataSource = new RoutingDataSource();
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        routingDataSource.setTargetDataSources(targetDataSources);
        
        return routingDataSource;
    }
    
    @Bean
    public DataSourceTransactionManager transactionManager(DataSource routingDataSource) {
        return new DataSourceTransactionManager(routingDataSource);
    }
    
    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource routingDataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(routingDataSource);
        return bean.getObject();
    }
}

// Custom routing DataSource
public class RoutingDataSource extends AbstractRoutingDataSource {
    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
    
    public static void setMaster() {
        CONTEXT_HOLDER.set("master");
    }
    
    public static void setSlave() {
        CONTEXT_HOLDER.set("slave");
    }
    
    public static void clear() {
        CONTEXT_HOLDER.remove();
    }
    
    @Override
    protected Object determineCurrentLookupKey() {
        return CONTEXT_HOLDER.get();
    }
}

// Usage in the service layer
@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    /**
     * Writes go to the master
     */
    @Transactional
    public void createOrder(Order order) {
        RoutingDataSource.setMaster();
        try {
            orderMapper.insert(order);
        } finally {
            RoutingDataSource.clear();
        }
    }
    
    /**
     * Reads go to the slave
     */
    public Order getOrder(Long orderId) {
        RoutingDataSource.setSlave();
        try {
            return orderMapper.selectById(orderId);
        } finally {
            RoutingDataSource.clear();
        }
    }
}
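
Calling setMaster()/setSlave() by hand in every method is easy to get wrong. A common refinement, sketched here as one possible design rather than a fixed recipe, is to drive the routing from an annotation via AOP (the @ReadOnly annotation and aspect below are illustrative and build on the RoutingDataSource above):

// Illustrative marker: methods annotated @ReadOnly are routed to the slave
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}

@Aspect
@Component
public class ReadOnlyRoutingAspect {
    
    @Around("@annotation(readOnly)")
    public Object route(ProceedingJoinPoint joinPoint, ReadOnly readOnly) throws Throwable {
        RoutingDataSource.setSlave();
        try {
            return joinPoint.proceed();
        } finally {
            // always reset, or the ThreadLocal leaks into the next request on this thread
            RoutingDataSource.clear();
        }
    }
}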

2.3 Asynchronous Processing

2.3.1 Decoupling with Message Queues

// Asynchronous order processing with RabbitMQ
@Service
public class OrderService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    /**
     * Create an order (non-core work offloaded to a queue)
     */
    @Transactional
    public void createOrderAsync(Order order) {
        // 1. Persist the order to the database (the core data)
        orderMapper.insert(order);
        
        // 2. Publish a message to the queue (non-core work is handled asynchronously)
        OrderMessage message = new OrderMessage();
        message.setOrderId(order.getId());
        message.setAction("CREATE");
        
        rabbitTemplate.convertAndSend("order.exchange", "order.create", message);
        
        // 3. Return the response without waiting for the asynchronous work
    }
    
    /**
     * Consume messages and process the order
     */
    @RabbitListener(queues = "order.queue")
    public void processOrderMessage(OrderMessage message) {
        // Asynchronous work: sending email, updating statistics, notifying inventory, etc.
        switch (message.getAction()) {
            case "CREATE":
                sendOrderEmail(message.getOrderId());
                updateOrderStatistics(message.getOrderId());
                notifyInventory(message.getOrderId());
                break;
            case "UPDATE":
                // handle update logic
                break;
        }
    }
    
    private void sendOrderEmail(Long orderId) {
        // email-sending logic
    }
    
    private void updateOrderStatistics(Long orderId) {
        // update statistics
    }
    
    private void notifyInventory(Long orderId) {
        // notify the inventory system
    }
}

2.3.2 Batch Processing

// Optimized batch inserts
@Service
public class BatchService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    /**
     * Batch-insert orders (MyBatis batch insert)
     */
    public void batchInsertOrders(List<Order> orders) {
        // Process in chunks so no single statement grows too large
        int batchSize = 1000;
        for (int i = 0; i < orders.size(); i += batchSize) {
            int end = Math.min(i + batchSize, orders.size());
            List<Order> batch = orders.subList(i, end);
            
            // MyBatis batch insert
            orderMapper.batchInsert(batch);
            
            // With MyBatis's BATCH executor, flush and clear the session
            // periodically to bound memory usage (see the sketch below)
        }
    }
    
    /**
     * Batch upsert using INSERT ... ON DUPLICATE KEY UPDATE
     */
    public void batchUpdateOrders(List<Order> orders) {
        // Build the multi-row statement
        // NOTE: string concatenation is shown for brevity; in production use a
        // parameterized MyBatis <foreach> mapping to avoid SQL injection
        StringBuilder sql = new StringBuilder();
        sql.append("INSERT INTO orders (id, order_no, status, amount) VALUES ");
        
        for (int i = 0; i < orders.size(); i++) {
            Order order = orders.get(i);
            if (i > 0) sql.append(",");
            sql.append(String.format("(%d, '%s', %d, %.2f)", 
                order.getId(), order.getOrderNo(), order.getStatus(), order.getAmount()));
        }
        
        sql.append(" ON DUPLICATE KEY UPDATE status=VALUES(status), amount=VALUES(amount)");
        
        // Execute the batch upsert
        orderMapper.batchUpdate(sql.toString());
    }
}
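
The flushing hinted at in batchInsertOrders assumes MyBatis's BATCH executor. A minimal sketch of that pattern (sqlSessionFactory is assumed to be injected; OrderMapper.insert is the single-row insert used elsewhere in this article):

// Open a session in BATCH mode: statements are buffered and sent in groups
try (SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
    OrderMapper mapper = session.getMapper(OrderMapper.class);
    for (int i = 0; i < orders.size(); i++) {
        mapper.insert(orders.get(i));
        if ((i + 1) % 1000 == 0) {
            session.flushStatements(); // push the buffered batch to MySQL
            session.clearCache();      // drop the first-level cache to free memory
        }
    }
    session.flushStatements();
    session.commit();
}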

3. Architecture Upgrades: From a Single Node to a Distributed System

3.1 Read/Write Splitting Architecture

3.1.1 Architecture Diagram

Application layer
  ↓
Load balancer (Nginx)
  ↓
Application server cluster
  ↓
Database proxy layer (ShardingSphere/ProxySQL)
  ↓
Master (writes) → replication → Slave 1 (reads)
                → replication → Slave 2 (reads)
                → replication → Slave 3 (reads)

3.1.2 Read/Write Splitting with ShardingSphere

# sharding.yaml configuration
dataSources:
  ds_0:
    url: jdbc:mysql://master:3306/mydb
    username: root
    password: password
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 600000
    maxLifetimeMilliseconds: 1800000
    maximumPoolSize: 50
  ds_1:
    url: jdbc:mysql://slave1:3306/mydb
    username: root
    password: password
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 600000
    maxLifetimeMilliseconds: 1800000
    maximumPoolSize: 50
  ds_2:
    url: jdbc:mysql://slave2:3306/mydb
    username: root
    password: password
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 600000
    maxLifetimeMilliseconds: 1800000
    maximumPoolSize: 50

shardingRule:
  tables:
    orders:
      actualDataNodes: ds_0.orders
      tableStrategy:
        standard:
          shardingColumn: id
          preciseAlgorithmClassName: com.example.OrderPreciseShardingAlgorithm
      databaseStrategy:
        standard:
          shardingColumn: user_id
          preciseAlgorithmClassName: com.example.UserDatabaseShardingAlgorithm
  defaultDatabaseStrategy:
    standard:
      shardingColumn: user_id
      preciseAlgorithmClassName: com.example.UserDatabaseShardingAlgorithm
  defaultTableStrategy:
    complex:
      shardingColumns: id, user_id
      algorithmClassName: com.example.OrderComplexShardingAlgorithm
  
  # Read/write splitting configuration
  masterSlaveRules:
    ds_0:
      name: ds_0
      masterDataSourceName: ds_0
      slaveDataSourceNames:
        - ds_1
        - ds_2
      loadBalanceAlgorithmType: ROUND_ROBIN  # round-robin load balancing

# Sharding algorithm implementation (Java)
public class UserDatabaseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        Long userId = shardingValue.getValue();
        // pick a data source by user_id modulo
        int index = (int) (userId % availableTargetNames.size());
        return availableTargetNames.toArray(new String[0])[index];
    }
}

3.2 Sharding: Splitting Databases and Tables

3.2.1 Sharding Strategies

Vertical partitioning: split databases by business domain

User DB (user_db): users table, user profile table
Order DB (order_db): orders table, order items table
Product DB (product_db): products table, inventory table

Horizontal sharding: split a table by a data attribute (see the routing sketch after this list)

Orders sharded by user ID:
orders_0: user_id % 4 == 0
orders_1: user_id % 4 == 1
orders_2: user_id % 4 == 2
orders_3: user_id % 4 == 3
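
The routing rule above is just a modulus. The equivalent lookup in application code would be (purely illustrative; with ShardingSphere, configured below, the framework performs this routing for you):

// Map a user_id to its physical table under the "mod 4" rule above
public String resolveOrderTable(long userId) {
    return "orders_" + (userId % 4);
}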

3.2.2 Sharding with ShardingSphere

# sharding.yaml sharding configuration
dataSources:
  ds_0:
    url: jdbc:mysql://db0:3306/mydb
    username: root
    password: password
  ds_1:
    url: jdbc:mysql://db1:3306/mydb
    username: root
    password: password
  ds_2:
    url: jdbc:mysql://db2:3306/mydb
    username: root
    password: password
  ds_3:
    url: jdbc:mysql://db3:3306/mydb
    username: root
    password: password

shardingRule:
  tables:
    orders:
      actualDataNodes: ds_${0..3}.orders_${0..3}
      tableStrategy:
        standard:
          shardingColumn: user_id
          preciseAlgorithmClassName: com.example.OrderTableShardingAlgorithm
      databaseStrategy:
        standard:
          shardingColumn: user_id
          preciseAlgorithmClassName: com.example.OrderDatabaseShardingAlgorithm
      keyGenerator:
        column: id
        type: SNOWFLAKE  # snowflake algorithm for distributed IDs
        props:
          worker.id: 1
  
  bindingTables:
    - orders,order_items  # binding tables avoid cross-shard joins
  
  defaultDatabaseStrategy:
    standard:
      shardingColumn: user_id
      preciseAlgorithmClassName: com.example.DefaultDatabaseShardingAlgorithm

# Sharding algorithm implementation (Java)
public class OrderDatabaseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        Long userId = shardingValue.getValue();
        // pick a database by user_id modulo
        int dbIndex = (int) (userId % 4);
        return "ds_" + dbIndex;
    }
}

public class OrderTableShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        Long userId = shardingValue.getValue();
        // pick a table by user_id modulo
        int tableIndex = (int) (userId % 4);
        return "orders_" + tableIndex;
    }
}

3.2.3 Distributed ID Generation

// Snowflake algorithm implementation
public class SnowflakeIdGenerator {
    private final long datacenterId;
    private final long workerId;
    private long sequence = 0L;
    private long lastTimestamp = -1L;
    
    // Custom epoch (2020-01-01 00:00:00 UTC)
    private final long twepoch = 1577836800000L;
    
    // Bit layout: 41-bit timestamp + 5-bit datacenter ID + 5-bit worker ID + 12-bit sequence
    private final long workerIdBits = 5L;
    private final long datacenterIdBits = 5L;
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    private final long sequenceBits = 12L;
    
    private final long workerIdShift = sequenceBits;
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);
    
    public SnowflakeIdGenerator(long datacenterId, long workerId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }
    
    public synchronized long nextId() {
        long timestamp = timeGen();
        
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }
        
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }
        
        lastTimestamp = timestamp;
        
        return ((timestamp - twepoch) << timestampLeftShift)
                | (datacenterId << datacenterIdShift)
                | (workerId << workerIdShift)
                | sequence;
    }
    
    private long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }
    
    private long timeGen() {
        return System.currentTimeMillis();
    }
}

// Usage
public class IdGeneratorService {
    private final SnowflakeIdGenerator idGenerator;
    
    public IdGeneratorService() {
        // initialize with the machine's datacenter ID and worker ID
        this.idGenerator = new SnowflakeIdGenerator(1, 1);
    }
    
    public Long generateOrderId() {
        return idGenerator.nextId();
    }
}

3.3 Distributed Transactions

3.3.1 The Seata Distributed Transaction Framework

// Seata AT-mode example
@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private InventoryService inventoryService;
    
    @Autowired
    private AccountService accountService;
    
    /**
     * Create an order inside a distributed transaction
     */
    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
    public void createOrderWithDistributedTransaction(Order order) {
        // 1. Create the order
        orderMapper.insert(order);
        
        // 2. Deduct inventory (cross-service call)
        inventoryService.deductStock(order.getProductId(), order.getQuantity());
        
        // 3. Deduct the account balance (cross-service call)
        accountService.deductBalance(order.getUserId(), order.getAmount());
        
        // 4. Publish a message (asynchronous; outside the global transaction)
        sendOrderMessage(order);
    }
    
    /**
     * Inventory deduction (shown inline for brevity; in practice this branch
     * transaction lives in the inventory service)
     */
    @Transactional
    public void deductStock(Long productId, Integer quantity) {
        // update the inventory
        inventoryMapper.deductStock(productId, quantity);
    }
    
    /**
     * Balance deduction (likewise, this lives in the account service)
     */
    @Transactional
    public void deductBalance(Long userId, BigDecimal amount) {
        // update the account balance
        accountMapper.deductBalance(userId, amount);
    }
}

3.3.2 The Saga Pattern

// Saga pattern: compensating transactions
@Service
public class OrderSagaService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private InventoryService inventoryService;
    
    @Autowired
    private AccountService accountService;
    
    /**
     * Create an order using the Saga pattern
     */
    public void createOrderWithSaga(Order order) {
        try {
            // 1. Create the order (local transaction)
            orderMapper.insert(order);
            
            // 2. Deduct inventory (compensation: restore the inventory)
            inventoryService.deductStockWithCompensation(order.getProductId(), order.getQuantity());
            
            // 3. Deduct the balance (compensation: restore the balance)
            accountService.deductBalanceWithCompensation(order.getUserId(), order.getAmount());
            
            // 4. Mark the saga as committed
            commitSaga(order);
            
        } catch (Exception e) {
            // 5. On any failure, run the compensations
            compensateSaga(order);
            throw e;
        }
    }
    
    private void commitSaga(Order order) {
        // mark the order as successful
        orderMapper.updateStatus(order.getId(), 1);
    }
    
    private void compensateSaga(Order order) {
        // 1. Restore the inventory
        inventoryService.restoreStock(order.getProductId(), order.getQuantity());
        
        // 2. Restore the account balance
        accountService.restoreBalance(order.getUserId(), order.getAmount());
        
        // 3. Mark the order as failed
        orderMapper.updateStatus(order.getId(), -1);
    }
}

3.4 Database Middleware

3.4.1 Read/Write Splitting with ProxySQL

-- ProxySQL configuration example
-- 1. Register the backend MySQL servers
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight) VALUES
(10, 'master', 3306, 100),  -- master, hostgroup 10
(20, 'slave1', 3306, 50),   -- slave 1, hostgroup 20
(20, 'slave2', 3306, 50);   -- slave 2, hostgroup 20

-- 2. Read/write splitting rules
INSERT INTO mysql_query_rules (rule_id, active, match_pattern, destination_hostgroup, apply) VALUES
(1, 1, '^SELECT.*FOR UPDATE', 10, 1),  -- SELECT ... FOR UPDATE goes to the master
(2, 1, '^SELECT', 20, 1),              -- plain SELECTs go to the slaves
(3, 1, '^(INSERT|UPDATE|DELETE)', 10, 1);  -- writes go to the master

-- 3. Health-check settings
UPDATE global_variables SET variable_value='SELECT 1' WHERE variable_name='mysql-monitor_connect_string';
UPDATE global_variables SET variable_value='SELECT 1' WHERE variable_name='mysql-monitor_ping_string';
UPDATE global_variables SET variable_value='SELECT 1' WHERE variable_name='mysql-monitor_read_only_string';

-- 4. Load the configuration into the runtime
LOAD MYSQL VARIABLES TO RUNTIME;
LOAD MYSQL SERVERS TO RUNTIME;
LOAD MYSQL QUERY RULES TO RUNTIME;
LOAD MYSQL USERS TO RUNTIME;

-- 5. Persist the configuration to disk
SAVE MYSQL VARIABLES TO DISK;
SAVE MYSQL SERVERS TO DISK;
SAVE MYSQL QUERY RULES TO DISK;
SAVE MYSQL USERS TO DISK;

3.4.2 ShardingSphere Proxy

# server.yaml
mode:
  type: Standalone
  repository:
    type: File
    props:
      path: ./conf

dataSources:
  ds_0:
    url: jdbc:mysql://master:3306/mydb
    username: root
    password: password
  ds_1:
    url: jdbc:mysql://slave1:3306/mydb
    username: root
    password: password
  ds_2:
    url: jdbc:mysql://slave2:3306/mydb
    username: root
    password: password

shardingRule:
  tables:
    orders:
      actualDataNodes: ds_0.orders
      tableStrategy:
        standard:
          shardingColumn: id
          preciseAlgorithmClassName: com.example.OrderShardingAlgorithm
      databaseStrategy:
        standard:
          shardingColumn: user_id
          preciseAlgorithmClassName: com.example.UserDatabaseShardingAlgorithm
  masterSlaveRules:
    ds_0:
      name: ds_0
      masterDataSourceName: ds_0
      slaveDataSourceNames:
        - ds_1
        - ds_2
      loadBalanceAlgorithmType: ROUND_ROBIN

# Running ShardingSphere Proxy
# Start command: sharding-proxy -c server.yaml

4. Monitoring and Tuning

4.1 Monitoring Metrics

4.1.1 Key Metrics

-- Current number of client connections
SHOW STATUS LIKE 'Threads_connected';

-- Cumulative number of slow queries
SHOW STATUS LIKE 'Slow_queries';

-- InnoDB buffer pool read counters
SHOW STATUS LIKE 'Innodb_buffer_pool_read%';

-- Compute the buffer pool hit ratio:
-- hit ratio = (1 - Innodb_buffer_pool_reads / Innodb_buffer_pool_read_requests) * 100%

-- Inspect lock waits and engine status
SHOW ENGINE INNODB STATUS\G

-- Tables currently locked
SHOW OPEN TABLES WHERE In_use > 0;
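
The hit-ratio formula above is easy to compute programmatically. A hedged sketch that reads both counters over plain JDBC and applies it (the DataSource is assumed; SHOW GLOBAL STATUS returns Variable_name/Value pairs):

// Compute the InnoDB buffer pool hit ratio from the global status counters
public double bufferPoolHitRatio(DataSource dataSource) throws SQLException {
    long reads = 0, requests = 0;
    try (Connection conn = dataSource.getConnection();
         Statement st = conn.createStatement();
         ResultSet rs = st.executeQuery(
                 "SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_read%'")) {
        while (rs.next()) {
            String name = rs.getString(1);
            long value = rs.getLong(2);
            if ("Innodb_buffer_pool_reads".equals(name)) reads = value;
            if ("Innodb_buffer_pool_read_requests".equals(name)) requests = value;
        }
    }
    // hit ratio = (1 - reads / read_requests) * 100%
    return requests == 0 ? 100.0 : (1.0 - (double) reads / requests) * 100.0;
}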

4.1.2 Monitoring with Prometheus + Grafana

# prometheus.yml configuration
scrape_configs:
  - job_name: 'mysql'
    static_configs:
      - targets: ['mysql-exporter:9104']
    metrics_path: /metrics
    scrape_interval: 15s

# mysqld_exporter setup
# Start command: mysqld_exporter --config.my-cnf=/etc/mysql/my.cnf

// Spring Boot Actuator + Micrometer integration
@Configuration
public class MetricsConfig {
    
    @Bean
    public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
        return registry -> registry.config().commonTags("application", "order-service");
    }
    
    @Bean
    public DataSourcePoolMetrics dataSourcePoolMetrics(DataSource dataSource) {
        return new DataSourcePoolMetrics((HikariDataSource) dataSource, "hikari", Tags.empty());
    }
}

// Custom application metrics
@Service
public class OrderMetricsService {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    /**
     * Record order-creation latency
     */
    public void recordOrderCreationTime(long duration) {
        Timer.builder("order.creation.time")
              .description("Order creation time")
              .publishPercentiles(0.5, 0.95, 0.99)
              .register(meterRegistry)
              .record(duration, TimeUnit.MILLISECONDS);
    }
    
    /**
     * Count database queries
     */
    public void recordDatabaseQuery(String queryType) {
        Counter.builder("database.query.count")
               .description("Database query count")
               .tag("type", queryType)
               .register(meterRegistry)
               .increment();
    }
}

4.2 Performance Tuning Tools

4.2.1 MySQL Analysis Tools

# 1. Analyze the slow query log with pt-query-digest
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# 2. Server summary with Percona Toolkit
pt-mysql-summary --user=root --password=password

# 3. MySQLTuner
perl mysqltuner.pl --user root --password password

# 4. The sys schema (MySQL 5.7+)
-- Statements that create temporary tables
SELECT * FROM sys.statements_with_temp_tables;

-- Indexes that have never been used
SELECT * FROM sys.schema_unused_indexes;

-- Current InnoDB lock waits
SELECT * FROM sys.innodb_lock_waits;

4.2.2 Load Testing Tools

// JMeter test plan outline
// 1. Thread group: 100 threads, 1000 loops each
// 2. HTTP request: POST /api/orders
// 3. Listeners: View Results Tree, Aggregate Report, Graph Results

// High-concurrency testing with the Gatling Java DSL
public class OrderSimulation extends Simulation {
    HttpProtocolBuilder httpProtocol = http
        .baseUrl("http://localhost:8080")
        .acceptHeader("application/json")
        .contentTypeHeader("application/json");
    
    ScenarioBuilder scn = scenario("Order Creation")
        .exec(http("Create Order")
            .post("/api/orders")
            .body(StringBody("""
                {
                    "userId": 12345,
                    "productId": 1001,
                    "quantity": 2,
                    "amount": 199.99
                }
                """))
            .check(status().is(200)));
    
    public OrderSimulation() {
        setUp(
            scn.injectOpen(
                atOnceUsers(100),      // start 100 users at once
                rampUsersPerSec(100).to(500).during(60), // ramp from 100 to 500 users/sec over 60 seconds
                constantUsersPerSec(500).during(120)     // hold 500 users/sec for 120 seconds
            )
        ).protocols(httpProtocol);
    }
}

5. Case Study: An E-Commerce Flash Sale System

5.1 System Architecture

User request → CDN → load balancer → application server cluster
          ↓
      Redis cluster (inventory cache)
          ↓
      Message queue (RabbitMQ/Kafka)
          ↓
      Sharded MySQL (order DB, product DB)
          ↓
      Asynchronous processing (email, notifications, statistics)

5.2 Core Implementation

5.2.1 The Flash Sale Service

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private ProductMapper productMapper;
    
    // Key prefix for flash-sale inventory
    private static final String SECKILL_STOCK_PREFIX = "seckill:stock:";
    
    // Key prefix for flash-sale orders
    private static final String SECKILL_ORDER_PREFIX = "seckill:order:";
    
    /**
     * Place a flash-sale order
     */
    @Transactional
    public SeckillResult seckill(Long productId, Long userId) {
        String stockKey = SECKILL_STOCK_PREFIX + productId;
        String orderKey = SECKILL_ORDER_PREFIX + productId + ":" + userId;
        
        // 1. Reject repeat purchases by the same user
        if (Boolean.TRUE.equals(redisTemplate.hasKey(orderKey))) {
            return SeckillResult.fail("You have already bought this flash-sale item");
        }
        
        // 2. Deduct inventory (atomic Redis decrement)
        Long stock = redisTemplate.opsForValue().decrement(stockKey);
        if (stock == null || stock < 0) {
            // went below zero: put the unit back
            redisTemplate.opsForValue().increment(stockKey);
            return SeckillResult.fail("Out of stock");
        }
        
        // 3. Build the flash-sale order
        Long orderId = generateSeckillOrderId();
        SeckillOrder order = new SeckillOrder();
        order.setId(orderId);
        order.setProductId(productId);
        order.setUserId(userId);
        order.setQuantity(1);
        order.setStatus(0); // awaiting payment
        
        // 4. Stage the order in Redis (temporary storage)
        redisTemplate.opsForValue().set(orderKey, order, 30, TimeUnit.MINUTES);
        
        // 5. Publish a message to the queue (persist to the database asynchronously)
        SeckillMessage message = new SeckillMessage();
        message.setOrderId(orderId);
        message.setProductId(productId);
        message.setUserId(userId);
        rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);
        
        return SeckillResult.success(orderId);
    }
    
    /**
     * Asynchronously persist flash-sale orders
     */
    @RabbitListener(queues = "seckill.queue")
    public void processSeckillOrder(SeckillMessage message) {
        try {
            // 1. Fetch the staged order from Redis
            String orderKey = SECKILL_ORDER_PREFIX + message.getProductId() + ":" + message.getUserId();
            SeckillOrder order = (SeckillOrder) redisTemplate.opsForValue().get(orderKey);
            
            if (order == null) {
                log.error("秒杀订单不存在: {}", message);
                return;
            }
            
            // 2. Persist the order to the database
            orderMapper.insert(order);
            
            // 3. Deduct the database inventory (asynchronously)
            productMapper.decrementStock(message.getProductId(), 1);
            
            // 4. Send the payment notification
            sendPaymentNotification(order);
            
            // 5. Optionally delete the staged Redis order (or keep it briefly for lookups)
            // redisTemplate.delete(orderKey);
            
        } catch (Exception e) {
            log.error("处理秒杀订单失败", e);
            // 异常处理:恢复库存等
        }
    }
    
    // Shared snowflake generator: constructing a new instance per call would
    // reset its state and risk duplicate IDs
    private static final SnowflakeIdGenerator ID_GENERATOR = new SnowflakeIdGenerator(1, 1);
    
    /**
     * Generate a distributed order ID
     */
    private Long generateSeckillOrderId() {
        return ID_GENERATOR.nextId();
    }
    
    /**
     * Send the payment notification
     */
    private void sendPaymentNotification(SeckillOrder order) {
        // SMS, email, and other notifications
    }
}

5.2.2 Inventory Pre-Warming

@Service
public class StockPreheatService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private ProductMapper productMapper;
    
    // Must match the prefix used by SeckillService
    private static final String SECKILL_STOCK_PREFIX = "seckill:stock:";
    
    /**
     * Pre-warm inventory into Redis before the sale starts
     */
    public void preheatStock(Long productId, Integer stock) {
        String stockKey = SECKILL_STOCK_PREFIX + productId;
        
        // 1. Skip if Redis already holds stock for this product
        // (the cached value may deserialize as Integer, hence the Number cast)
        Object currentStock = redisTemplate.opsForValue().get(stockKey);
        if (currentStock != null && ((Number) currentStock).longValue() > 0) {
            log.warn("Stock already present in Redis; skipping pre-warm");
            return;
        }
        
        // 2. Load the stock into Redis
        redisTemplate.opsForValue().set(stockKey, stock, 30, TimeUnit.MINUTES);
        
        // 3. Record an alert threshold key (used to fire alerts when stock runs low)
        redisTemplate.opsForValue().set("seckill:stock:alert:" + productId, stock, 30, TimeUnit.MINUTES);
    }
    
    /**
     * Periodically sync Redis stock back to the database
     */
    @Scheduled(fixedRate = 60000) // every minute
    public void syncStockToDatabase() {
        // Enumerate flash-sale stock keys (KEYS is O(N) and blocks Redis; prefer SCAN in production)
        Set<String> keys = redisTemplate.keys(SECKILL_STOCK_PREFIX + "*");
        if (keys == null || keys.isEmpty()) {
            return;
        }
        
        for (String key : keys) {
            // Skip auxiliary keys such as "seckill:stock:alert:*"
            String suffix = key.substring(SECKILL_STOCK_PREFIX.length());
            if (!suffix.chars().allMatch(Character::isDigit)) {
                continue;
            }
            
            Object raw = redisTemplate.opsForValue().get(key);
            if (raw == null) {
                continue;
            }
            long stock = ((Number) raw).longValue();
            
            // Parse the product ID from the key
            Long productId = Long.parseLong(suffix);
            
            // Write the stock back to the database
            productMapper.updateStock(productId, stock);
        }
    }
}

5.2.3 Rate Limiting and Degradation

@Component
public class RateLimiter {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * Fixed-window rate limiting backed by a Redis counter:
     * allow at most `permits` requests per `timeout`-second window
     */
    public boolean tryAcquire(String key, int permits, long timeout) {
        String luaScript = """
            local key = KEYS[1]
            local permits = tonumber(ARGV[1])
            local timeout = tonumber(ARGV[2])
            
            local current = redis.call('INCR', key)
            if current == 1 then
                redis.call('EXPIRE', key, timeout)
            end
            if current > permits then
                return 0
            end
            return 1
            """;
        
        RedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
        Long result = redisTemplate.execute(script, Collections.singletonList(key),
                String.valueOf(permits), String.valueOf(timeout));
        return result != null && result == 1;
    }
    }
    
    /**
     * Rate-limit annotation
     */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface RateLimit {
        String key() default "";
        int permits() default 100;
        long timeout() default 60;
        String message() default "Too many requests, please try again later";
    }
    
    /**
     * AOP aspect that enforces the limit
     */
    @Aspect
    @Component
    public static class RateLimitAspect {  // static, so Spring can instantiate it as a bean
        
        @Autowired
        private RateLimiter rateLimiter;
        
        @Around("@annotation(rateLimit)")
        public Object around(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
            String key = rateLimit.key();
            if (StringUtils.isEmpty(key)) {
                // build a default key from the class and method names
                String className = joinPoint.getTarget().getClass().getSimpleName();
                String methodName = joinPoint.getSignature().getName();
                key = className + ":" + methodName;
            }
            
            if (!rateLimiter.tryAcquire(key, rateLimit.permits(), rateLimit.timeout())) {
                throw new RuntimeException(rateLimit.message());
            }
            
            return joinPoint.proceed();
        }
    }
}

// Usage
@RestController
public class SeckillController {
    
    @Autowired
    private SeckillService seckillService;
    
    @RateLimit(key = "seckill:limit", permits = 1000, timeout = 60, message = "秒杀过于火爆,请稍后再试")
    @PostMapping("/seckill")
    public SeckillResult seckill(@RequestBody SeckillRequest request) {
        return seckillService.seckill(request.getProductId(), request.getUserId());
    }
}

6. Summary and Best Practices

6.1 Optimization Priorities

  1. Index optimization: lowest cost, most visible payoff
  2. Query optimization: eliminate slow queries and reduce database load
  3. Caching: cut the number of database round trips
  4. Read/write splitting: scale reads and shield the master from read traffic
  5. Sharding: break through data volume and concurrency ceilings
  6. Architecture upgrades: from single node to distributed, from synchronous to asynchronous

6.2 Common Mistakes

  1. Premature optimization: do not take on complex optimizations before a real performance problem exists
  2. Over-sharding: sharding adds complexity; let actual data volume drive the decision
  3. Cache overuse: caching is not a cure-all; data consistency must be considered
  4. Ignoring monitoring: optimization without monitoring is flying blind

6.3 Continuous Improvement

  1. Regular reviews: audit slow queries and index usage every quarter
  2. Load testing: run load tests regularly to surface bottlenecks early
  3. Monitoring and alerting: build a solid monitoring stack so problems are caught in time
  4. Keeping current: follow new MySQL releases and upgrade when it pays off

Applied together, these strategies can substantially improve MySQL's performance under high concurrency. Remember: there is no silver bullet. Choose the techniques that fit your specific workload, and keep monitoring and adjusting.