In today's Internet applications, high concurrency is the norm. Whether it is flash sales on e-commerce platforms, real-time interaction on social media, or transaction processing in financial systems, MySQL, as one of the most popular relational databases, routinely faces floods of requests and performance bottlenecks. This article examines how to handle high concurrency with MySQL, from architecture design and configuration tuning to code-level implementation, and assembles the pieces into a complete playbook.

1. Analyzing MySQL Performance Bottlenecks Under High Concurrency

1.1 Common Bottlenecks

Under high concurrency, MySQL typically runs into bottlenecks in the following areas:

  • Connection limits: once concurrent connections exceed max_connections, new connections are refused
  • CPU: complex queries, heavy sorting, and temporary-table operations send CPU usage soaring
  • I/O: frequent disk reads and writes, especially random I/O, become the choke point
  • Lock contention: waits and deadlocks caused by row locks, table locks, and gap locks
  • Memory: an undersized buffer pool forces frequent disk I/O

1.2 Performance Monitoring and Diagnosis

Diagnose before you optimize. Commonly used tools and checks:

-- Current number of connections
SHOW STATUS LIKE 'Threads_connected';

-- Maximum allowed connections
SHOW VARIABLES LIKE 'max_connections';

-- Is the slow query log enabled?
SHOW VARIABLES LIKE 'slow_query_log';

-- InnoDB engine status (includes buffer pool details)
SHOW ENGINE INNODB STATUS\G

-- Queries currently running
SHOW PROCESSLIST;

Example: performance analysis with Percona Toolkit

# Install Percona Toolkit
sudo apt-get install percona-toolkit

# Analyze the slow query log
pt-query-digest /var/log/mysql/slow.log

# Verify data consistency between master and replicas
pt-table-checksum h=localhost,u=root,p=password

# Measure replication lag (--update writes heartbeats on the master; read them with --monitor on replicas)
pt-heartbeat --update --host=localhost --user=root --password=password

2. Architecture-Level Optimization

2.1 Read/Write Splitting

Read/write splitting is an effective strategy for read-heavy workloads: read traffic is fanned out across replicas, taking pressure off the primary.

Implementation options:

  1. ProxySQL as the middleware
-- ProxySQL configuration example
-- Register the primary (hostgroup 10 = writers)
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight) 
VALUES (10, 'master.example.com', 3306, 100);

-- Register the replicas (hostgroup 20 = readers)
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight) 
VALUES (20, 'slave1.example.com', 3306, 100);
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight) 
VALUES (20, 'slave2.example.com', 3306, 100);

-- Read/write routing rules: locking SELECTs go to the writer, plain SELECTs to readers
INSERT INTO mysql_query_rules (rule_id, active, match_digest, destination_hostgroup, apply) 
VALUES (1, 1, '^SELECT.*FOR UPDATE', 10, 1);
INSERT INTO mysql_query_rules (rule_id, active, match_digest, destination_hostgroup, apply) 
VALUES (2, 1, '^SELECT', 20, 1);
  2. Read/write splitting in Spring Boot with a custom routing DataSource
@Configuration
public class DataSourceConfig {
    
    @Bean
    public DataSource masterDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl("jdbc:mysql://master:3306/mydb");
        dataSource.setUsername("root");
        dataSource.setPassword("password");
        return dataSource;
    }
    
    @Bean
    public DataSource slaveDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl("jdbc:mysql://slave:3306/mydb");
        dataSource.setUsername("root");
        dataSource.setPassword("password");
        return dataSource;
    }
    
    @Bean
    public DataSource routingDataSource() {
        DynamicDataSource routingDataSource = new DynamicDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        return routingDataSource;
    }
}
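
DynamicDataSource is referenced above but never defined. It is assumed here to be a thin custom router built on Spring's AbstractRoutingDataSource; the thread-local holder and its method names below are illustrative, not from the original:

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

// Minimal routing DataSource: Spring calls determineCurrentLookupKey() on
// every connection checkout and uses the returned key to pick a target
// from targetDataSources ("master" or "slave" above).
public class DynamicDataSource extends AbstractRoutingDataSource {
    
    // Per-thread route selection; defaults to the master for safety
    private static final ThreadLocal<String> CONTEXT = ThreadLocal.withInitial(() -> "master");
    
    public static void useMaster() { CONTEXT.set("master"); }
    public static void useSlave()  { CONTEXT.set("slave"); }
    public static void clear()     { CONTEXT.remove(); }
    
    @Override
    protected Object determineCurrentLookupKey() {
        return CONTEXT.get();
    }
}

A typical pattern marks read-only service methods (for example via an annotation plus an AOP aspect) to call DynamicDataSource.useSlave() before the query and clear() afterwards.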

2.2 Sharding Databases and Tables

Once a single table grows past roughly tens of millions of rows, consider splitting it across tables and databases.

Vertical split example:

-- Original table
CREATE TABLE user (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    phone VARCHAR(20),
    address TEXT,
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

-- After the vertical split
CREATE TABLE user_basic (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    phone VARCHAR(20),
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);

CREATE TABLE user_address (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    address TEXT,
    FOREIGN KEY (user_id) REFERENCES user_basic(id)
);

Horizontal split example (hash on user ID):

-- Sharding rule: user ID % 4
-- Table definitions
CREATE TABLE user_0 (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    created_at TIMESTAMP
);

CREATE TABLE user_1 (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    created_at TIMESTAMP
);

-- ... user_2, user_3

Sharding with ShardingSphere:

# sharding.yaml
dataSources:
  ds_0: !!com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/db_0
    username: root
    password: password
  ds_1: !!com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/db_1
    username: root
    password: password

shardingRule:
  tables:
    user:
      actualDataNodes: ds_${0..1}.user_${0..3}
      tableStrategy:
        inline:
          shardingColumn: id
          algorithmExpression: user_${id % 4}
      databaseStrategy:
        inline:
          shardingColumn: id
          algorithmExpression: ds_${id % 2}
  bindingTables:
    - user

2.3 Caching

Putting a cache in front of MySQL significantly reduces database load.

Redis cache-aside example:

@Service
public class UserService {
    
    @Autowired
    private UserRepository userRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String USER_CACHE_KEY = "user:";
    private static final long CACHE_TTL = 3600; // one hour
    
    public User getUserById(Long id) {
        String cacheKey = USER_CACHE_KEY + id;
        
        // 1. Try the cache first
        User user = (User) redisTemplate.opsForValue().get(cacheKey);
        if (user != null) {
            return user;
        }
        
        // 2. Cache miss: query the database
        user = userRepository.findById(id).orElse(null);
        
        // 3. Populate the cache
        if (user != null) {
            redisTemplate.opsForValue().set(cacheKey, user, CACHE_TTL, TimeUnit.SECONDS);
        }
        
        return user;
    }
    
    public void updateUser(User user) {
        // Update the database
        userRepository.save(user);
        
        // Invalidate the cache entry
        String cacheKey = USER_CACHE_KEY + user.getId();
        redisTemplate.delete(cacheKey);
    }
}

Handling cache penetration, breakdown, and avalanche (penetration and breakdown are coded below; avalanche is addressed by the TTL-jitter sketch after them):

// Cache penetration fix: cache null results for keys that do not exist
public User getUserByIdWithNullCache(Long id) {
    String cacheKey = USER_CACHE_KEY + id;
    String nullKey = cacheKey + ":null";
    
    // Check the null-value marker first
    if (Boolean.TRUE.equals(redisTemplate.hasKey(nullKey))) {
        return null;
    }
    
    // Then the regular cache
    User user = (User) redisTemplate.opsForValue().get(cacheKey);
    if (user != null) {
        return user;
    }
    
    // Query the database
    user = userRepository.findById(id).orElse(null);
    
    if (user != null) {
        redisTemplate.opsForValue().set(cacheKey, user, CACHE_TTL, TimeUnit.SECONDS);
    } else {
        // Cache the null result briefly to block repeated misses
        redisTemplate.opsForValue().set(nullKey, "", 60, TimeUnit.SECONDS);
    }
    
    return user;
}

// Cache breakdown fix: rebuild a hot key under a distributed lock
public User getUserByIdWithLock(Long id) {
    String cacheKey = USER_CACHE_KEY + id;
    String lockKey = "lock:" + cacheKey;
    
    // Try the cache
    User user = (User) redisTemplate.opsForValue().get(cacheKey);
    if (user != null) {
        return user;
    }
    
    // Try to acquire the distributed lock
    Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
    if (Boolean.TRUE.equals(lockAcquired)) {
        try {
            // Double-check the cache after acquiring the lock
            user = (User) redisTemplate.opsForValue().get(cacheKey);
            if (user != null) {
                return user;
            }
            
            // Query the database
            user = userRepository.findById(id).orElse(null);
            
            // Populate the cache
            if (user != null) {
                redisTemplate.opsForValue().set(cacheKey, user, CACHE_TTL, TimeUnit.SECONDS);
            }
        } finally {
            // Release the lock (production code should verify ownership first; see the Lua-based release in section 7.2)
            redisTemplate.delete(lockKey);
        }
    } else {
        // Back off briefly, then retry
        try {
            Thread.sleep(100);
            return getUserByIdWithLock(id);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
    
    return user;
}
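
Cache avalanche (many keys expiring at the same instant) is usually mitigated by adding random jitter to TTLs. A minimal sketch reusing the redisTemplate above (ThreadLocalRandom comes from java.util.concurrent; the helper name is illustrative):

// Write-through helper with a jittered TTL: keys populated in the same
// burst now expire at slightly different times instead of all at once.
private void setWithJitter(String cacheKey, Object value, long baseTtlSeconds) {
    long jitter = ThreadLocalRandom.current().nextLong(baseTtlSeconds / 10 + 1); // up to +10% jitter
    redisTemplate.opsForValue().set(cacheKey, value, baseTtlSeconds + jitter, TimeUnit.SECONDS);
}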

3. MySQL Configuration Tuning

3.1 Key Parameter Tuning

InnoDB buffer pool:

# my.cnf example
[mysqld]
# Buffer pool size; typically 50%-70% of physical RAM on a dedicated DB host
innodb_buffer_pool_size = 16G

# Number of buffer pool instances; scale with CPU cores
innodb_buffer_pool_instances = 8

# Buffer pool warm-up: dump hot pages at shutdown, reload at startup
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_at_shutdown = ON

# Redo log sizing; 1-2G is a common starting point
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M

# Flush policy
innodb_flush_log_at_trx_commit = 1  # flush to disk at every commit (full ACID durability)
innodb_flush_method = O_DIRECT      # direct I/O, avoids OS double buffering

# Concurrency
innodb_thread_concurrency = 0      # 0 = let InnoDB manage concurrency itself
innodb_read_io_threads = 8
innodb_write_io_threads = 8

# Locking
innodb_lock_wait_timeout = 50
innodb_rollback_on_timeout = OFF

Connection settings:

# Connection limits
max_connections = 2000
max_user_connections = 1800

# Idle-session timeouts (seconds)
wait_timeout = 600
interactive_timeout = 600

# Thread cache / thread pool
thread_cache_size = 100
thread_handling = pool-of-threads  # thread pool; Percona Server/MariaDB syntax, not available in MySQL Community

# Per-thread stack size (adjust to the workload)
thread_stack = 256K

Query cache (removed in MySQL 8.0; applies only to 5.7 and earlier):

# Query cache (MySQL 5.7 and earlier)
query_cache_type = 1
query_cache_size = 128M
query_cache_limit = 2M

3.2 Index Optimization

Composite index design:

-- Poor choice when queries filter on user_id first: wrong leading column
CREATE INDEX idx_date_user ON orders (order_date, user_id, status);

-- Better: order columns to match the leftmost-prefix rule
CREATE INDEX idx_user_date_status ON orders (user_id, order_date, status);

-- Query examples against idx_user_date_status
-- Can use the index
SELECT * FROM orders WHERE user_id = 123 AND order_date = '2023-01-01';
SELECT * FROM orders WHERE user_id = 123;

-- Cannot use the index (skips the leading column user_id)
SELECT * FROM orders WHERE order_date = '2023-01-01';

Covering index optimization:

-- Original query (requires row lookups back to the clustered index)
SELECT user_id, username, email FROM users WHERE age > 25;

-- Create a covering index
CREATE INDEX idx_age_cover ON users (age, user_id, username, email);

-- Same query, now served entirely from the index (no row lookups)
SELECT user_id, username, email FROM users WHERE age > 25;

Index Condition Pushdown (ICP):

-- MySQL 5.6+ supports index condition pushdown
-- Original query
SELECT * FROM users WHERE age > 25 AND name LIKE '张%';

-- Create the index
CREATE INDEX idx_age_name ON users (age, name);

-- With ICP, the name filter is evaluated inside the index before row lookups
-- Check with EXPLAIN whether ICP kicks in
EXPLAIN SELECT * FROM users WHERE age > 25 AND name LIKE '张%';
-- The Extra column should show "Using index condition"

3.3 Query Optimization Techniques

Avoid SELECT *:

-- Wasteful
SELECT * FROM orders WHERE user_id = 123;

-- Better: select only the columns you need
SELECT order_id, order_date, total_amount FROM orders WHERE user_id = 123;

LIMIT pagination:

-- Naive offset pagination (deep offsets scan and discard a million rows)
SELECT * FROM orders ORDER BY order_date DESC LIMIT 1000000, 10;

-- Option 1: derived-table join (MySQL rejects LIMIT inside an IN subquery)
SELECT o.* FROM orders o
JOIN (
    SELECT order_id FROM orders
    ORDER BY order_date DESC
    LIMIT 1000000, 10
) t ON o.order_id = t.order_id;

-- Option 2: keyset (seek) pagination, recommended
SELECT * FROM orders 
WHERE order_date < '2023-01-01' 
ORDER BY order_date DESC 
LIMIT 10;

-- Option 3: offload deep paging/search to Elasticsearch, ClickHouse, etc.

JOIN optimization:

-- A three-way JOIN that can get expensive under high concurrency
SELECT o.*, u.username, p.product_name 
FROM orders o 
JOIN users u ON o.user_id = u.id 
JOIN products p ON o.product_id = p.id 
WHERE o.order_date > '2023-01-01';

-- Alternative: split the query and join in the application (see the sketch below)
-- Step 1: fetch the orders
SELECT o.* FROM orders o WHERE o.order_date > '2023-01-01';

-- Step 2: batch-fetch the users
SELECT u.id, u.username FROM users u WHERE u.id IN (1,2,3,...);

-- Step 3: batch-fetch the products
SELECT p.id, p.product_name FROM products p WHERE p.id IN (1,2,3,...);
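
After the split, the application stitches the pieces back together. A hedged sketch assuming Spring Data JPA repositories (findByOrderDateAfter is a hypothetical derived query; findAllById is the standard batch lookup):

// In-memory "join": one query for orders, two batched IN queries for the
// referenced users and products, then a map lookup per order.
List<Order> orders = orderRepository.findByOrderDateAfter(since);

Set<Long> userIds = orders.stream().map(Order::getUserId).collect(Collectors.toSet());
Set<Long> productIds = orders.stream().map(Order::getProductId).collect(Collectors.toSet());

Map<Long, User> users = userRepository.findAllById(userIds).stream()
        .collect(Collectors.toMap(User::getId, u -> u));
Map<Long, Product> products = productRepository.findAllById(productIds).stream()
        .collect(Collectors.toMap(Product::getId, p -> p));

for (Order o : orders) {
    User u = users.get(o.getUserId());
    Product p = products.get(o.getProductId());
    o.setUsername(u != null ? u.getUsername() : null);         // joined fields on a view/DTO
    o.setProductName(p != null ? p.getProductName() : null);
}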

4. Application-Layer Optimization

4.1 Connection Pool Configuration

HikariCP example:

@Configuration
public class DataSourceConfig {
    
    @Bean
    public HikariDataSource dataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        
        // Basic settings
        dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/mydb");
        dataSource.setUsername("root");
        dataSource.setPassword("password");
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        
        // Pool sizing
        dataSource.setMaximumPoolSize(50);        // max pool size
        dataSource.setMinimumIdle(10);            // minimum idle connections
        dataSource.setConnectionTimeout(30000);   // 30s to obtain a connection
        dataSource.setIdleTimeout(600000);        // 10min idle timeout
        dataSource.setMaxLifetime(1800000);       // 30min max connection lifetime
        
        // Validation (a test query is unnecessary with JDBC4-compliant drivers
        // such as Connector/J, which validate via Connection.isValid())
        dataSource.setConnectionTestQuery("SELECT 1");
        dataSource.setValidationTimeout(5000);
        dataSource.setLeakDetectionThreshold(60000);
        
        // Fail fast at startup if a first connection cannot be obtained
        dataSource.setInitializationFailTimeout(1);
        
        return dataSource;
    }
}

Pool monitoring:

@Component
public class ConnectionPoolMonitor {
    
    @Autowired
    private HikariDataSource dataSource;
    
    @Scheduled(fixedRate = 60000) // run once a minute
    public void monitorConnectionPool() {
        HikariPoolMXBean poolMXBean = dataSource.getHikariPoolMXBean();
        
        System.out.println("=== 连接池监控 ===");
        System.out.println("活跃连接数: " + poolMXBean.getActiveConnections());
        System.out.println("空闲连接数: " + poolMXBean.getIdleConnections());
        System.out.println("总连接数: " + poolMXBean.getTotalConnections());
        System.out.println("等待连接数: " + poolMXBean.getThreadsAwaitingConnection());
        
        // Alert when too many threads are queued for a connection
        if (poolMXBean.getThreadsAwaitingConnection() > 10) {
            System.err.println("WARNING: connection pool wait queue is too long!");
        }
    }
}

4.2 Batch Operations

Batch inserts:

// Batch insert example
public void batchInsert(List<User> users) {
    String sql = "INSERT INTO user (username, email, age) VALUES (?, ?, ?)";
    
    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement(sql)) {
        
        // Disable auto-commit so each batch commits as one transaction
        conn.setAutoCommit(false);
        
        int batchSize = 1000; // 1000 rows per batch
        for (int i = 0; i < users.size(); i++) {
            User user = users.get(i);
            ps.setString(1, user.getUsername());
            ps.setString(2, user.getEmail());
            ps.setInt(3, user.getAge());
            ps.addBatch();
            
            // Flush when the batch is full
            if ((i + 1) % batchSize == 0) {
                ps.executeBatch();
                conn.commit();
            }
        }
        
        // Flush the remaining rows
        ps.executeBatch();
        conn.commit();
        
    } catch (SQLException e) {
        e.printStackTrace();
    }
}
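
One caveat with MySQL Connector/J: addBatch()/executeBatch() still sends one INSERT per row unless rewriteBatchedStatements=true is set, which collapses each batch into multi-row INSERT statements:

// JDBC URL flag for Connector/J; often an order-of-magnitude speedup for batch inserts
String url = "jdbc:mysql://localhost:3306/mydb?rewriteBatchedStatements=true";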

Batch updates:

// Batch update example
public void batchUpdate(List<User> users) {
    String sql = "UPDATE user SET username = ?, email = ? WHERE id = ?";
    
    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement(sql)) {
        
        conn.setAutoCommit(false);
        
        for (User user : users) {
            ps.setString(1, user.getUsername());
            ps.setString(2, user.getEmail());
            ps.setLong(3, user.getId());
            ps.addBatch();
        }
        
        ps.executeBatch();
        conn.commit();
        
    } catch (SQLException e) {
        e.printStackTrace();
    }
}

4.3 Asynchronous Processing with Message Queues

Using a message queue to absorb traffic spikes:

@Service
public class OrderService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    // Synchronous endpoint (fast response); in a real project this handler
    // belongs in a @RestController rather than a @Service class
    @PostMapping("/orders")
    public ResponseEntity<String> createOrder(@RequestBody Order order) {
        // 1. Basic validation
        if (order == null || order.getUserId() == null) {
            return ResponseEntity.badRequest().body("invalid request");
        }
        
        // 2. Generate the order ID
        order.setId(generateOrderId());
        order.setStatus("PENDING");
        order.setCreatedAt(new Date());
        
        // 3. Publish to the message queue for asynchronous processing
        rabbitTemplate.convertAndSend("order.exchange", "order.create", order);
        
        // 4. Return immediately; do not wait for the database write
        return ResponseEntity.ok("order accepted, ID: " + order.getId());
    }
    
    // Consumer side
    @RabbitListener(queues = "order.queue")
    public void processOrder(Order order) {
        try {
            // 1. Business logic
            validateOrder(order);
            calculateTotalAmount(order);
            
            // 2. Persist the order
            orderRepository.save(order);
            
            // 3. Publish a success event
            rabbitTemplate.convertAndSend("order.exchange", "order.success", order);
            
        } catch (Exception e) {
            // 4. Failure handling
            order.setStatus("FAILED");
            order.setErrorMessage(e.getMessage());
            orderRepository.save(order);
            
            // Route the message to the dead-letter queue
            rabbitTemplate.convertAndSend("order.exchange", "order.dead", order);
        }
    }
}
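
One caveat with the listener above: queue redeliveries mean processOrder() can run more than once for the same order, so the handler should be idempotent. A minimal guard (assuming the ID is assigned before publishing, as above; a unique key on the order ID gives the same guarantee at the database level):

// At the top of processOrder(): skip messages that were already persisted
if (order.getId() != null && orderRepository.existsById(order.getId())) {
    return; // duplicate delivery, already processed
}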

5. Scenario-Specific Optimizations

5.1 Flash-Sale (Seckill) Optimization

Flash-sale service design:

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private ProductRepository productRepository;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    private static final String SECKILL_KEY = "seckill:stock:";
    private static final String SECKILL_USER_KEY = "seckill:user:";
    
    // Flash-sale entry point; it touches only Redis and the queue, so no
    // database transaction is needed at this stage
    public SeckillResult seckill(Long userId, Long productId) {
        String stockKey = SECKILL_KEY + productId;
        String userKey = SECKILL_USER_KEY + productId;
        
        // 1. Reject repeat participation (a Lua script can make steps 1-3
        //    atomic; see section 7.2)
        if (Boolean.TRUE.equals(redisTemplate.hasKey(userKey + ":" + userId))) {
            return SeckillResult.fail("you have already participated in this sale");
        }
        
        // 2. Pre-decrement stock in Redis
        Long stock = redisTemplate.opsForValue().decrement(stockKey);
        if (stock == null || stock < 0) {
            // Out of stock: roll the decrement back
            redisTemplate.opsForValue().increment(stockKey);
            return SeckillResult.fail("out of stock");
        }
        
        // 3. Mark the user as having participated
        redisTemplate.opsForValue().set(userKey + ":" + userId, "1", 30, TimeUnit.MINUTES);
        
        // 4. Create the order asynchronously via the message queue
        SeckillOrder order = new SeckillOrder();
        order.setUserId(userId);
        order.setProductId(productId);
        order.setSeckillTime(new Date());
        order.setStatus("PROCESSING");
        
        rabbitTemplate.convertAndSend("seckill.exchange", "seckill.create", order);
        
        return SeckillResult.success("success, your order is being processed");
    }
    
    // Consumer: turn queued requests into persisted orders
    @RabbitListener(queues = "seckill.queue")
    public void processSeckillOrder(SeckillOrder order) {
        try {
            // 1. Re-check stock in the database
            Product product = productRepository.findById(order.getProductId()).orElse(null);
            if (product == null || product.getStock() <= 0) {
                // Out of stock: roll back the Redis counter
                redisTemplate.opsForValue().increment(SECKILL_KEY + order.getProductId());
                return;
            }
            
            // 2. Atomically decrement database stock
            int updated = productRepository.decrementStock(order.getProductId());
            if (updated == 0) {
                // Decrement failed: roll back the Redis counter
                redisTemplate.opsForValue().increment(SECKILL_KEY + order.getProductId());
                return;
            }
            
            // 3. Persist the order
            order.setStatus("SUCCESS");
            orderRepository.save(order);
            
            // 4. Notify the user
            sendSuccessNotification(order);
            
        } catch (Exception e) {
            // On any failure, roll back the Redis counter
            redisTemplate.opsForValue().increment(SECKILL_KEY + order.getProductId());
        }
    }
}
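
The productRepository.decrementStock(...) call above depends on an atomic, conditional UPDATE so the database itself prevents overselling. A sketch assuming Spring Data JPA (the entity and repository names mirror the code above but are not from the original):

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

public interface ProductRepository extends JpaRepository<Product, Long> {
    
    // Returns the number of rows updated: 1 on success, 0 when stock is
    // exhausted, so the caller can detect failure without explicit locking.
    // (@Modifying queries must run inside a transaction.)
    @Modifying
    @Query("UPDATE Product p SET p.stock = p.stock - 1 WHERE p.id = :id AND p.stock > 0")
    int decrementStock(@Param("id") Long productId);
}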

5.2 Real-Time Statistics

Use summary tables refreshed on a schedule (MySQL has no native materialized views):

-- Summary table
CREATE TABLE daily_sales_summary (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    date DATE NOT NULL,
    product_id BIGINT NOT NULL,
    total_sales DECIMAL(10,2) NOT NULL,
    total_orders INT NOT NULL,
    UNIQUE KEY uk_date_product (date, product_id)
);

-- Scheduled event to refresh the summary (requires event_scheduler = ON)
DELIMITER $$
CREATE EVENT update_daily_sales_summary
ON SCHEDULE EVERY 1 DAY
STARTS '2023-01-01 00:00:00'
DO
BEGIN
    -- Clear today's rows
    DELETE FROM daily_sales_summary WHERE date = CURDATE();
    
    -- Recompute and insert them
    INSERT INTO daily_sales_summary (date, product_id, total_sales, total_orders)
    SELECT 
        DATE(o.created_at) as date,
        o.product_id,
        SUM(o.amount) as total_sales,
        COUNT(o.id) as total_orders
    FROM orders o
    WHERE DATE(o.created_at) = CURDATE()
    GROUP BY DATE(o.created_at), o.product_id;
END$$
DELIMITER ;

Real-time analytics with ClickHouse:

-- ClickHouse table definition
CREATE TABLE orders_analytics (
    order_id UInt64,
    user_id UInt64,
    product_id UInt64,
    amount Decimal(10,2),
    created_at DateTime,
    status String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(created_at)
ORDER BY (user_id, created_at);

-- Real-time query
SELECT 
    toStartOfHour(created_at) as hour,
    count() as order_count,
    sum(amount) as total_amount
FROM orders_analytics
WHERE created_at >= now() - INTERVAL 1 HOUR
GROUP BY hour
ORDER BY hour;

6. Monitoring and Alerting

6.1 Monitoring Metrics

Key metrics:

  1. Connection monitoring
-- Connection count broken down by command state
SELECT 
    COUNT(*) as total_connections,
    SUM(CASE WHEN COMMAND = 'Sleep' THEN 1 ELSE 0 END) as sleep_connections,
    SUM(CASE WHEN COMMAND = 'Query' THEN 1 ELSE 0 END) as query_connections
FROM INFORMATION_SCHEMA.PROCESSLIST;
  2. Slow query monitoring
-- Slow query log settings
SHOW VARIABLES LIKE 'slow_query_log%';
SHOW VARIABLES LIKE 'long_query_time';

-- Top statements by total latency (from performance_schema)
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    SUM_TIMER_WAIT/1000000000 as total_time_sec,
    AVG_TIMER_WAIT/1000000000 as avg_time_sec
FROM performance_schema.events_statements_summary_by_digest
WHERE SUM_TIMER_WAIT > 0
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;
  3. InnoDB status monitoring
-- InnoDB engine status
SHOW ENGINE INNODB STATUS\G

-- Key sections to read:
-- - Transactions: transaction state
-- - Row operations: row operation counters
-- - Buffer pool hit rate: buffer pool cache effectiveness

6.2 Monitoring Tool Integration

Prometheus + Grafana:

# prometheus.yml
scrape_configs:
  - job_name: 'mysql'
    static_configs:
      - targets: ['mysql-exporter:9104']
    metrics_path: '/metrics'
    params:
      collect[]:
        - global_status
        - global_variables
        - slave_status
        - innodb_metrics
        - performance_schema.tablelocks
        - performance_schema.table_io_waits_summary_by_table

mysqld_exporter setup:

# Run mysqld_exporter
docker run -d \
  --name mysql-exporter \
  -p 9104:9104 \
  -e DATA_SOURCE_NAME="user:password@(mysql:3306)/" \
  prom/mysqld-exporter \
  --collect.global_status \
  --collect.global_variables \
  --collect.slave_status \
  --collect.innodb_metrics \
  --collect.performance_schema.tablelocks \
  --collect.performance_schema.table_io_waits_summary_by_table

Grafana dashboard example:

{
  "dashboard": {
    "title": "MySQL Performance Dashboard",
    "panels": [
      {
        "title": "Connections",
        "targets": [
          {
            "expr": "mysql_global_status_threads_connected",
            "legendFormat": "Connected"
          },
          {
            "expr": "mysql_global_variables_max_connections",
            "legendFormat": "Max"
          }
        ]
      },
      {
        "title": "Query Performance",
        "targets": [
          {
            "expr": "rate(mysql_global_status_queries[5m])",
            "legendFormat": "Queries/sec"
          },
          {
            "expr": "rate(mysql_global_status_slow_queries[5m])",
            "legendFormat": "Slow Queries/sec"
          }
        ]
      }
    ]
  }
}

6.3 Alert Rules

Prometheus alert rules example:

# mysql_alerts.yml
groups:
  - name: mysql_alerts
    rules:
      - alert: MySQLHighConnections
        expr: mysql_global_status_threads_connected / mysql_global_variables_max_connections > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MySQL连接数过高"
          description: "MySQL连接数已达到最大连接数的80%以上"
      
      - alert: MySQLSlowQueries
        expr: rate(mysql_global_status_slow_queries[5m]) > 10
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "MySQL慢查询过多"
          description: "MySQL每秒慢查询超过10个"
      
      - alert: MySQLReplicationLag
        expr: mysql_slave_lag_seconds > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MySQL复制延迟"
          description: "MySQL从库复制延迟超过30秒"

7. Case Study: Optimizing an E-Commerce Flash-Sale System

7.1 System Architecture

User request → Nginx load balancing → application server cluster → Redis cluster → MySQL primary/replica cluster
                    ↓
                Message queue (RabbitMQ/Kafka)
                    ↓
                Asynchronous order-processing service
                    ↓
                Sharded MySQL (order tables)

7.2 Key Code

Pre-decrementing stock in Redis:

@Component
public class RedisStockService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * Pre-decrement stock in Redis.
     * @param productId product ID
     * @param quantity quantity to reserve
     * @return true on success, false if stock is insufficient or missing
     */
    public boolean preReduceStock(Long productId, int quantity) {
        String key = "seckill:stock:" + productId;
        
        // A Lua script makes the check-and-decrement atomic
        String luaScript = 
            "if redis.call('exists', KEYS[1]) == 1 then " +
            "   local stock = tonumber(redis.call('get', KEYS[1])); " +
            "   if stock >= tonumber(ARGV[1]) then " +
            "       redis.call('decrby', KEYS[1], ARGV[1]); " +
            "       return 1; " +
            "   else " +
            "       return 0; " +
            "   end " +
            "else " +
            "   return 0; " +
            "end";
        
        RedisScript<Long> script = RedisScript.of(luaScript, Long.class);
        Long result = redisTemplate.execute(script, Collections.singletonList(key), String.valueOf(quantity));
        
        return result != null && result == 1;
    }
    
    /**
     * Roll back previously reserved stock.
     * @param productId product ID
     * @param quantity quantity to return
     */
    public void rollbackStock(Long productId, int quantity) {
        String key = "seckill:stock:" + productId;
        redisTemplate.opsForValue().increment(key, quantity);
    }
}
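
A typical call site pairs the pre-decrement with a compensating rollback when a later step fails (redisStockService, rabbitTemplate, productId, and order are assumed from the surrounding service):

// Reserve stock in Redis first; compensate if publishing fails, so the
// cached counter stays consistent with reality.
if (!redisStockService.preReduceStock(productId, 1)) {
    return SeckillResult.fail("out of stock");
}
try {
    rabbitTemplate.convertAndSend("seckill.exchange", "seckill.create", order);
} catch (Exception e) {
    redisStockService.rollbackStock(productId, 1); // compensating action
    throw e;
}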

Distributed lock implementation:

@Component
public class DistributedLock {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String LOCK_PREFIX = "lock:";
    private static final long DEFAULT_TIMEOUT = 30000; // 30-second default wait budget
    
    /**
     * Acquire a distributed lock, retrying until the wait budget is spent.
     * @param lockKey lock key
     * @param timeout wait budget in milliseconds
     * @return a lock ID proving ownership, or null if the lock was not acquired
     */
    public String acquireLock(String lockKey, long timeout) {
        String lockId = UUID.randomUUID().toString();
        String fullLockKey = LOCK_PREFIX + lockKey;
        
        long startTime = System.currentTimeMillis();
        
        while (System.currentTimeMillis() - startTime < timeout) {
            Boolean acquired = redisTemplate.opsForValue()
                .setIfAbsent(fullLockKey, lockId, 10, TimeUnit.SECONDS);
            
            if (Boolean.TRUE.equals(acquired)) {
                return lockId;
            }
            
            // Back off briefly before retrying
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        
        return null;
    }
    
    /**
     * Release a distributed lock, but only if this caller still owns it.
     * @param lockKey lock key
     * @param lockId lock ID returned by acquireLock
     */
    public void releaseLock(String lockKey, String lockId) {
        String fullLockKey = LOCK_PREFIX + lockKey;
        
        // A Lua compare-and-delete keeps the release atomic
        String luaScript = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "   return redis.call('del', KEYS[1]); " +
            "else " +
            "   return 0; " +
            "end";
        
        RedisScript<Long> script = RedisScript.of(luaScript, Long.class);
        redisTemplate.execute(script, Collections.singletonList(fullLockKey), lockId);
    }
}
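
Usage follows an acquire/try/finally pattern so the lock is always released, and only by its owner (the lock name and wait budget are illustrative):

// The returned lockId proves ownership; passing it back on release ensures
// we never delete a lock that expired and was re-acquired by another thread.
String lockId = distributedLock.acquireLock("seckill:product:" + productId, 5000);
if (lockId == null) {
    return SeckillResult.fail("system busy, please try again");
}
try {
    // ... critical section: check and update shared state ...
} finally {
    distributedLock.releaseLock("seckill:product:" + productId, lockId);
}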

Order sharding strategy:

@Component
public class OrderShardingService {
    
    /**
     * Compute the shard for a user ID.
     * @param userId user ID
     * @return shard index
     */
    public int calculateShard(Long userId) {
        // Simple hash-mod sharding across 4 shards. Note this is not
        // consistent hashing: adding a shard remaps most keys. floorMod
        // avoids the negative result Math.abs(Integer.MIN_VALUE) produces.
        return Math.floorMod(userId.hashCode(), 4);
    }
    
    /**
     * Data source name for a shard.
     * @param shard shard index
     * @return data source name
     */
    public String getDataSourceName(int shard) {
        return "order_ds_" + shard;
    }
    
    /**
     * Physical table name for a shard.
     * @param shard shard index
     * @return table name
     */
    public String getTableName(int shard) {
        return "order_" + shard;
    }
}

8. Summary and Best Practices

8.1 High-Concurrency Optimization Checklist

  1. Architecture

    • [ ] Is read/write splitting in place?
    • [ ] Is sharding needed?
    • [ ] Is there a cache layer?
    • [ ] Is a message queue used for peak shaving?
  2. Database configuration

    • [ ] Is the buffer pool sized sensibly?
    • [ ] Are enough connections configured?
    • [ ] Are the redo log files sized appropriately?
    • [ ] Are indexes optimized?
  3. Application layer

    • [ ] Is the connection pool tuned?
    • [ ] Are batch operations used?
    • [ ] Is SELECT * avoided?
    • [ ] Are JOIN queries optimized?
  4. Monitoring and alerting

    • [ ] Is the connection count monitored?
    • [ ] Are slow queries monitored?
    • [ ] Is replication lag monitored?
    • [ ] Are alert rules in place?

8.2 Optimization Principles

  1. Measure first, optimize second: optimization without data is guesswork
  2. Optimize layer by layer: architecture, then configuration, then code
  3. Change one thing at a time and verify the effect before moving on
  4. Keep it simple: avoid over-engineering and pick the lightest approach that works
  5. Iterate: performance tuning is an ongoing process, not a one-off project

8.3 Common Pitfalls

  1. Throwing hardware at the problem: upgrades do not fix every bottleneck
  2. Over-indexing: every extra index taxes writes
  3. Ignoring lock contention: locks are the silent killer under high concurrency
  4. Overusing caches: a cache is not a silver bullet and needs careful design
  5. Skipping monitoring: an unmonitored system cannot be trusted

Applied together, these strategies can substantially improve MySQL's behavior under high concurrency. There is no once-and-for-all fix: keep monitoring, analyzing, and tuning against your actual workload and data.