Introduction: Understanding the High-Concurrency Challenge

High-concurrency scenarios are now the norm for Internet applications. Flash sales on e-commerce platforms, trending events on social media, and trading peaks in financial systems can all generate a flood of database requests in an instant. MySQL, one of the most popular relational databases, performs well under load, but with poor configuration or design it can easily suffer response latency, connection exhaustion, or outright failure under high concurrency.

The core goal of high-concurrency tuning is to maximize throughput and response speed while preserving data consistency and integrity. That requires systematic work across several dimensions: architecture, configuration, SQL tuning, and caching. This article walks through a complete strategy for handling high concurrency in MySQL.

1. Connection Layer Optimization: Building an Efficient Communication Channel

1.1 Connection Pool Configuration

Database connections are the bridge between the application and MySQL, and creating and tearing them down repeatedly is expensive. A connection pool reuses connections and cuts that overhead dramatically.

Key configuration parameters:

-- View the current connection settings
SHOW VARIABLES LIKE 'max_connections';
SHOW VARIABLES LIKE 'wait_timeout';
SHOW VARIABLES LIKE 'interactive_timeout';

-- Suggested settings (adjust to server memory)
SET GLOBAL max_connections = 2000;    -- maximum concurrent connections
SET GLOBAL wait_timeout = 300;        -- non-interactive connection timeout (seconds)
SET GLOBAL interactive_timeout = 300; -- interactive connection timeout (seconds)

Connection pool configuration example (HikariCP):

// HikariCP configuration in a Java application
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/mydb");
config.setUsername("user");
config.setPassword("password");

// Core tuning parameters
config.setMaximumPoolSize(50);        // max pool size; CPU cores * 2 is a common starting point
config.setMinimumIdle(10);            // minimum idle connections
config.setConnectionTimeout(30000);   // wait up to 30s to obtain a connection
config.setIdleTimeout(600000);        // 10-minute idle timeout
config.setMaxLifetime(1800000);       // 30-minute maximum connection lifetime
config.setLeakDetectionThreshold(60000); // connection leak detection threshold

// Key optimization: enable prepared statement caching
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
config.addDataSourceProperty("useLocalSessionState", "true");
config.addDataSourceProperty("rewriteBatchedStatements", "true");
config.addDataSourceProperty("cacheResultSetMetadata", "true");
config.addDataSourceProperty("cacheServerConfiguration", "true");
config.addDataSourceProperty("elideSetAutoCommits", "true");
config.addDataSourceProperty("maintainTimeStats", "false");

HikariDataSource dataSource = new HikariDataSource(config);

Connection pool monitoring metrics:

-- View current connections
SHOW PROCESSLIST;

-- Connection usage breakdown
SELECT 
    COUNT(*) as total_connections,
    SUM(CASE WHEN COMMAND = 'Sleep' THEN 1 ELSE 0 END) as idle_connections,
    SUM(CASE WHEN COMMAND != 'Sleep' THEN 1 ELSE 0 END) as active_connections
FROM INFORMATION_SCHEMA.PROCESSLIST;

-- Connection rejection statistics
SHOW GLOBAL STATUS LIKE 'Connection_errors_max_connections';
SHOW GLOBAL STATUS LIKE 'Aborted_connects';
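
The server-side view can be cross-checked against client-side pool metrics. A small sketch using HikariCP's built-in HikariPoolMXBean (dataSource is the pool created above):

// Client-side pool metrics via HikariCP's JMX bean
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.HikariPoolMXBean;

public class PoolMetricsLogger {
    public static void logPoolState(HikariDataSource dataSource) {
        HikariPoolMXBean pool = dataSource.getHikariPoolMXBean();
        System.out.printf("active=%d idle=%d total=%d waiting=%d%n",
                pool.getActiveConnections(),          // connections currently executing SQL
                pool.getIdleConnections(),            // connections sitting in the pool
                pool.getTotalConnections(),           // active + idle
                pool.getThreadsAwaitingConnection()); // threads blocked in getConnection()
    }
}

A sustained non-zero waiting count means the pool, not MySQL, is the bottleneck.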

1.2 Connection Limits and Queueing

Once the connection count hits max_connections, new connections are refused. Sizing the limit is a balance between resource consumption and concurrency.

Connection-usage monitoring to drive dynamic adjustment:

-- Stored procedure to monitor connection usage
DELIMITER $$
CREATE PROCEDURE check_connection_usage()
BEGIN
    DECLARE max_conn INT;
    DECLARE current_conn INT;
    DECLARE usage_ratio DECIMAL(5,2);
    
    -- performance_schema.global_variables replaces
    -- INFORMATION_SCHEMA.GLOBAL_VARIABLES in MySQL 5.7+/8.0
    SELECT VARIABLE_VALUE INTO max_conn 
    FROM performance_schema.global_variables 
    WHERE VARIABLE_NAME = 'max_connections';
    
    SELECT COUNT(*) INTO current_conn 
    FROM INFORMATION_SCHEMA.PROCESSLIST;
    
    SET usage_ratio = (current_conn / max_conn) * 100;
    
    -- Warn when usage exceeds 80%
    IF usage_ratio > 80 THEN
        SELECT CONCAT('Warning: connection usage has reached ', usage_ratio, '%') as warning;
    END IF;
    
    SELECT current_conn as current_connections, 
           max_conn as max_allowed_connections,
           usage_ratio as usage_pct;
END$$
DELIMITER ;

-- Run the check periodically
CALL check_connection_usage();

Application-layer rate limiting (Java example):

// Application-layer rate limiting with Guava RateLimiter
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;

public class DatabaseAccessLimiter {
    // Allow 100 database operations per second
    private static final RateLimiter dbAccessLimiter = RateLimiter.create(100.0);
    
    public void executeDatabaseOperation(Runnable operation) {
        // Try to acquire a permit, waiting at most 500 ms
        if (dbAccessLimiter.tryAcquire(500, TimeUnit.MILLISECONDS)) {
            operation.run();
        } else {
            // Degrade gracefully: serve cached data or signal that the system is busy
            throw new RuntimeException("System busy, please retry later");
        }
    }
}

2. Storage Engine Optimization: Deep InnoDB Tuning

2.1 Memory Configuration

The InnoDB buffer pool is the heart of MySQL performance: it caches data and index pages to reduce disk I/O.

Key configuration parameters:

-- View the current InnoDB configuration
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'innodb_log_file_size';
SHOW VARIABLES LIKE 'innodb_flush_log_at_trx_commit';

-- Suggested settings for a 16GB server (adjust to your memory)
SET GLOBAL innodb_buffer_pool_size = 10737418240;  -- 10GB, typically 50-70% of RAM (resizable online since 5.7)
SET GLOBAL innodb_flush_log_at_trx_commit = 1;     -- 1 = flush on every commit (safest)
SET GLOBAL innodb_io_capacity = 2000;              -- I/O capacity; SSDs can go higher
SET GLOBAL innodb_io_capacity_max = 4000;          -- maximum I/O capacity

-- These variables are read-only at runtime; set them in my.cnf and restart:
-- innodb_buffer_pool_instances = 8   # multiple instances reduce buffer pool contention
-- innodb_log_file_size = 268435456   # 256MB redo log file
-- innodb_log_buffer_size = 67108864  # 64MB log buffer (dynamic from 8.0)
-- innodb_flush_method = O_DIRECT     # bypass the OS cache, direct I/O

Buffer pool monitoring:

-- View buffer pool activity
SHOW ENGINE INNODB STATUS\G

-- Detailed buffer pool statistics
SELECT 
    pool_id,
    pool_size,
    free_buffers,
    database_pages,
    old_database_pages,
    modified_database_pages,
    pending_reads,
    pending_flush_lru,
    pending_flush_list
FROM INFORMATION_SCHEMA.INNODB_BUFFER_POOL_STATS;

-- Hottest tables in the buffer pool
-- (INNODB_BUFFER_PAGE has one row per page; this query is expensive,
--  so avoid it on busy production servers)
SELECT 
    TABLE_NAME,
    COUNT(*) * 16 / 1024 as size_mb,  -- 16KB pages
    COUNT(*) as pages
FROM INFORMATION_SCHEMA.INNODB_BUFFER_PAGE
WHERE TABLE_NAME IS NOT NULL
GROUP BY TABLE_NAME
ORDER BY pages DESC
LIMIT 10;

2.2 Transaction and Log Optimization

Transaction log settings directly affect write performance and durability.

Redo log tuning:

-- View redo log activity
SHOW ENGINE INNODB STATUS\G

-- View the log file configuration
SHOW VARIABLES LIKE 'innodb_log_files_in_group';

-- Recommendations:
-- 1. Enlarge the log files to reduce checkpoint and log-switch frequency
-- 2. For write-heavy workloads, relax the flush policy (trading some durability for speed)
SET GLOBAL innodb_flush_log_at_trx_commit = 2;  -- flush roughly once per second; clear performance gain
SET GLOBAL innodb_flush_log_at_trx_commit = 0;  -- leave flushing to the OS; highest risk

-- View log write waits
SELECT 
    EVENT_NAME,
    COUNT_STAR,
    SUM_TIMER_WAIT / 1000000000 as wait_time_ms  -- timer values are in picoseconds
FROM performance_schema.events_waits_summary_global_by_event_name
WHERE EVENT_NAME LIKE '%innodb_log%'
ORDER BY SUM_TIMER_WAIT DESC;

2.3 Table Design

Partitioned tables:

-- Orders table partitioned by month (good for time-series data).
-- Note: every unique key, including the primary key, must contain
-- the partitioning column, hence the composite primary key.
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    order_time DATETIME,
    amount DECIMAL(10,2),
    status INT,
    PRIMARY KEY (order_id, order_time),
    INDEX idx_user (user_id),
    INDEX idx_time (order_time)
) PARTITION BY RANGE (YEAR(order_time) * 100 + MONTH(order_time)) (
    PARTITION p202401 VALUES LESS THAN (202402),
    PARTITION p202402 VALUES LESS THAN (202403),
    PARTITION p202403 VALUES LESS THAN (202404),
    PARTITION p202404 VALUES LESS THAN (202405),
    PARTITION p_max VALUES LESS THAN MAXVALUE
);

-- Check which partitions a query touches
-- (EXPLAIN PARTITIONS is the 5.6 syntax; from 5.7 on, plain EXPLAIN shows a partitions column)
EXPLAIN SELECT * FROM orders WHERE order_time BETWEEN '2024-03-01' AND '2024-03-31';

-- User log table partitioned by hash (good for evenly distributed data);
-- again the primary key must include the partitioning column
CREATE TABLE user_logs (
    log_id BIGINT,
    user_id BIGINT,
    log_time DATETIME,
    action VARCHAR(50),
    PRIMARY KEY (log_id, user_id)
) PARTITION BY HASH(user_id) PARTITIONS 16;

Vertical splitting example:

-- Original wide table
CREATE TABLE user_profile (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    phone VARCHAR(20),
    address TEXT,
    avatar_url VARCHAR(255),
    bio TEXT,
    created_at DATETIME,
    updated_at DATETIME,
    -- more columns...
    INDEX idx_username (username),
    INDEX idx_email (email)
);

-- Vertical split: separate core fields from rarely used extended fields
CREATE TABLE user_base (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    phone VARCHAR(20),
    created_at DATETIME,
    updated_at DATETIME,
    INDEX idx_username (username),
    INDEX idx_email (email)
);

CREATE TABLE user_profile_ext (
    user_id BIGINT PRIMARY KEY,
    address TEXT,
    avatar_url VARCHAR(255),
    bio TEXT,
    FOREIGN KEY (user_id) REFERENCES user_base(user_id) ON DELETE CASCADE
);

3. SQL Optimization: Improving Performance at the Query Level

3.1 Index Optimization Strategy

Index design principles:

  • Follow the leftmost-prefix rule
  • Avoid constructs that defeat index use
  • Use covering indexes where possible
  • Keep the index count in check (too many slows down writes)

Index optimization in practice:

-- Composite index example
CREATE TABLE order_items (
    id BIGINT PRIMARY KEY,
    order_id BIGINT,
    product_id BIGINT,
    quantity INT,
    price DECIMAL(10,2),
    created_at DATETIME,
    INDEX idx_order_product (order_id, product_id),  -- composite index
    INDEX idx_created_at (created_at)
);

-- Before: a query that may use indexes poorly
SELECT * FROM order_items WHERE order_id = 1001 AND product_id = 2005;
-- With only single-column indexes, MySQL cannot exploit both conditions fully

-- After: rely on the composite index
EXPLAIN SELECT order_id, product_id, quantity 
FROM order_items 
WHERE order_id = 1001 AND product_id = 2005;
-- Check the plan to confirm idx_order_product is used

-- Covering index example (avoids going back to the row)
-- The query needs only columns that live in the index
EXPLAIN SELECT order_id, product_id 
FROM order_items 
WHERE order_id BETWEEN 1000 AND 2000;
-- When the index covers every selected column, the speedup is significant

-- Patterns that defeat indexes, and their fixes
-- 1. Functions applied to the indexed column
-- Bad:  WHERE YEAR(created_at) = 2024
-- Good: WHERE created_at BETWEEN '2024-01-01' AND '2024-12-31'

-- 2. Implicit type conversion
-- Bad:  WHERE phone = 13800138000   (phone is a varchar)
-- Good: WHERE phone = '13800138000'

-- 3. Leading wildcard in LIKE
-- Bad:  WHERE username LIKE '%john'
-- Good: WHERE username LIKE 'john%'  -- a trailing wildcard can still use the index

-- Check how indexes are actually used
-- (INFORMATION_SCHEMA.STATISTICS has no usage counters; performance_schema does)
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_STAR,
    COUNT_READ
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE OBJECT_SCHEMA = 'mydb' AND OBJECT_NAME = 'order_items';

3.2 Query Optimization

Avoid SELECT *:

-- Slow: returns every column and may force extra row lookups
SELECT * FROM orders WHERE order_id = 1001;

-- Fast: select only the needed columns; a covering index may apply
SELECT order_id, user_id, order_time, amount 
FROM orders 
WHERE order_id = 1001;

JOIN optimization:

-- Before: comma join, easy to turn into a Cartesian product by mistake
SELECT * FROM orders o, users u WHERE o.user_id = u.user_id;

-- After: explicit JOIN with a sensible driving table
SELECT o.order_id, o.amount, u.username
FROM orders o
INNER JOIN users u ON o.user_id = u.user_id
WHERE o.order_time > '2024-01-01'
ORDER BY o.order_time DESC
LIMIT 100;

-- Inspect the JOIN plan
EXPLAIN SELECT o.order_id, o.amount, u.username
FROM orders o
INNER JOIN users u ON o.user_id = u.user_id;
-- Watch the type column, from worst to best: ALL, index, range, ref, eq_ref, const
-- Watch the rows column: the fewer rows scanned, the better

Rewriting subqueries as JOINs:

-- Before: correlated subquery; can be slow, especially on older MySQL versions
SELECT user_id, username 
FROM users u 
WHERE EXISTS (
    SELECT 1 FROM orders o 
    WHERE o.user_id = u.user_id AND o.amount > 1000
);

-- After: equivalent JOIN, often faster
SELECT DISTINCT u.user_id, u.username
FROM users u
INNER JOIN orders o ON u.user_id = o.user_id
WHERE o.amount > 1000;

3.3 Batch Operations and Pagination

Batch insert optimization:

-- Before: row-by-row inserts, poor throughput
INSERT INTO orders (order_id, user_id, amount) VALUES (1, 1001, 99.99);
INSERT INTO orders (order_id, user_id, amount) VALUES (2, 1002, 199.99);
-- ... repeated 1000 times

-- After: multi-row insert, far fewer network round trips
INSERT INTO orders (order_id, user_id, amount) VALUES 
(1, 1001, 99.99),
(2, 1002, 199.99),
(3, 1003, 299.99),
-- ... up to 1000 rows in one statement
(1000, 2000, 399.99);

// Java batch insert example
public void batchInsertOrders(List<Order> orders) {
    String sql = "INSERT INTO orders (order_id, user_id, amount) VALUES (?, ?, ?)";
    
    try (Connection conn = dataSource.getConnection();
         PreparedStatement pstmt = conn.prepareStatement(sql)) {
        
        conn.setAutoCommit(false);  // disable autocommit so each batch shares one transaction
        
        for (int i = 0; i < orders.size(); i++) {
            Order order = orders.get(i);
            pstmt.setLong(1, order.getOrderId());
            pstmt.setLong(2, order.getUserId());
            pstmt.setBigDecimal(3, order.getAmount());
            pstmt.addBatch();
            
            // Flush and commit every 1000 rows
            if ((i + 1) % 1000 == 0) {
                pstmt.executeBatch();
                conn.commit();
            }
        }
        
        // Flush and commit the remainder
        pstmt.executeBatch();
        conn.commit();
        
    } catch (SQLException e) {
        // Surface the failure; the pool resets autocommit when the connection is returned
        throw new RuntimeException("Batch insert failed", e);
    }
}

Deep pagination:

-- Before: performance collapses as the offset grows
SELECT * FROM orders 
WHERE order_time > '2024-01-01'
ORDER BY order_time DESC
LIMIT 1000000, 100;  -- skips the first million rows; extremely slow

-- After: deferred join, paginate on the index first
SELECT o.* FROM orders o
INNER JOIN (
    SELECT order_id FROM orders
    WHERE order_time > '2024-01-01'
    ORDER BY order_time DESC
    LIMIT 1000000, 100
) AS tmp ON o.order_id = tmp.order_id;

-- Or keyset pagination (recommended)
-- First page
SELECT * FROM orders 
WHERE order_time > '2024-01-01'
ORDER BY order_time DESC, order_id DESC
LIMIT 100;

-- Next page (using the order_time and order_id of the previous page's last row)
SELECT * FROM orders 
WHERE (order_time < '2024-03-01 10:00:00' OR 
       (order_time = '2024-03-01 10:00:00' AND order_id < 12345))
  AND order_time > '2024-01-01'
ORDER BY order_time DESC, order_id DESC
LIMIT 100;

4. Caching Strategies: Taking Load Off the Database

4.1 Multi-Level Cache Architecture

Application-layer cache (Redis):

// Redis cache service example
@Service
public class OrderCacheService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    private static final String ORDER_KEY_PREFIX = "order:";
    private static final long CACHE_TTL = 300; // 5 minutes
    
    // Read an order, cache first
    public Order getOrderById(Long orderId) {
        String key = ORDER_KEY_PREFIX + orderId;
        
        // 1. Try Redis first
        Order order = (Order) redisTemplate.opsForValue().get(key);
        if (order != null) {
            return order;
        }
        
        // 2. Cache miss: read the database
        order = orderMapper.selectById(orderId);
        if (order != null) {
            // 3. Populate the cache
            redisTemplate.opsForValue().set(key, order, CACHE_TTL, TimeUnit.SECONDS);
        }
        
        return order;
    }
    
    // Update an order, then invalidate the cache
    public void updateOrder(Order order) {
        // 1. Update the database
        orderMapper.updateById(order);
        
        // 2. Delete the cache entry (delayed double delete)
        String key = ORDER_KEY_PREFIX + order.getOrderId();
        redisTemplate.delete(key);
        
        // Delete again after a delay so a stale read (e.g. from a lagging
        // replica) cannot repopulate the cache with old data
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(500);
                redisTemplate.delete(key);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    
    // Cache penetration protection (Bloom filter)
    public Order getOrderWithBloomFilter(Long orderId) {
        String key = ORDER_KEY_PREFIX + orderId;
        
        // bloomFilter is assumed to be built elsewhere; see the sketch below
        if (!bloomFilter.mightContain(orderId)) {
            // An ID the filter has never seen cannot exist: skip the database entirely
            return null;
        }
        
        return getOrderById(orderId);
    }
}
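
The bloomFilter field used above is not defined in the listing. A minimal sketch with Guava's BloomFilter, preloading existing order IDs at startup; these members would sit inside OrderCacheService, and selectAllIds() is a hypothetical mapper method:

// Hypothetical initialization of the Bloom filter used above (Guava)
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import javax.annotation.PostConstruct;  // jakarta.annotation on Spring Boot 3+

// Sized for 10 million order IDs at a ~1% false-positive rate;
// "might contain" can be wrong, "does not contain" never is.
private final BloomFilter<Long> bloomFilter =
        BloomFilter.create(Funnels.longFunnel(), 10_000_000, 0.01);

@PostConstruct
public void preloadBloomFilter() {
    // selectAllIds() is a hypothetical mapper method returning every order_id
    for (Long id : orderMapper.selectAllIds()) {
        bloomFilter.put(id);
    }
}

New order IDs must also be put() into the filter as they are created, or legitimate lookups will be rejected.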

MySQL query cache (use with caution):

-- View query cache status
SHOW VARIABLES LIKE 'query_cache_type';
SHOW VARIABLES LIKE 'query_cache_size';
SHOW STATUS LIKE 'Qcache%';

-- Query cache settings (MySQL 5.7 and earlier; query_cache_type
-- can only be switched ON at runtime if it was not disabled at startup)
SET GLOBAL query_cache_size = 67108864;  -- 64MB
SET GLOBAL query_cache_type = ON;
SET GLOBAL query_cache_limit = 2097152;  -- 2MB cap per cached result

-- Note: MySQL 8.0 removed the query cache entirely; use an external cache instead

4.2 Cache/Database Consistency Strategies

Cache-Aside (the most common pattern, used in the Redis example above):

Read path:
1. Read the cache first
2. On a miss, read the database
3. Write the database result back into the cache

Update path:
1. Update the database first
2. Delete the cache entry

Write-Through:

Update path:
1. Update the cache first
2. The cache synchronously updates the database
(rarely used; write latency is high)

Write-Behind:

Update path:
1. Update the cache first
2. The cache updates the database asynchronously, in batches
(high consistency risk; only for workloads that tolerate brief staleness, as sketched below)
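
Write-Behind is usually provided by caching middleware rather than hand-rolled, but a minimal sketch clarifies the flow: the request path only queues the change, and a background worker drains the queue into MySQL in batches (WriteBehindWriter and the batchUpdate mapper method are illustrative, not part of any library):

// Illustrative Write-Behind sketch: cache first, async batch flush to MySQL
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class WriteBehindWriter {
    private final BlockingQueue<Order> pending = new LinkedBlockingQueue<>();

    // Called on the request path: the cache is updated elsewhere, the DB write is deferred
    public void enqueue(Order order) {
        pending.offer(order);
    }

    // Background thread: drain up to 500 updates and write them in one batch
    public void flushLoop(OrderMapper orderMapper) throws InterruptedException {
        while (true) {
            List<Order> batch = new ArrayList<>();
            batch.add(pending.take());       // block until at least one update arrives
            pending.drainTo(batch, 499);
            orderMapper.batchUpdate(batch);  // hypothetical batch-update mapper method
        }
    }
}

Anything still sitting in the queue is lost on a crash, which is exactly the consistency risk noted above.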

5. Read/Write Splitting and Sharding

5.1 Master-Slave Replication

Master configuration (my.cnf):

[mysqld]
# Unique server ID for the master
server-id = 1

# Enable the binary log
log_bin = mysql-bin
binlog_format = ROW  # ROW format is recommended
expire_logs_days = 7  # keep binlogs for 7 days (MySQL 8.0 uses binlog_expire_logs_seconds)

# Databases to replicate (optional)
binlog_do_db = mydb

# Databases to exclude from the binlog (optional)
binlog_ignore_db = mysql
binlog_ignore_db = information_schema

Slave configuration (my.cnf):

[mysqld]
# Unique server ID; must differ from the master
server-id = 2

# Relay log
relay_log = mysql-relay-bin
log_bin = mysql-bin

# Read-only mode (guards against accidental writes)
read_only = 1

# Required for cascading replication
log_slave_updates = 1

Create the replication user:

-- Run on the master
CREATE USER 'repl'@'%' IDENTIFIED BY 'ReplPassword123!';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;

-- Check the master status
SHOW MASTER STATUS;
-- Note the File and Position values

-- Run on the slave
-- (MySQL 8.0.22+ renames these commands to CHANGE REPLICATION SOURCE TO / START REPLICA)
CHANGE MASTER TO
MASTER_HOST='master_host_ip',
MASTER_USER='repl',
MASTER_PASSWORD='ReplPassword123!',
MASTER_LOG_FILE='mysql-bin.000001',  -- from SHOW MASTER STATUS
MASTER_LOG_POS=1234;                 -- from SHOW MASTER STATUS

-- Start replication on the slave
START SLAVE;

-- Check replication status
SHOW SLAVE STATUS\G
-- Look for: Slave_IO_Running: Yes, Slave_SQL_Running: Yes

5.2 Implementing Read/Write Splitting

ShardingSphere configuration example:

# sharding.yaml
dataSources:
  ds_0:  # master
    url: jdbc:mysql://master:3306/mydb
    username: root
    password: password
    connectionTimeout: 30000
    maxLifetime: 1800000
    maximumPoolSize: 50
    
  ds_1:  # slave 1
    url: jdbc:mysql://slave1:3306/mydb
    username: root
    password: password
    connectionTimeout: 30000
    maxLifetime: 1800000
    maximumPoolSize: 50
    
  ds_2:  # slave 2
    url: jdbc:mysql://slave2:3306/mydb
    username: root
    password: password
    connectionTimeout: 30000
    maxLifetime: 1800000
    maximumPoolSize: 50

shardingRule:
  masterSlaveRules:
    ds_ms:
      masterDataSourceName: ds_0
      slaveDataSourceNames:
        - ds_1
        - ds_2
      loadBalanceAlgorithmType: ROUND_ROBIN  # round-robin load balancing
  
  tables:
    orders:
      actualDataNodes: ds_ms.orders
    users:
      actualDataNodes: ds_ms.users

Application-layer read/write splitting (Spring Boot):

@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    public DataSource routingDataSource() {
        DynamicDataSource routingDataSource = new DynamicDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        return routingDataSource;
    }
    
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(routingDataSource());
        return bean.getObject();
    }
}

// Holds the data source key for the current thread
public class DataSourceContextHolder {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
    public static void setMaster() {
        contextHolder.set("master");
    }
    
    public static void setSlave() {
        contextHolder.set("slave");
    }
    
    public static String get() {
        return contextHolder.get();
    }
    
    public static void clear() {
        contextHolder.remove();
    }
}

// AOP aspect: choose the data source based on the method name
@Aspect
@Component
public class DataSourceAspect {
    
    @Before("execution(* com.example.service.*.get*(..))")
    public void setSlaveDataSource(JoinPoint joinPoint) {
        DataSourceContextHolder.setSlave();
    }
    
    @Before("execution(* com.example.service.*.add*(..)) || " +
            "execution(* com.example.service.*.update*(..)) || " +
            "execution(* com.example.service.*.delete*(..))")
    public void setMasterDataSource(JoinPoint joinPoint) {
        DataSourceContextHolder.setMaster();
    }
    
    @After("execution(* com.example.service.*.*(..))")
    public void clearDataSource(JoinPoint joinPoint) {
        DataSourceContextHolder.clear();
    }
}
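
The DynamicDataSource referenced in routingDataSource() is not shown above. A minimal version based on Spring's AbstractRoutingDataSource looks like this:

// Minimal routing data source: picks "master" or "slave" per operation
// based on the key stored in DataSourceContextHolder
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        // Returning null falls back to the default target (the master)
        return DataSourceContextHolder.get();
    }
}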

5.3 Sharding Strategies

Vertical sharding (by database):

-- Split databases by business module
-- User database: user_db
CREATE TABLE user_db.users (
    user_id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100)
);

-- Order database: order_db
CREATE TABLE order_db.orders (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2)
);

-- Product database: product_db
CREATE TABLE product_db.products (
    product_id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    price DECIMAL(10,2)
);

Horizontal sharding (hash on user ID):

-- Split the orders table into 16 physical tables,
-- order_0 through order_15
CREATE TABLE order_0 (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    order_time DATETIME,
    INDEX idx_user (user_id)
);

-- Routing rule: user_id % 16
-- Compute the table name before querying a user's orders
-- e.g. user_id = 1001, 1001 % 16 = 9, so query order_9 (see the sketch below)
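
Without sharding middleware, the application routes queries itself. A minimal sketch of the user_id % 16 routing described above (the class name and DAO shape are illustrative):

// Manual shard routing: user_id % 16 selects one of order_0 .. order_15
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class OrderShardDao {
    private static final int TABLE_COUNT = 16;

    private String tableFor(long userId) {
        return "order_" + (userId % TABLE_COUNT);  // e.g. user_id 1001 -> order_9
    }

    public long countOrders(Connection conn, long userId) throws SQLException {
        // Table names cannot be bind parameters, so the name is interpolated;
        // it is derived from a long, so there is no injection risk here
        String sql = "SELECT COUNT(*) FROM " + tableFor(userId) + " WHERE user_id = ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, userId);
            try (ResultSet rs = ps.executeQuery()) {
                rs.next();
                return rs.getLong(1);
            }
        }
    }
}

Queries that are not keyed by user_id must fan out across all 16 tables, which is why the sharding column should match the dominant access pattern.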

ShardingSphere sharding configuration:

dataSources:
  ds_0:  # database 0
    url: jdbc:mysql://db0:3306/mydb
    username: root
    password: password
    maximumPoolSize: 50
    
  ds_1:  # database 1
    url: jdbc:mysql://db1:3306/mydb
    username: root
    password: password
    maximumPoolSize: 50

shardingRule:
  tables:
    orders:
      actualDataNodes: ds_${0..1}.order_${0..3}  # 2 databases x 4 tables = 8 physical tables
      tableStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: order_${user_id % 4}  # table sharding by user_id hash
      databaseStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: ds_${user_id % 2}     # database sharding by user_id hash
  bindingTables:
    - orders
  defaultDatabaseStrategy:
    inline:
      shardingColumn: user_id
      algorithmExpression: ds_${user_id % 2}
  defaultTableStrategy:
    inline:
      shardingColumn: user_id
      algorithmExpression: order_${user_id % 4}

6. Monitoring and Diagnostics: Continuous Performance Tuning

6.1 Slow Query Log Analysis

Enable the slow query log:

-- View the slow query configuration
SHOW VARIABLES LIKE 'slow_query%';
SHOW VARIABLES LIKE 'long_query_time';

-- Enable the slow query log
SET GLOBAL slow_query_log = ON;
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 1;  -- log queries slower than 1 second
SET GLOBAL log_queries_not_using_indexes = ON;  -- also log queries that use no index

-- Analyze the log from the shell:
--   mysqldumpslow /var/log/mysql/slow.log
--   pt-query-digest /var/log/mysql/slow.log > slow_report.txt  (Percona Toolkit)

Analysis with Performance Schema:

-- Slowest statements by average latency
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    AVG_TIMER_WAIT / 1000000000 as avg_time_ms,  -- timer values are in picoseconds
    SUM_ROWS_EXAMINED,
    SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;

-- Statements that examine many rows but return few (full-scan suspects)
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    SUM_ROWS_EXAMINED,
    SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
WHERE SUM_ROWS_EXAMINED > 10000 AND SUM_ROWS_SENT < 100
ORDER BY SUM_ROWS_EXAMINED DESC
LIMIT 10;

-- Index usage counters
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_FETCH,
    COUNT_INSERT,
    COUNT_UPDATE,
    COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
ORDER BY COUNT_FETCH DESC
LIMIT 20;

6.2 Real-Time Performance Monitoring

InnoDB status monitoring:

-- View the live InnoDB status report
SHOW ENGINE INNODB STATUS\G

-- Key sections to watch:
-- 1. ROW OPERATIONS: row operation counters
-- 2. TRANSACTIONS: transaction details
-- 3. SEMAPHORES: lock and mutex waits
-- 4. FILE I/O: file I/O activity
-- 5. BUFFER POOL AND MEMORY: buffer pool usage

System table monitoring:

-- Current lock waits
-- (in MySQL 8.0, use sys.innodb_lock_waits or performance_schema.data_lock_waits instead)
SELECT 
    r.trx_id waiting_trx_id,
    r.trx_mysql_thread_id waiting_thread,
    r.trx_query waiting_query,
    b.trx_id blocking_trx_id,
    b.trx_mysql_thread_id blocking_thread,
    b.trx_query blocking_query
FROM information_schema.innodb_lock_waits w
INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id;

-- Table-level locks
SHOW OPEN TABLES WHERE In_use > 0;

-- Process list
SHOW FULL PROCESSLIST;
-- Watch the State column: Locked, Waiting for table metadata lock, Waiting for table level lock

6.3 Performance Monitoring Scripts

Python monitoring script example:

#!/usr/bin/env python3
# mysql_monitor.py

import pymysql
import time
import smtplib
from email.mime.text import MIMEText

class MySQLMonitor:
    def __init__(self, host, user, password, db):
        self.conn = pymysql.connect(
            host=host, user=user, password=password, db=db,
            charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor
        )
        self.thresholds = {
            'connections': 80,   # connection usage threshold (%)
            'slow_queries': 10,  # slow query threshold (per check)
            'cpu_usage': 80,     # CPU usage threshold (%), not checked below
            'memory_usage': 85   # memory usage threshold (%), not checked below
        }
    
    def check_connections(self):
        """Check connection usage."""
        with self.conn.cursor() as cursor:
            # performance_schema.global_variables replaces
            # INFORMATION_SCHEMA.GLOBAL_VARIABLES in MySQL 5.7+/8.0
            cursor.execute("""
                SELECT VARIABLE_VALUE as max_conn 
                FROM performance_schema.global_variables 
                WHERE VARIABLE_NAME = 'max_connections'
            """)
            max_conn = int(cursor.fetchone()['max_conn'])  # the value comes back as a string
            
            cursor.execute("SELECT COUNT(*) as current_conn FROM INFORMATION_SCHEMA.PROCESSLIST")
            current_conn = cursor.fetchone()['current_conn']
            
            usage = (current_conn / max_conn) * 100
            
            return {
                'current': current_conn,
                'max': max_conn,
                'usage': usage,
                'alert': usage > self.thresholds['connections']
            }
    
    def check_slow_queries(self):
        """Count statements that have been running for more than 10 seconds."""
        with self.conn.cursor() as cursor:
            # Exclude idle connections, whose TIME is just time spent sleeping
            cursor.execute("""
                SELECT COUNT(*) as slow_count 
                FROM INFORMATION_SCHEMA.PROCESSLIST 
                WHERE TIME > 10 AND COMMAND != 'Sleep'
            """)
            slow_count = cursor.fetchone()['slow_count']
            
            return {
                'count': slow_count,
                'alert': slow_count > self.thresholds['slow_queries']
            }
    
    def check_innodb_status(self):
        """Parse key metrics out of SHOW ENGINE INNODB STATUS."""
        with self.conn.cursor() as cursor:
            cursor.execute("SHOW ENGINE INNODB STATUS")
            status = cursor.fetchone()['Status']
            
            metrics = {}
            for line in status.split('\n'):
                if 'History list length' in line:
                    metrics['history_list'] = int(line.split()[-1])
                elif 'queries inside InnoDB' in line:
                    # Format: "0 queries inside InnoDB, 0 queries in queue"
                    parts = line.split(',')
                    metrics['queries_inside'] = int(parts[0].split()[0])
                    metrics['queries_queue'] = int(parts[1].split()[0])
            
            return metrics
    
    def send_alert(self, message):
        """Send an alert email."""
        msg = MIMEText(message)
        msg['Subject'] = 'MySQL performance alert'
        msg['From'] = 'monitor@example.com'
        msg['To'] = 'dba@example.com'
        
        try:
            server = smtplib.SMTP('smtp.example.com', 587)
            server.login('monitor@example.com', 'password')
            server.send_message(msg)
            server.quit()
            print("Alert email sent")
        except Exception as e:
            print(f"Failed to send email: {e}")
    
    def run_monitor(self):
        """Main monitoring loop."""
        while True:
            print(f"\n=== {time.strftime('%Y-%m-%d %H:%M:%S')} ===")
            
            # Connections
            conn_status = self.check_connections()
            print(f"Connections: {conn_status['current']}/{conn_status['max']} ({conn_status['usage']:.1f}%)")
            if conn_status['alert']:
                alert_msg = f"Warning: connection usage too high! {conn_status['usage']:.1f}%"
                print(alert_msg)
                self.send_alert(alert_msg)
            
            # Slow queries
            slow_status = self.check_slow_queries()
            print(f"Slow queries: {slow_status['count']}")
            if slow_status['alert']:
                alert_msg = f"Warning: too many slow queries! {slow_status['count']}"
                print(alert_msg)
                self.send_alert(alert_msg)
            
            # InnoDB status
            innodb_status = self.check_innodb_status()
            print(f"InnoDB history list length: {innodb_status.get('history_list', 0)}")
            print(f"Queries inside InnoDB: {innodb_status.get('queries_inside', 0)}")
            print(f"Queries in queue: {innodb_status.get('queries_queue', 0)}")
            
            time.sleep(60)  # check once a minute

if __name__ == '__main__':
    monitor = MySQLMonitor(
        host='localhost',
        user='monitor',
        password='password',
        db='mydb'
    )
    monitor.run_monitor()

7. Advanced Techniques

7.1 Lock Optimization

Reduce lock contention:

-- Before: a long transaction holds locks for too long
BEGIN;
UPDATE orders SET status = 'paid' WHERE order_id = 1001;
-- other business logic...
COMMIT;

-- After: split the transaction and commit as soon as possible
BEGIN;
UPDATE orders SET status = 'paid' WHERE order_id = 1001;
COMMIT;

-- other business logic...
-- open a new transaction afterwards if needed

Use optimistic locking:

-- Add a version column
ALTER TABLE orders ADD COLUMN version INT DEFAULT 0;

-- Check the version on update
UPDATE orders 
SET status = 'paid', version = version + 1
WHERE order_id = 1001 AND version = 0;

-- Check the affected row count; 0 means another transaction got there first (see the retry sketch below)
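
On the application side this usually becomes a read, compare, retry loop. A minimal JDBC sketch, written as a DAO method like the batch-insert example earlier (table and columns as above; the three-attempt limit is arbitrary):

// Optimistic-lock update with retry: re-read the row and retry when
// the UPDATE matched zero rows (another transaction won the race)
public boolean payOrder(Connection conn, long orderId) throws SQLException {
    String select = "SELECT version FROM orders WHERE order_id = ?";
    String update = "UPDATE orders SET status = 'paid', version = version + 1 " +
                    "WHERE order_id = ? AND version = ?";
    for (int attempt = 0; attempt < 3; attempt++) {
        int version;
        try (PreparedStatement ps = conn.prepareStatement(select)) {
            ps.setLong(1, orderId);
            try (ResultSet rs = ps.executeQuery()) {
                if (!rs.next()) return false;  // order does not exist
                version = rs.getInt(1);
            }
        }
        try (PreparedStatement ps = conn.prepareStatement(update)) {
            ps.setLong(1, orderId);
            ps.setInt(2, version);
            if (ps.executeUpdate() == 1) return true;  // our version still matched
        }
        // 0 rows updated: someone else bumped the version; loop and retry
    }
    return false;  // give up after 3 attempts
}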

Avoiding gap locks:

-- Gap locks typically appear with range conditions or updates that use no index
-- Before: may take gap locks across the whole range
UPDATE orders SET status = 'cancelled' WHERE order_time > '2024-01-01';

-- After: collect primary keys first, then update by primary key
-- (the derived table works around MySQL's "can't specify target table" error 1093)
UPDATE orders SET status = 'cancelled' WHERE order_id IN (
    SELECT order_id FROM (
        SELECT order_id FROM orders WHERE order_time > '2024-01-01'
    ) AS t
);

7.2 Transaction Optimization

Transaction best practices:

-- 1. Keep transactions as short as possible
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1;
UPDATE accounts SET balance = balance + 100 WHERE user_id = 2;
COMMIT;  -- commit promptly

-- 2. Never make external calls inside a transaction (see the sketch after this block)
BEGIN;
UPDATE orders SET status = 'paid' WHERE order_id = 1001;
-- do not call external APIs here
COMMIT;

-- 3. Pick a sensible isolation level
-- The default is REPEATABLE READ; read-mostly transactions can use READ COMMITTED
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 4. Use SAVEPOINT for partial rollback
BEGIN;
SAVEPOINT sp1;
UPDATE orders SET status = 'paid' WHERE order_id = 1001;
-- on error, roll back to the savepoint
ROLLBACK TO SAVEPOINT sp1;
COMMIT;
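
In application code, rule 2 usually means committing the database work first and making the external call afterwards. A hedged Spring sketch using TransactionTemplate (available since Spring 5.2; PaymentGatewayClient and the mapper methods are illustrative):

// Keep the transaction short: commit the DB change first,
// then notify the external payment gateway outside the transaction
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;

@Service
public class OrderPaymentService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private TransactionTemplate transactionTemplate;

    @Autowired
    private PaymentGatewayClient gatewayClient;  // hypothetical external client

    public void payOrder(long orderId) {
        // Only the database work runs inside the transaction...
        transactionTemplate.executeWithoutResult(status ->
                orderMapper.updateStatus(orderId, "paid"));  // hypothetical mapper method
        // ...the slow external call happens after the commit,
        // so no row locks are held while waiting on the network
        gatewayClient.notifyPaid(orderId);
    }
}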

7.3 Dynamic Parameter Tuning

Adjusting parameters online (no restart):

-- Resize the buffer pool (online since MySQL 5.7)
SET GLOBAL innodb_buffer_pool_size = 12884901888;  -- 12GB

-- Adjust the connection limit
SET GLOBAL max_connections = 2000;

-- innodb_log_file_size is read-only at runtime;
-- change it in my.cnf and restart:
-- innodb_log_file_size = 536870912  # 512MB

-- Raise I/O capacity (SSD environments)
SET GLOBAL innodb_io_capacity = 4000;
SET GLOBAL innodb_io_capacity_max = 8000;

-- Relax the flush policy (performance vs. durability trade-off)
SET GLOBAL innodb_flush_log_at_trx_commit = 2;  -- flush roughly once per second

8. Summary and Best Practices

8.1 High-Concurrency Optimization Checklist

Architecture:

  • [ ] Use a connection pool with sensibly sized limits
  • [ ] Split reads from writes to spread query load
  • [ ] Consider sharding to break the single-server ceiling
  • [ ] Add multi-level caching (Redis + local cache)

Database configuration:

  • [ ] innodb_buffer_pool_size = 50-70% of RAM
  • [ ] innodb_log_file_size = 256MB-2GB (sized to write volume)
  • [ ] max_connections sized to application demand
  • [ ] Slow query log enabled and reviewed regularly

SQL:

  • [ ] Every query backed by a suitable index
  • [ ] No SELECT *; fetch only the needed columns
  • [ ] No functions applied to indexed columns
  • [ ] Batch operations instead of row-by-row work
  • [ ] Deep pagination queries optimized

Application layer:

  • [ ] Rate limiting and degradation strategies in place
  • [ ] Non-critical work handled asynchronously
  • [ ] Key metrics monitored with timely alerts
  • [ ] Regular load testing

8.2 Optimization Principles

  1. The 80/20 rule: 80% of performance problems come from 20% of the queries; fix those first
  2. Trade space for time: use caches well to cut database round trips
  3. Go asynchronous: move non-critical work off the request path
  4. Scale horizontally: single-server capacity is finite; design the architecture to scale out
  5. Monitor continuously: a solid monitoring setup catches problems early

8.3 Common Pitfalls

  • Myth 1: more indexes are always better → Reality: every index adds write overhead
  • Myth 2: caching always helps → Reality: keeping cache and database consistent has real costs
  • Myth 3: sharding is a silver bullet → Reality: it introduces new problems such as distributed transactions
  • Myth 4: crank every parameter to the maximum → Reality: tune to the workload's actual characteristics

Applied together, these strategies can markedly improve MySQL's behavior under high concurrency and keep the system stable. Remember that optimization is a continuous process: keep adjusting and refining as the business grows and the system's behavior evolves.