引言:理解高并发场景下的MySQL挑战

在现代互联网应用中,高并发场景已经成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易高峰,都会给数据库带来巨大的压力。MySQL作为最流行的关系型数据库之一,在高并发环境下常常面临性能瓶颈和稳定性挑战。

高并发对MySQL的主要影响包括:

  • 连接数耗尽:大量并发请求导致数据库连接池耗尽
  • CPU负载过高:复杂的查询和频繁的索引查找消耗大量CPU资源
  • I/O瓶颈:频繁的磁盘读写操作导致I/O等待
  • 锁竞争:行锁、表锁等资源竞争导致事务阻塞
  • 内存不足:缓冲池命中率下降,频繁的磁盘交换

本文将从架构设计、SQL优化、配置调优、缓存策略等多个维度,提供一套完整的MySQL高并发处理实战指南。

一、架构层面的优化策略

1.1 读写分离架构

读写分离是应对高并发最有效的架构模式之一。通过将读操作和写操作分离到不同的数据库实例,可以显著减轻主库压力。

实现方案:

  • 主库(Master):处理所有写操作(INSERT、UPDATE、DELETE)
  • 从库(Slave):处理所有读操作(SELECT)
  • 中间件:使用ProxySQL、MyCat或ShardingSphere进行路由

代码示例:使用Spring Boot实现读写分离

@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    public DataSource routingDataSource() {
        DynamicDataSource routingDataSource = new DynamicDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        return routingDataSource;
    }
    
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(routingDataSource());
        return sessionFactory.getObject();
    }
}

// 数据源上下文管理器
public class DataSourceContextHolder {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
    public static void setDataSource(String dataSource) {
        contextHolder.set(dataSource);
    }
    
    public static String getDataSource() {
        return contextHolder.get("master");
    }
    
    public static void clear() {
        contextHolder.remove();
    }
}

// AOP切面实现读写分离
@Aspect
@Component
public class DataSourceAspect {
    
    @Before("execution(* com.example.service.*.*(..))")
    public void before(JoinPoint joinPoint) {
        String methodName = joinPoint.getSignature().getName();
        if (methodName.startsWith("get") || methodName.startsWith("list") || methodName.startsWith("find")) {
            DataSourceContextHolder.setDataSource("slave");
        } else {
            DataSourceContextHolder.setDataSource("master");
        }
    }
    
    @After("execution(* com.example.service.*.*(..))")
    public void after(JoinPoint joinPoint) {
        DataSourceContextHolder.clear();
    }
}

1.2 分库分表策略

当单表数据量超过千万级别或并发量极高时,需要考虑分库分表。

水平分表示例:

-- 用户表按用户ID取模分表
-- user_0, user_1, user_2, user_3

-- 创建分表结构
CREATE TABLE user_0 (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    created_at TIMESTAMP,
    INDEX idx_username (username)
) ENGINE=InnoDB;

-- 分表规则:user_id % 4
-- 查询时根据user_id计算表名

分库分表中间件配置(ShardingSphere):

# sharding.yaml
dataSources:
  ds_0: !!com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/db_0
    username: root
    password: password
  ds_1: !!com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/db_1
    username: root
    password: password

shardingRule:
  tables:
    user:
      actualDataNodes: ds_${0..1}.user_${0..3}
      tableStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: user_${user_id % 4}
      databaseStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: ds_${user_id % 2}
  bindingTables:
    - user

1.3 连接池优化

连接池是应用与数据库之间的缓冲层,合理的配置可以显著提升性能。

HikariCP最佳配置示例:

@Configuration
public class HikariConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.hikari")
    public HikariDataSource dataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        // 连接池大小配置
        dataSource.setMaximumPoolSize(50);        // 最大连接数
        dataSource.setMinimumIdle(10);            // 最小空闲连接
        dataSource.setConnectionTimeout(30000);   // 连接超时30秒
        dataSource.setIdleTimeout(600000);        // 空闲超时10分钟
        dataSource.setMaxLifetime(1800000);       // 连接最大存活时间30分钟
        
        // 性能优化配置
        dataSource.setLeakDetectionThreshold(60000); // 泄漏检测60秒
        dataSource.setInitializationFailTimeout(1);  // 初始化失败立即抛出异常
        
        // 连接测试
        dataSource.setTestWhileIdle(true);
        dataSource.setTestOnBorrow(false);
        dataSource.setTestOnReturn(false);
        dataSource.setValidationTimeout(3000);
        dataSource.setConnectionTestQuery("SELECT 1");
        
        return dataSource;
    }
}

连接池配置参数说明:

参数 推荐值 说明
maximumPoolSize CPU核心数 * 2 + 1 根据实际业务调整,一般不超过100
minimumIdle 与maximumPoolSize相同 避免频繁创建销毁连接
connectionTimeout 30秒 连接获取超时时间
idleTimeout 10分钟 空闲连接回收时间
maxLifetime 30分钟 连接最大存活时间

二、SQL优化策略

2.1 索引优化

索引是提升查询性能的关键,但不当的索引会成为性能杀手。

索引优化原则:

  • 遵循最左前缀原则
  • 避免索引失效的写法
  • 控制索引数量(单表不超过5个)
  • 使用覆盖索引减少回表

索引优化实战:

-- 原始低效查询
SELECT * FROM orders WHERE user_id = 123 AND status = 'PAID' AND created_at > '2024-01-01';

-- 优化步骤1:创建复合索引
CREATE INDEX idx_user_status_created ON orders(user_id, status, created_at);

-- 优化步骤2:使用覆盖索引(避免SELECT *)
SELECT order_id, amount, created_at 
FROM orders 
WHERE user_id = 123 AND status = 'PAID' AND created_at > '2024-01-01';

-- 优化步骤3:使用EXPLAIN分析
EXPLAIN SELECT order_id, amount, created_at 
FROM orders 
WHERE user_id = 123 AND status = 'PAID' AND created_at > '2024-01-01';

索引失效的常见情况:

-- 1. 隐式类型转换
SELECT * FROM users WHERE phone = 13800138000;  -- phone是VARCHAR类型,索引失效
-- 正确写法
SELECT * FROM users WHERE phone = '13800138000';

-- 2. 函数操作
SELECT * FROM orders WHERE DATE(created_at) = '2024-01-01';  -- 索引失效
-- 正确写法
SELECT * FROM orders WHERE created_at >= '2024-01-01 00:00:00' AND created_at < '2024-01-02 00:00:00';

-- 3. LIKE以%开头
SELECT * FROM users WHERE username LIKE '%john%';  -- 索引失效
-- 正确写法(如果必须这样,考虑全文索引)
SELECT * FROM users WHERE username LIKE 'john%';   -- 索引有效

-- 4. OR条件(部分版本优化不佳)
SELECT * FROM orders WHERE user_id = 123 OR amount > 1000;  -- 可能索引失效
-- 优化写法
SELECT * FROM orders WHERE user_id = 123
UNION ALL
SELECT * FROM orders WHERE amount > 1000 AND user_id != 123;

2.2 查询语句优化

避免SELECT *:

-- 反例:查询所有列
SELECT * FROM products WHERE category_id = 1;

-- 正例:只查询需要的列
SELECT product_id, name, price, stock FROM products WHERE category_id = 1;

-- 为什么?减少网络传输和内存消耗,可能使用覆盖索引

分页查询优化:

-- 普通分页(大数据量时性能差)
SELECT * FROM orders ORDER BY created_at DESC LIMIT 1000000, 20;

-- 优化方案1:延迟关联
SELECT o.* FROM orders o
INNER JOIN (
    SELECT order_id FROM orders 
    ORDER BY created_at DESC 
    LIMIT 1000000, 20
) t ON o.order_id = t.order_id;

-- 优化方案2:记录上次位置(游标分页)
SELECT * FROM orders 
WHERE created_at < '2024-01-01 10:00:00'  -- 上次最后一条的时间
ORDER BY created_at DESC 
LIMIT 20;

JOIN优化:

-- 反例:多表JOIN且没有索引
SELECT u.*, o.*, p.* 
FROM users u
JOIN orders o ON u.user_id = o.user_id
JOIN products p ON o.product_id = p.product_id
WHERE u.username = 'john';

-- 正例:小表驱动大表,确保JOIN字段有索引
-- 步骤1:先过滤小表
SELECT u.user_id FROM users u WHERE u.username = 'john';

-- 步骤2:再关联大表
SELECT o.*, p.* 
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.user_id IN (SELECT user_id FROM users WHERE username = 'john');

2.3 事务优化

事务设计原则:

  • 事务尽可能短
  • 避免在事务中执行耗时操作
  • 合理设置隔离级别
  • 避免大事务

事务优化示例:

// 反例:大事务(长时间持有锁)
@Transactional
public void processOrder(Long orderId) {
    // 1. 查询订单
    Order order = orderRepository.findById(orderId);
    
    // 2. 调用外部API(耗时操作)
    paymentService.verifyPayment(order.getPaymentId());  // 网络IO,耗时
    
    // 3. 更新库存
    inventoryService.decreaseStock(order.getProductId(), order.getQuantity());
    
    // 4. 更新订单状态
    order.setStatus("COMPLETED");
    orderRepository.save(order);
}

// 正例:拆分事务,移出耗时操作
public void processOrder(Long orderId) {
    // 1. 先执行非事务性操作
    paymentService.verifyPayment(order.getPaymentId());
    
    // 2. 小事务处理核心逻辑
    transactionTemplate.execute(status -> {
        Order order = orderRepository.findById(orderId);
        inventoryService.decreaseStock(order.getProductId(), order.getQuantity());
        order.setStatus("COMPLETED");
        orderRepository.save(order);
        return null;
    });
}

三、MySQL配置调优

3.1 核心参数配置

my.cnf 关键配置:

[mysqld]
# === 连接配置 ===
max_connections = 1000          # 最大连接数,根据业务调整
back_log = 500                  # 连接队列长度
wait_timeout = 600              # 非交互连接超时时间(秒)
interactive_timeout = 600       # 交互连接超时时间(秒)

# === 缓冲池配置 ===
innodb_buffer_pool_size = 8G    # 缓冲池大小(物理内存的50-75%)
innodb_buffer_pool_instances = 8 # 缓冲池实例数,减少竞争
innodb_log_file_size = 2G       # 重做日志文件大小
innodb_log_buffer_size = 64M    # 重做日志缓冲区大小

# === 性能配置 ===
innodb_flush_log_at_trx_commit = 1  # 1:每次提交刷盘(安全)2:每秒刷盘(高性能)
innodb_flush_method = O_DIRECT      # 绕过OS缓存,直接IO
innodb_io_capacity = 2000           # InnoDB每秒IO操作数(SSD可设更高)
innodb_read_io_threads = 8          # 读线程数
innodb_write_io_threads = 8         # 写线程数

# === 并发配置 ===
innodb_thread_concurrency = 32      # InnoDB并发线程数
innodb_lock_wait_timeout = 50       # 锁等待超时(秒)
innodb_rollback_on_timeout = OFF    # 超时是否回滚

# === 查询缓存(MySQL 8.0已移除)===
query_cache_type = 0                # 关闭查询缓存
query_cache_size = 0

# === 日志配置 ===
slow_query_log = 1                  # 开启慢查询日志
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2                 # 慢查询阈值(秒)
log_queries_not_using_indexes = 1   # 记录未使用索引的查询

# === 临时表配置 ===
tmp_table_size = 256M               # 临时表大小
max_heap_table_size = 256M          # 内存表最大大小

3.2 监控与诊断

实时监控脚本:

#!/bin/bash
# MySQL性能监控脚本

MYSQL_CMD="mysql -u root -p'password' -e"

while true; do
    echo "=== MySQL Performance Monitor $(date) ==="
    
    # 1. 当前连接数
    $MYSQL_CMD "SHOW STATUS LIKE 'Threads_connected';"
    
    # 2. 活跃线程数
    $MYSQL_CMD "SHOW STATUS LIKE 'Threads_running';"
    
    # 3. 缓冲池命中率
    $MYSQL_CMD "SELECT 
        (1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 AS buffer_hit_rate
        FROM performance_schema.global_status 
        WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads';"
    
    # 4. 慢查询数量
    $MYSQL_CMD "SHOW GLOBAL STATUS LIKE 'Slow_queries';"
    
    # 5. 锁等待情况
    $MYSQL_CMD "SELECT * FROM performance_schema.data_lock_waits LIMIT 5;"
    
    sleep 10
done

慢查询分析:

-- 开启慢查询日志后,使用mysqldumpslow分析
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

-- 或使用pt-query-digest(Percona Toolkit)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

-- 在MySQL中分析慢查询
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    AVG_TIMER_WAIT/1000000000000 as avg_time_sec,
    SUM_ROWS_EXAMINED
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;

四、缓存策略

4.1 多级缓存架构

缓存层次:

  1. 客户端缓存(浏览器/APP)
  2. CDN缓存
  3. 应用层缓存(Redis)
  4. 数据库缓存(Query Cache,MySQL 8.0已移除)

4.2 Redis缓存实战

缓存设计模式:

@Service
public class ProductService {
    
    @Autowired
    private ProductRepository productRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String PRODUCT_CACHE_KEY = "product:%s";
    private static final long CACHE_TTL = 3600; // 1小时
    
    // 方案1:Cache Aside模式
    public Product getProductById(Long id) {
        String key = String.format(PRODUCT_CACHE_KEY, id);
        
        // 1. 先读缓存
        Product product = (Product) redisTemplate.opsForValue().get(key);
        if (product != null) {
            return product;
        }
        
        // 2. 缓存未命中,读数据库
        product = productRepository.findById(id);
        if (product != null) {
            // 3. 写入缓存
            redisTemplate.opsForValue().set(key, product, CACHE_TTL, TimeUnit.SECONDS);
        }
        
        return product;
    }
    
    // 更新数据时的缓存处理
    @Transactional
    public void updateProduct(Product product) {
        // 1. 更新数据库
        productRepository.save(product);
        
        // 2. 删除缓存(避免并发问题)
        String key = String.format(PRODUCT_CACHE_KEY, product.getId());
        redisTemplate.delete(key);
    }
    
    // 方案2:缓存穿透保护
    public Product getProductWithPenetrationProtection(Long id) {
        String key = String.format(PRODUCT_CACHE_KEY, id);
        
        // 1. 查询缓存
        Object cached = redisTemplate.opsForValue().get(key);
        if (cached != null) {
            // 如果是空值标记,返回null
            if (cached instanceof String && "NULL".equals(cached)) {
                return null;
            }
            return (Product) cached;
        }
        
        // 2. 查询数据库
        Product product = productRepository.findById(id);
        
        // 3. 缓存结果(包括空值)
        if (product != null) {
            redisTemplate.opsForValue().set(key, product, CACHE_TTL, TimeUnit.SECONDS);
        } else {
            // 缓存空值,防止缓存穿透(设置较短TTL)
            redisTemplate.opsForValue().set(key, "NULL", 60, TimeUnit.SECONDS);
        }
        
        return product;
    }
    
    // 方案3:缓存雪崩保护(随机TTL)
    public void setProductCacheWithRandomTTL(Long id, Product product) {
        String key = String.format(PRODUCT_CACHE_KEY, id);
        // 基础TTL + 随机值(0-300秒)
        long ttl = CACHE_TTL + (long) (Math.random() * 300);
        redisTemplate.opsForValue().set(key, product, ttl, TimeUnit.SECONDS);
    }
}

Redis配置优化:

# application-redis.yml
spring:
  redis:
    host: localhost
    port: 6379
    password: your_password
    database: 0
    timeout: 5000ms
    lettuce:
      pool:
        max-active: 100
        max-idle: 50
        min-idle: 10
        max-wait: 5000ms

4.3 缓存与数据库一致性

最终一致性方案:

// 使用消息队列保证最终一致性
@Service
public class CacheUpdateService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    // 更新数据库后发送消息
    public void updateProductWithCache(Product product) {
        // 1. 更新数据库
        productRepository.save(product);
        
        // 2. 发送缓存更新消息
        CacheMessage message = new CacheMessage();
        message.setEntityType("PRODUCT");
        message.setEntityId(product.getId());
        message.setAction("UPDATE");
        
        kafkaTemplate.send("cache-update-topic", JSON.toJSONString(message));
    }
    
    // 消费者监听消息,更新缓存
    @KafkaListener(topics = "cache-update-topic")
    public void handleCacheUpdate(String messageStr) {
        CacheMessage message = JSON.parseObject(messageStr, CacheMessage.class);
        
        if ("PRODUCT".equals(message.getEntityId())) {
            String key = String.format(PRODUCT_CACHE_KEY, message.getEntityId());
            
            // 延迟双删策略
            redisTemplate.delete(key);
            
            // 延迟后再次删除(防止并发写入时的旧数据)
            scheduledExecutorService.schedule(() -> {
                redisTemplate.delete(key);
            }, 500, TimeUnit.MILLISECONDS);
        }
    }
}

五、高级优化技巧

5.1 并发控制

乐观锁实现:

-- 表结构增加版本号字段
CREATE TABLE product (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    stock INT,
    version INT DEFAULT 0  -- 版本号
);

-- 更新时检查版本号
UPDATE product 
SET stock = stock - 1, version = version + 1
WHERE id = 123 AND version = 5;  -- 原子操作

-- 如果更新失败(版本号不匹配),重试或返回错误

悲观锁实现:

-- 显式加锁
BEGIN;
SELECT * FROM inventory WHERE product_id = 123 FOR UPDATE;
-- 执行业务逻辑
UPDATE inventory SET stock = stock - 1 WHERE product_id = 123;
COMMIT;

-- 注意:FOR UPDATE会阻塞其他事务的写操作,需谨慎使用

5.2 分区表

Range分区示例:

-- 按时间分区
CREATE TABLE logs (
    id BIGINT,
    log_time DATETIME,
    message TEXT,
    INDEX idx_time (log_time)
) PARTITION BY RANGE (YEAR(log_time) * 100 + MONTH(log_time)) (
    PARTITION p202401 VALUES LESS THAN (202402),
    PARTITION p202402 VALUES LESS THAN (202403),
    PARTITION p202403 VALUES LESS THAN (202404),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);

-- 查询时自动分区裁剪
SELECT * FROM logs WHERE log_time >= '2024-02-01' AND log_time < '2024-03-01';
-- 只扫描p202402分区

5.3 并发插入优化

批量插入优化:

// 反例:逐条插入
for (Order order : orders) {
    orderRepository.save(order);  // 每次都要网络往返
}

// 正例:批量插入
public void batchInsert(List<Order> orders) {
    String sql = "INSERT INTO orders (order_id, user_id, amount, status) VALUES (?, ?, ?, ?)";
    
    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement(sql)) {
        
        conn.setAutoCommit(false);  // 关闭自动提交
        
        for (int i = 0; i < orders.size(); i++) {
            Order order = orders.get(i);
            ps.setLong(1, order.getOrderId());
            ps.setLong(2, order.getUserId());
            ps.setBigDecimal(3, order.getAmount());
            ps.setString(4, order.getStatus());
            ps.addBatch();
            
            // 每1000条提交一次
            if (i % 1000 == 0) {
                ps.executeBatch();
                conn.commit();
            }
        }
        
        // 提交剩余
        ps.executeBatch();
        conn.commit();
    } catch (SQLException e) {
        // 异常处理
    }
}

LOAD DATA INFILE(最高效):

-- 准备CSV文件
-- orders.csv
-- 1,1001,99.99,PAID
-- 2,1002,199.99,PAID

-- 批量导入
LOAD DATA LOCAL INFILE '/path/to/orders.csv'
INTO TABLE orders
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(order_id, user_id, amount, status);

六、压力测试与监控

6.1 压力测试工具

sysbench测试:

# 安装sysbench
sudo apt-get install sysbench

# 准备测试数据
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
         --mysql-db=testdb --tables=10 --table-size=100000 \
         /usr/share/sysbench/oltp_read_write.lua prepare

# 执行测试
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
         --mysql-db=testdb --tables=10 --table-size=100000 \
         --threads=100 --time=300 --report-interval=10 \
         /usr/share/sysbench/oltp_read_write.lua run

# 清理数据
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
         --mysql-db=testdb \
         /usr/share/sysbench/oltp_read_write.lua cleanup

6.2 监控指标

关键监控指标:

-- 1. QPS(每秒查询数)
SHOW GLOBAL STATUS LIKE 'Queries';
SHOW GLOBAL STATUS LIKE 'Questions';

-- 2. TPS(每秒事务数)
SHOW GLOBAL STATUS LIKE 'Com_commit';
SHOW GLOBAL STATUS LIKE 'Com_rollback';

-- 3. 缓冲池命中率
SELECT 
    (1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 AS hit_rate
FROM performance_schema.global_status 
WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads';

-- 4. 锁等待情况
SELECT * FROM performance_schema.data_lock_waits;

-- 5. 慢查询统计
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    AVG_TIMER_WAIT/1000000000000 as avg_time_sec
FROM performance_schema.events_statements_summary_by_digest
ORDER BY COUNT_STAR DESC
LIMIT 10;

6.3 自动化监控脚本

#!/usr/bin/env python3
# MySQL监控脚本

import pymysql
import time
import smtplib
from email.mime.text import MIMEText

class MySQLMonitor:
    def __init__(self, host, user, password, db):
        self.conn = pymysql.connect(
            host=host, user=user, password=password, db=db
        )
        self.thresholds = {
            'threads_connected': 800,  # 连接数阈值
            'slow_queries': 10,         # 慢查询阈值(每分钟)
            'buffer_hit_rate': 95       # 缓冲池命中率阈值
        }
    
    def check_metrics(self):
        cursor = self.conn.cursor()
        
        # 检查连接数
        cursor.execute("SHOW STATUS LIKE 'Threads_connected'")
        threads_connected = int(cursor.fetchone()[1])
        
        # 检查慢查询
        cursor.execute("SHOW GLOBAL STATUS LIKE 'Slow_queries'")
        slow_queries = int(cursor.fetchone()[1])
        
        # 检查缓冲池命中率
        cursor.execute("""
            SELECT (1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 
            FROM performance_schema.global_status 
            WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads'
        """)
        hit_rate = float(cursor.fetchone()[0])
        
        alerts = []
        if threads_connected > self.thresholds['threads_connected']:
            alerts.append(f"连接数过高: {threads_connected}")
        
        if slow_queries > self.thresholds['slow_queries']:
            alerts.append(f"慢查询过多: {slow_queries}")
        
        if hit_rate < self.thresholds['buffer_hit_rate']:
            alerts.append(f"缓冲池命中率过低: {hit_rate:.2f}%")
        
        return alerts
    
    def send_alert(self, message):
        # 发送邮件告警
        msg = MIMEText(message)
        msg['Subject'] = 'MySQL性能告警'
        msg['From'] = 'monitor@example.com'
        msg['To'] = 'admin@example.com'
        
        # 配置SMTP(示例)
        # server = smtplib.SMTP('smtp.example.com')
        # server.send_message(msg)
        print(f"ALERT: {message}")
    
    def run(self):
        while True:
            alerts = self.check_metrics()
            if alerts:
                self.send_alert('\n'.join(alerts))
            time.sleep(60)  # 每分钟检查一次

if __name__ == '__main__':
    monitor = MySQLMonitor('localhost', 'root', 'password', 'testdb')
    monitor.run()

七、实战案例:秒杀系统优化

7.1 秒杀场景特点

  • 瞬时高并发:QPS可达数十万
  • 库存有限:容易超卖
  • 读多写少:查询商品信息远多于下单

7.2 完整优化方案

1. 架构设计:

用户请求 → Nginx → 应用服务器 → Redis缓存 → MySQL

2. 核心代码实现:

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    private static final String STOCK_KEY = "seckill:stock:%s";
    private static final String ORDER_KEY = "seckill:order:%s:%s"; // userId:productId
    
    /**
     * 秒杀下单
     */
    public SeckillResult seckill(Long productId, Long userId) {
        String stockKey = String.format(STOCK_KEY, productId);
        String orderKey = String.format(ORDER_KEY, userId, productId);
        
        // 1. 检查是否已下单(防重复)
        if (redisTemplate.hasKey(orderKey)) {
            return SeckillResult.fail("您已参与过秒杀");
        }
        
        // 2. 预扣库存(Redis原子操作)
        Long stock = redisTemplate.opsForValue().decrement(stockKey);
        if (stock == null || stock < 0) {
            // 库存不足,回滚
            redisTemplate.opsForValue().increment(stockKey);
            return SeckillResult.fail("库存不足");
        }
        
        // 3. 标记已下单
        redisTemplate.opsForValue().set(orderKey, "1", 30, TimeUnit.MINUTES);
        
        // 4. 发送消息到队列,异步创建订单
        OrderMessage message = new OrderMessage();
        message.setProductId(productId);
        message.setUserId(userId);
        message.setTimestamp(System.currentTimeMillis());
        
        kafkaTemplate.send("seckill-order", JSON.toJSONString(message));
        
        return SeckillResult.success("秒杀成功,订单处理中");
    }
    
    /**
     * 异步处理订单(消费者)
     */
    @KafkaListener(topics = "seckill-order")
    public void processOrder(String messageStr) {
        OrderMessage message = JSON.parseObject(messageStr, OrderMessage.class);
        
        try {
            // 1. 检查Redis库存(双重验证)
            String stockKey = String.format(STOCK_KEY, message.getProductId());
            Long stock = redisTemplate.opsForValue().get(stockKey);
            if (stock == null || stock < 0) {
                return; // 库存不足,丢弃消息
            }
            
            // 2. 创建数据库事务
            transactionTemplate.execute(status -> {
                // 检查数据库库存
                Product product = productRepository.findById(message.getProductId());
                if (product.getStock() <= 0) {
                    status.setRollbackOnly();
                    return null;
                }
                
                // 扣减数据库库存
                int updated = productRepository.decreaseStock(message.getProductId());
                if (updated == 0) {
                    status.setRollbackOnly();
                    return null;
                }
                
                // 创建订单
                Order order = new Order();
                order.setProductId(message.getProductId());
                order.setUserId(message.getUserId());
                order.setStatus("CREATED");
                order.setAmount(product.getPrice());
                orderRepository.save(order);
                
                return order;
            });
            
        } catch (Exception e) {
            // 异常时回滚Redis库存
            redisTemplate.opsForValue().increment(stockKey);
            log.error("订单处理失败", e);
        }
    }
}

3. 数据库表结构优化:

-- 商品表
CREATE TABLE product (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    price DECIMAL(10,2),
    stock INT,
    version INT DEFAULT 0,  -- 乐观锁版本号
    INDEX idx_stock (stock)  -- 库存索引
) ENGINE=InnoDB;

-- 订单表(分表)
CREATE TABLE order_0 (
    order_id BIGINT PRIMARY KEY,
    product_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP,
    INDEX idx_user (user_id),
    INDEX idx_product (product_id)
) ENGINE=InnoDB;

-- 秒杀记录表(只记录成功订单)
CREATE TABLE seckill_record (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    product_id BIGINT,
    user_id BIGINT,
    order_id BIGINT,
    created_at TIMESTAMP,
    UNIQUE KEY uk_user_product (user_id, product_id)
) ENGINE=InnoDB;

4. MySQL配置调整(秒杀场景):

[mysqld]
# 降低连接超时,快速回收连接
wait_timeout = 30
interactive_timeout = 30

# 增大重做日志,减少刷盘频率
innodb_log_file_size = 4G
innodb_log_buffer_size = 128M

# 提升I/O性能
innodb_flush_log_at_trx_commit = 2  # 每秒刷盘(性能优先)
innodb_io_capacity = 5000           # SSD配置

# 连接数
max_connections = 2000
max_connect_errors = 100000

八、总结与最佳实践

8.1 高并发处理黄金法则

  1. 缓存为王:90%的查询应该在缓存层解决
  2. 异步化:耗时操作异步处理,提升响应速度
  3. 限流降级:保护数据库不被压垮
  4. 读写分离:分散读压力
  5. 分库分表:数据量过大时的终极方案

8.2 性能优化检查清单

  • [ ] 是否使用了合适的索引?
  • [ ] 是否避免了SELECT *?
  • [ ] 是否使用了连接池?
  • [ ] 是否配置了合适的缓冲池大小?
  • [ ] 是否开启了慢查询日志?
  • [ ] 是否使用了Redis缓存?
  • [ ] 是否实现了读写分离?
  • [ ] 是否进行了压力测试?
  • [ ] 是否有监控告警?

8.3 持续优化建议

  1. 定期分析慢查询:每周review慢查询日志
  2. 监控关键指标:QPS、TPS、连接数、缓冲池命中率
  3. 压力测试:每次大促前进行全链路压测
  4. 容量规划:根据业务增长提前扩容
  5. 故障演练:定期进行故障注入测试

通过以上策略的综合应用,可以有效应对高并发场景下的MySQL性能瓶颈,保障系统稳定运行。记住,优化是一个持续的过程,需要根据实际业务场景不断调整和完善。