引言:理解高并发挑战

在当今互联网应用中,高并发访问已成为常态。无论是电商秒杀、社交网络还是金融交易系统,MySQL数据库都面临着前所未有的并发压力。高并发场景下,数据库性能瓶颈往往成为系统稳定性的关键因素。本文将从基础的索引优化开始,逐步深入到架构升级,为您提供一套完整的MySQL高并发处理策略。

高并发的定义与影响

高并发通常指系统在同一时间段内处理大量请求的能力。当并发连接数超过MySQL的处理能力时,会出现连接超时、响应延迟甚至服务崩溃等问题。根据经验,当QPS(每秒查询数)超过10000或并发连接数超过200时,就需要重点关注数据库优化。

第一部分:索引优化——高并发的基础

1.1 索引设计的核心原则

索引是MySQL性能优化的基石。在高并发场景下,合理的索引设计可以将查询性能提升10-100倍。

1.1.1 覆盖索引(Covering Index)

覆盖索引是指索引包含查询所需的所有字段,避免回表操作。

-- 示例:用户订单查询
-- 原始查询(需要回表)
SELECT order_id, user_id, amount, status 
FROM orders 
WHERE user_id = 12345 AND status = 'paid';

-- 优化后的覆盖索引
ALTER TABLE orders ADD INDEX idx_user_status_amount (user_id, status, amount);

-- 优化后的查询(索引覆盖)
SELECT order_id, user_id, amount, status 
FROM orders 
WHERE user_id = 12345 AND status = 'paid';

原理说明:覆盖索引idx_user_status_amount包含了WHERE条件中的user_id和status,以及查询结果中需要的amount字段。这样MySQL可以直接从索引中获取所有数据,无需访问数据行,大幅减少I/O操作。

1.1.2 最左前缀原则与索引下推

-- 复合索引示例
ALTER TABLE users ADD INDEX idx_name_age_city (name, age, city);

-- 符合最左前缀的查询(高效)
SELECT * FROM users WHERE name = '张三';
SELECT * FROM users WHERE name = '张三' AND age = 25;
SELECT * FROM users WHERE name = '张三' AND age = 25 AND city = '北京';

-- 不符合最左前缀的查询(无法使用该索引)
SELECT * FROM users WHERE age = 25;
SELECT * FROM users WHERE city = '北京';

-- 部分符合的查询(索引下推优化)
SELECT * FROM users WHERE name = '张三' AND city = '北京';
-- MySQL 5.6+会使用索引下推优化,先过滤name,再在索引中过滤city

索引下推(ICP):MySQL 5.6引入的优化,可以在存储引擎层过滤不符合条件的索引记录,减少回表次数。

1.1.3 索引选择性与基数

-- 计算索引选择性
SELECT 
    COUNT(DISTINCT city) / COUNT(*) AS city_selectivity,
    COUNT(DISTINCT gender) / COUNT(*) AS gender_selectivity,
    COUNT(DISTINCT age) / COUNT(*) AS age_selectivity
FROM users;

-- 高选择性字段(选择性>0.1)适合建索引
-- 低选择性字段(如gender)不适合单独建索引,但可作为复合索引的一部分

选择性原则:选择性越高,索引效率越高。通常选择性>0.1的字段适合单独建索引。

1.2 索引维护与监控

1.2.1 索引使用情况监控

-- 查看索引使用情况
SELECT 
    object_schema,
    object_name,
    index_name,
    count_star,
    count_read,
    count_write,
    count_fetch,
    count_insert,
    count_update,
    count_delete
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE object_schema = 'your_database'
ORDER BY count_star DESC;

-- 查找未使用的索引(可能浪费空间和写性能)
SELECT 
    object_schema,
    object_name,
    index_name
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE index_name IS NOT NULL
AND count_star = 0
AND object_schema != 'mysql';

1.2.2 索引碎片整理

-- 查看表碎片情况
SELECT 
    table_name,
    ROUND((data_length + index_length) / 1024 / 1024, 2) AS total_mb,
    ROUND(data_free / 1024 / 1024, 2) AS free_mb,
    ROUND(data_free / (data_length + index_length) * 100, 2) AS free_percent
FROM information_schema.tables
WHERE table_schema = 'your_database';

-- 优化表(整理碎片)
OPTIMIZE TABLE orders;

-- 在线DDL(MySQL 5.6+)
ALTER TABLE orders ALGORITHM=INPLACE, LOCK=NONE;

第二部分:SQL语句优化

2.1 避免索引失效的常见场景

2.1.1 隐式类型转换

-- 错误示例:phone字段是varchar类型,但用数字查询
SELECT * FROM users WHERE phone = 13800138000;
-- 索引失效,因为发生了类型转换

-- 正确示例
SELECT * FROM users WHERE phone = '13800138000';

2.1.2 函数操作导致索引失效

-- 错误示例:对索引列使用函数
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';
-- 索引失效

-- 正确示例:使用范围查询
SELECT * FROM orders 
WHERE create_time >= '2024-01-01 00:00:00' 
AND create_time < '2024-01-02 00:00:00';

2.1.3 LIKE查询优化

-- 错误示例:前导模糊查询
SELECT * FROM users WHERE name LIKE '%张三';
-- 索引失效

-- 正确示例:后缀模糊查询(可使用索引)
SELECT * FROM users WHERE name LIKE '张三%';

-- 高级方案:全文索引
ALTER TABLE articles ADD FULLTEXT INDEX ft_content (content);
SELECT * FROM articles WHERE MATCH(content) AGAINST('数据库' IN NATURAL LANGUAGE MODE);

2.2 分页查询优化

2.2.1 传统分页的性能问题

-- 传统分页(深度分页问题)
SELECT * FROM orders ORDER BY id LIMIT 1000000, 20;
-- 性能急剧下降,因为需要扫描前1000000行

2.2.2 优化方案1:延迟关联

-- 延迟关联(先获取主键,再关联详情)
SELECT o.* 
FROM orders o
JOIN (
    SELECT id 
    FROM orders 
    ORDER BY id 
    LIMIT 1000000, 20
) AS tmp ON o.id = tmp.id;

2.2.3 优化方案2:位置记录法

-- 业务层记录上次查询的位置
-- 第一次查询
SELECT * FROM orders WHERE id > 0 ORDER BY id LIMIT 20;
-- 返回最后一条记录的id=100020

-- 第二次查询
SELECT * FROM orders WHERE id > 100020 ORDER BY id LIMIT 20;

2.3 批量操作优化

2.3.1 批量插入优化

-- 低效:逐条插入
INSERT INTO orders (user_id, amount) VALUES (1, 100);
INSERT INTO orders (user_id, amount) VALUES (2, 200);
-- ... 1000次

-- 高效:批量插入
INSERT INTO orders (user_id, amount) VALUES 
(1, 100),
(2, 200),
(3, 300),
-- ... 1000条
(1000, 100000);

-- 更高效:LOAD DATA INFILE(适合大数据量)
LOAD DATA LOCAL INFILE '/tmp/orders.csv'
INTO TABLE orders
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(user_id, amount);

2.3.2 批量更新优化

-- 低效:逐条更新
UPDATE orders SET status = 'paid' WHERE id = 1;
UPDATE orders SET status = 'paid' WHERE id = 2;
-- ...

-- 高效:CASE WHEN批量更新
UPDATE orders 
SET status = CASE id
    WHEN 1 THEN 'paid'
    WHEN 2 THEN 'paid'
    WHEN 3 THEN 'paid'
END
WHERE id IN (1, 2, 3);

-- 或者使用INSERT ... ON DUPLICATE KEY UPDATE
INSERT INTO orders (id, user_id, amount, status) VALUES 
(1, 1, 100, 'paid'),
(2, 2, 200, 'paid')
ON DUPLICATE KEY UPDATE
status = VALUES(status),
amount = VALUES(amount);

第三部分:数据库配置优化

3.1 InnoDB核心参数调优

3.1.1 缓冲池配置

# my.cnf 配置示例
[mysqld]
# 缓冲池大小(通常设置为物理内存的50-70%)
innodb_buffer_pool_size = 8G

# 缓冲池实例数(CPU核心数的1-2倍,最大64)
innodb_buffer_pool_instances = 8

# 缓冲池预热(MySQL 5.7+)
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_at_shutdown = ON

# 页大小(默认16K,可根据业务调整)
innodb_page_size = 16384

3.1.2 日志文件配置

# 重做日志文件大小(建议1-2G)
innodb_log_file_size = 2G

# 日志文件组数(建议2-3)
innodb_log_files_in_group = 3

# 刷新策略(0:每秒,1:每次提交,2:每次提交但每秒刷盘)
innodb_flush_log_at_trx_commit = 1  # ACID严格模式
# 高并发非事务场景可设为2或0提升性能,但有数据丢失风险

# 刷新方法(O_DIRECT避免双缓存)
innodb_flush_method = O_DIRECT

3.1.3 并发相关参数

# 最大并发线程数(高并发需调大)
thread_cache_size = 50
max_connections = 500

# InnoDB并发线程数(MySQL 5.5+)
innodb_thread_concurrency = 0  # 0表示不限制

# 读写队列大小
innodb_read_io_threads = 8
innodb_write_io_threads = 8

# 合并排序缓冲区
innodb_sort_buffer_size = 2M

3.2 查询缓存与连接管理

3.2.1 查询缓存(MySQL 8.0已移除)

# MySQL 5.7及以下
query_cache_type = 0  # 建议关闭,高并发下有锁竞争问题
query_cache_size = 0

3.2.2 连接管理优化

# 连接超时设置
wait_timeout = 600  # 非交互连接超时时间(秒)
interactive_timeout = 600

# 连接队列
back_log = 500  # TCP连接等待队列长度

# 连接限制
max_connect_errors = 100000  # 防止暴力破解

3.3 临时表与排序优化

# 内存临时表大小限制
tmp_table_size = 64M
max_heap_table_size = 64M

# 磁盘临时表使用InnoDB(MySQL 5.7+)
internal_tmp_disk_storage_engine = InnoDB

# 排序缓冲区
sort_buffer_size = 2M  # 每个线程分配,不宜过大

# Join缓冲区
join_buffer_size = 2M  # 每个线程分配,非索引Join时使用

第四部分:架构升级策略

4.1 读写分离架构

4.1.1 主从复制原理

-- 主库配置(my.cnf)
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW  # 推荐ROW格式
expire_logs_days = 7
sync_binlog = 1

-- 从库配置
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
read_only = 1  # 从库只读
log_slave_updates = 1  # 级联复制需要

-- 创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';

-- 在从库执行
CHANGE MASTER TO
MASTER_HOST='master_ip',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=123456;

START SLAVE;
SHOW SLAVE STATUS\G

4.1.2 应用层读写分离

// Java Spring Boot 示例
@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    public DataSource routingDataSource() {
        return new DynamicDataSource();
    }
    
    // 动态数据源路由
    public static class DynamicDataSource extends AbstractRoutingDataSource {
        @Override
        protected Object determineCurrentLookupKey() {
            return TransactionSynchronizationManager.isCurrentTransactionReadOnly() 
                ? "slave" : "master";
        }
    }
}

4.1.3 中间件方案(ShardingSphere)

# ShardingSphere 配置示例
spring:
  shardingsphere:
    datasource:
      names: master, slave0, slave1
      master:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://master:3306/db
        username: root
        password: password
      slave0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave0:3306/db
        username: root
        password: 123456
      slave1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave1:3306/db
        username: root
        123456
    rules:
      replica-query:
        data-sources:
          ds_0:
            primary-data-source-name: master
            replica-data-source-names: slave0,slave1
            load-balance-algorithm-type: ROUND_ROBIN

4.2 分库分表架构

4.2.1 垂直分库

-- 按业务模块拆分数据库
-- 原始单库
-- user_db: 用户相关表
-- order_db: 订单相关表
-- product_db: 商品相关表

-- 示例:订单库独立
CREATE DATABASE order_db;
USE order_db;
CREATE TABLE orders (...);
CREATE TABLE order_items (...);
CREATE TABLE order_logistics (...);

4.2.2 水平分表(Sharding)

-- 按用户ID取模分表(订单表)
-- orders_0, orders_1, orders_2, orders_3

-- 分表规则:user_id % 4
-- 应用层路由逻辑
public class OrderSharding {
    public static String getTableName(Long userId) {
        int index = (int) (userId % 4);
        return "orders_" + index;
    }
    
    // 查询示例
    public List<Order> getOrdersByUser(Long userId) {
        String tableName = getTableName(userId);
        String sql = "SELECT * FROM " + tableName + " WHERE user_id = ?";
        return jdbcTemplate.query(sql, new Object[]{userId}, rowMapper);
    }
}

4.2.3 使用ShardingSphere实现分片

// ShardingSphere 分片算法
public class UserIdShardingAlgorithm implements StandardShardingAlgorithm<Long> {
    
    @Override
    public String doSharding(Collection<String> availableTargetNames, ShardingValue<Long> shardingValue) {
        Long userId = shardingValue.getValue();
        int size = availableTargetNames.size();
        int index = (int) (userId % size);
        
        for (String tableName : availableTargetNames) {
            if (tableName.endsWith("_" + index)) {
                return tableName;
            }
        }
        throw new UnsupportedOperationException();
    }
    
    @Override
    public Properties getProps() {
        return null;
    }
    
    @Override
    public void init(Properties properties) {
    }
}

// 配置分片规则
spring:
  shardingsphere:
    rules:
      sharding:
        tables:
          orders:
            actual-data-nodes: ds.orders_$->{0..3}
            table-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: user_id_mod
        sharding-algorithms:
          user_id_mod:
            type: CLASS_BASED
            props:
              strategy: STANDARD
              algorithmClassName: com.example.UserIdShardingAlgorithm

4.3 缓存层引入

4.3.1 Redis缓存策略

// 缓存穿透保护
public class CacheService {
    
    // 缓存空值防止穿透
    public User getUser(Long id) {
        String key = "user:" + id;
        String cached = redis.get(key);
        
        if (cached != null) {
            if (cached.equals("NULL")) {
                return null; // 缓存空值
            }
            return JSON.parseObject(cached, User.class);
        }
        
        User user = userMapper.selectById(id);
        if (user == null) {
            redis.setex(key, 60, "NULL"); // 缓存空值60秒
        } else {
            redis.setex(key, 3600, JSON.toJSONString(user));
        }
        return user;
    }
    
    // 缓存击穿保护(互斥锁)
    public User getUserWithLock(Long id) {
        String key = "user:" + id;
        String cached = redis.get(key);
        if (cached != null) {
            return JSON.parseObject(cached, User.class);
        }
        
        // 获取分布式锁
        String lockKey = "lock:" + id;
        boolean locked = redis.setnx(lockKey, "1", 10);
        
        if (locked) {
            try {
                User user = userMapper.selectById(id);
                if (user != null) {
                    redis.setex(key, 3600, JSON.toJSONString(user));
                } else {
                    redis.setex(key, 60, "NULL");
                }
                return user;
            } finally {
                redis.del(lockKey);
            }
        } else {
            // 等待并重试
            Thread.sleep(100);
            return getUserWithLock(id);
        }
    }
}

4.3.2 缓存与数据库一致性

// 先更新数据库,再删除缓存(Cache Aside Pattern)
public void updateUser(User user) {
    // 1. 更新数据库
    userMapper.update(user);
    
    // 2. 删除缓存(让下次查询重新加载)
    String key = "user:" + user.getId();
    redis.del(key);
}

// 延迟双删策略(处理主从延迟)
public void updateUserWithDelayDelete(Long userId) {
    userMapper.update(user);
    redis.del("user:" + userId);
    
    // 延迟再次删除
    scheduledExecutor.schedule(() -> {
        redis.del("user:" + userId);
    }, 500, TimeUnit.MILLISECONDS);
}

4.4 高可用架构

4.4.1 MHA(Master High Availability)

# MHA 配置示例
# manager节点配置:/etc/mha/app1.cnf
[server default]
user=mha
password=mha123
ping_interval=3
master_binlog_dir=/var/lib/mysql

[server1]
hostname=master1
candidate_master=1

[server2]
hostname=slave1
candidate_master=1

[server3]
hostname=slave2
no_master=1

# 启动MHA manager
masterha_manager --conf=/etc/mha/app1.cnf

# 检查状态
masterha_check_status --conf=/etc/mha/app1.cnf

4.4.2 MGR(MySQL Group Replication)

-- MGR 配置(MySQL 5.7+)
-- 1. 配置my.cnf
[mysqld]
server_id = 1
gtid_mode = ON
enforce_gtid_consistency = ON
binlog_checksum = NONE
log_slave_updates = ON
binlog_format = ROW

plugin_load_add = group_replication.so
group_replication_group_name = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
group_replication_start_on_boot = off
group_replication_local_address = "192.168.1.101:33061"
group_replication_group_seeds = "192.168.1.101:33061,192.168.1.102:33061,192.168.1.103:33061"
group_replication_bootstrap_group = off

-- 2. 安装插件
INSTALL PLUGIN group_replication SONAME 'group_replication.so';

-- 3. 启动集群(第一个节点)
SET GLOBAL group_replication_bootstrap_group=ON;
START GROUP_REPLICATION;
SET GLOBAL group_replication_bootstrap_group=OFF;

-- 4. 其他节点加入
START GROUP_REPLICATION;

-- 5. 查看集群状态
SELECT * FROM performance_schema.replication_group_members;

第五部分:监控与诊断

5.1 性能监控指标

5.1.1 关键性能指标

-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Threads_running';

-- 查看QPS
SHOW GLOBAL STATUS LIKE 'Queries';
-- 间隔1秒再次查询,差值即为QPS

-- 查看TPS
SHOW GLOBAL STATUS LIKE 'Com_commit';
SHOW GLOBAL STATUS LIKE 'Com_rollback';
-- (Com_commit + Com_rollback) 即为TPS

-- 查看InnoDB缓冲池命中率
SELECT 
    (1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 AS buffer_hit_rate
FROM performance_schema.global_status 
WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads';

-- 理想值 > 99%

5.1.2 慢查询日志分析

# my.cnf 配置
slow_query_log = ON
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1  # 记录超过1秒的查询
log_queries_not_using_indexes = ON
min_examined_row_limit = 0
log_slow_admin_statements = ON
log_slow_slave_statements = ON
# 使用mysqldumpslow分析
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

# 使用pt-query-digest分析(更强大)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# 实时分析
pt-query-digest --processlist h=localhost --interval=10

5.1.3 使用Performance Schema

-- 查看最耗时的SQL
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    AVG_TIMER_WAIT/1000000000000 AS avg_time_sec,
    MAX_TIMER_WAIT/1000000000000 AS max_time_sec,
    SUM_ROWS_EXAMINED
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;

-- 查看表I/O情况
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    SUM_TIMER_WAIT/1000000000000 AS total_time_sec,
    COUNT_READ,
    COUNT_WRITE
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;

-- 查看索引使用情况
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_READ,
    COUNT_WRITE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
ORDER BY COUNT_READ DESC;

5.2 实时诊断工具

5.2.1 SHOW PROCESSLIST

-- 查看当前所有连接
SHOW FULL PROCESSLIST;

-- 筛选活跃连接
SELECT * FROM information_schema.processlist 
WHERE COMMAND != 'Sleep' 
AND TIME > 10
ORDER BY TIME DESC;

-- 查看正在执行的事务
SELECT * FROM information_schema.innodb_trx;

5.2.2 InnoDB状态分析

-- 查看InnoDB引擎状态
SHOW ENGINE INnoDB STATUS\G

-- 重点关注:
-- 1. TRANSACTIONS - 当前事务信息
-- 2. FILE I/O - 文件I/O状态
-- 3. BUFFER POOL AND MEMORY - 缓冲池使用情况
-- 4. ROW OPERATIONS - 行操作统计

5.3 第三方监控工具

5.3.1 Percona Toolkit

# 安装
apt-get install percona-toolkit

# 常用命令
# 1. 慢查询日志分析
pt-query-digest slow.log

# 2. 表结构同步
pt-table-sync --execute h=master,D=db,t=orders h=slave

# 3. 主从一致性检查
pt-table-checksum h=master --databases=db --tables=orders

# 4. 在线DDL
pt-online-schema-change --alter "ADD INDEX idx_user (user_id)" D=db,t=orders --execute

5.3.2 Prometheus + Grafana 监控

# mysqld_exporter 配置
# /etc/default/mysqld_exporter
DATA_SOURCE_NAME="user:password@(localhost:3306)/"

# docker-compose.yml
version: '3'
services:
  mysqld_exporter:
    image: prom/mysqld-exporter
    environment:
      DATA_SOURCE_NAME: "user:password@(mysql:3306)/"
    ports:
      - "9104:9104"

  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"

第六部分:实战案例分析

6.1 案例1:电商秒杀系统优化

6.1.1 问题场景

  • 10000 QPS的秒杀活动
  • 库存扣减导致大量锁等待
  • 数据库连接池耗尽

6.1.2 优化方案

-- 1. 库存表设计优化
CREATE TABLE inventory (
    product_id BIGINT PRIMARY KEY,
    stock INT NOT NULL,
    version INT NOT NULL DEFAULT 0,  -- 乐观锁版本号
    INDEX idx_stock (stock)  -- 库存索引
) ENGINE=InnoDB;

-- 2. 乐观锁扣减库存
UPDATE inventory 
SET stock = stock - 1, version = version + 1
WHERE product_id = 123 
AND stock > 0 
AND version = ?;  -- 传入当前版本号

-- 3. 如果更新失败,说明库存不足或被其他请求修改
-- 应用层重试或返回失败

-- 4. 异步写入订单
-- 使用Redis队列缓冲订单创建请求
-- 后台任务批量处理
// Java 代码示例
public class SeckillService {
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    // 秒杀请求处理
    public Result seckill(Long userId, Long productId) {
        String lockKey = "seckill:" + productId;
        
        // 1. Redis预减库存(内存标记)
        Long stock = redisTemplate.opsForValue().decrement("stock:" + productId);
        if (stock < 0) {
            redisTemplate.opsForValue().increment("stock:" + productId); // 回滚
            return Result.fail("库存不足");
        }
        
        // 2. 获取分布式锁
        boolean locked = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, "1", 5, TimeUnit.SECONDS);
        
        if (!locked) {
            return Result.fail("系统繁忙,请重试");
        }
        
        try {
            // 3. 数据库扣减库存(乐观锁)
            int updated = jdbcTemplate.update(
                "UPDATE inventory SET stock = stock - 1, version = version + 1 " +
                "WHERE product_id = ? AND stock > 0 AND version = ?",
                productId, getCurrentVersion(productId)
            );
            
            if (updated == 0) {
                return Result.fail("库存不足");
            }
            
            // 4. 异步创建订单
            sendOrderMessage(userId, productId);
            
            return Result.success("秒杀成功");
            
        } finally {
            redisTemplate.delete(lockKey);
        }
    }
    
    // 批量处理订单(后台任务)
    @Scheduled(fixedDelay = 1000)
    public void processOrders() {
        List<OrderMessage> messages = pollOrderMessages(100);
        if (messages.isEmpty()) return;
        
        // 批量插入订单
        String sql = "INSERT INTO orders (user_id, product_id, status) VALUES (?, ?, 'created')";
        jdbcTemplate.batchUpdate(sql, messages, 100, (ps, argument) -> {
            ps.setLong(1, argument.getUserId());
            ps.setLong(2, argument.getProductId());
        });
    }
}

6.1.3 效果

  • QPS从500提升到12000
  • 数据库连接数从200降到20
  • 响应时间从500ms降到50ms

6.2 案例2:社交Feed流优化

6.2.1 问题场景

  • 用户发布内容后,所有粉丝需要看到
  • 传统查询方式:SELECT * FROM posts WHERE user_id IN (SELECT follower_id FROM followers WHERE followee_id = ?)
  • 性能极差,无法扩展

6.2.2 优化方案:推模式(Write Fanout)

-- 1. 用户Feed表(按用户分表)
CREATE TABLE user_feed_0 (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    post_id BIGINT NOT NULL,
    create_time DATETIME NOT NULL,
    INDEX idx_user_time (user_id, create_time DESC)
) ENGINE=InnoDB;

-- 2. 推模式写入
-- 当用户A发布内容时,写入所有粉丝的Feed表
public void publishPost(Long userId, String content) {
    // 1. 创建帖子
    Long postId = createPost(userId, content);
    
    // 2. 获取粉丝列表
    List<Long> followerIds = getFollowers(userId);
    
    // 3. 批量写入Feed(分批处理,避免单次过大)
    int batchSize = 1000;
    for (int i = 0; i < followerIds.size(); i += batchSize) {
        List<Long> batch = followerIds.subList(i, Math.min(i + batchSize, followerIds.size()));
        batchInsertFeed(batch, postId);
    }
}

private void batchInsertFeed(List<Long> userIds, Long postId) {
    StringBuilder sql = new StringBuilder("INSERT INTO user_feed_0 (user_id, post_id, create_time) VALUES ");
    List<Object[]> args = new ArrayList<>();
    
    for (Long userId : userIds) {
        sql.append("(?, ?, NOW()),");
        args.add(new Object[]{userId, postId});
    }
    
    // 移除最后一个逗号
    sql.deleteCharAt(sql.length() - 1);
    
    jdbcTemplate.batchUpdate(sql.toString(), args);
}

-- 4. Feed查询(非常高效)
SELECT post_id, create_time 
FROM user_feed_0 
WHERE user_id = ? 
ORDER BY create_time DESC 
LIMIT 20;

6.2.3 冷启动优化(拉模式)

-- 对于粉丝数少的用户,采用拉模式
public List<Post> getFeed(Long userId) {
    // 1. 获取用户关注列表
    List<Long> followees = getFollowees(userId);
    
    if (followees.size() < 100) {
        // 粉丝少,拉模式
        return getPostsByFollowees(followees);
    } else {
        // 粉丝多,推模式(Feed表)
        return getFromFeedTable(userId);
    }
}

private List<Post> getPostsByFollowees(List<Long> followees) {
    String sql = "SELECT * FROM posts WHERE user_id IN (?) ORDER BY create_time DESC LIMIT 50";
    return jdbcTemplate.query(sql, new Object[]{String.join(",", followees)}, rowMapper);
}

6.2.4 混合模式(推拉结合)

// 推拉结合方案
public class FeedService {
    
    // 发布内容时,只推送给在线用户
    public void publishPost(Long userId, String content) {
        Long postId = createPost(userId, content);
        
        // 获取在线粉丝(Redis记录)
        List<Long> onlineFollowers = getOnlineFollowers(userId);
        if (!onlineFollowers.isEmpty()) {
            batchInsertFeed(onlineFollowers, postId);
        }
        
        // 离线用户在下次登录时拉取
    }
    
    // 查询Feed时,合并推和拉的数据
    public List<Post> getFeed(Long userId, Long lastId) {
        // 1. 从Feed表获取(推)
        List<Post> pushed = getPushedFeed(userId, lastId);
        
        // 2. 从关注用户拉取(拉)
        List<Post> pulled = getPullFeed(userId, lastId);
        
        // 3. 合并并去重
        return mergeAndDeduplicate(pushed, pulled);
    }
}

6.3 案例3:历史数据归档与冷热分离

6.3.1 问题场景

  • 订单表数据量达到10亿,查询性能急剧下降
  • 95%的查询集中在最近3个月的数据
  • 历史数据很少查询但占用大量空间

6.3.2 优化方案

-- 1. 创建历史表(按月分区)
CREATE TABLE orders_history_202301 LIKE orders;
CREATE TABLE orders_history_202302 LIKE orders;
-- ...

-- 2. 原表改为只存储最近3个月数据(分区表)
CREATE TABLE orders (
    id BIGINT NOT NULL AUTO_INCREMENT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    create_time DATETIME NOT NULL,
    PRIMARY KEY (id, create_time)
) PARTITION BY RANGE COLUMNS(create_time) (
    PARTITION p202401 VALUES LESS THAN ('2024-02-01'),
    PARTITION p202402 VALUES LESS THAN ('2024-03-01'),
    PARTITION p202403 VALUES LESS THAN ('2024-04-01'),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);

-- 3. 数据归档脚本
DELIMITER $$
CREATE PROCEDURE archive_orders(IN cutoff_date DATE)
BEGIN
    -- 插入历史表
    INSERT INTO orders_history_202312
    SELECT * FROM orders 
    WHERE create_time < cutoff_date;
    
    -- 删除原表数据(使用分区交换,快速)
    ALTER TABLE orders EXCHANGE PARTITION p202312 WITH TABLE orders_history_202312;
    
    -- 删除分区(快速)
    ALTER TABLE orders DROP PARTITION p202312;
    
    -- 添加新分区
    ALTER TABLE orders 
    ADD PARTITION (PARTITION p202404 VALUES LESS THAN ('2024-05-01'));
END$$
DELIMITER ;

-- 4. 归档调度
-- 使用事件调度器
CREATE EVENT archive_event
ON SCHEDULE EVERY 1 MONTH
STARTS '2024-02-01 02:00:00'
DO
    CALL archive_orders('2024-01-01');

-- 或使用cron定时任务
# 0 2 1 * * mysql -u root -p'password' -e "CALL archive_orders('2024-01-01');"

6.3.3 应用层路由

// 根据时间路由到不同表
public class OrderQueryService {
    
    public Order getOrder(Long orderId, LocalDate orderDate) {
        String tableName = getTableName(orderDate);
        String sql = "SELECT * FROM " + tableName + " WHERE id = ?";
        return jdbcTemplate.queryForObject(sql, new Object[]{orderId}, rowMapper);
    }
    
    private String getTableName(LocalDate date) {
        if (date.isAfter(LocalDate.now().minusMonths(3))) {
            return "orders";  // 热数据
        } else {
            // 归档表按月分
            return "orders_history_" + date.getYear() + 
                   String.format("%02d", date.getMonthValue());
        }
    }
    
    // 跨时间范围查询
    public List<Order> searchOrders(LocalDate start, LocalDate end) {
        List<String> tables = getAffectedTables(start, end);
        List<Order> result = new ArrayList<>();
        
        for (String table : tables) {
            String sql = "SELECT * FROM " + table + 
                        " WHERE create_time BETWEEN ? AND ?";
            result.addAll(jdbcTemplate.query(sql, 
                new Object[]{start.atStartOfDay(), end.atStartOfDay()}, 
                rowMapper));
        }
        
        return result;
    }
}

第七部分:高级优化技巧

7.1 并发控制策略

7.1.1 乐观锁 vs 悲观锁

-- 乐观锁(适合读多写少)
CREATE TABLE product (
    id BIGINT PRIMARY KEY,
    stock INT,
    version INT
);

-- 更新时检查版本
UPDATE product 
SET stock = stock - 1, version = version + 1
WHERE id = 123 AND version = ?;

-- 悲观锁(适合写多读少)
-- 事务中使用SELECT ... FOR UPDATE
BEGIN;
SELECT stock FROM product WHERE id = 123 FOR UPDATE;
UPDATE product SET stock = stock - 1 WHERE id = 123;
COMMIT;

7.1.2 死锁检测与避免

-- 查看最近死锁信息
SHOW ENGINE INnoDB STATUS\G

-- 查看死锁历史(MySQL 8.0+)
SELECT * FROM performance_schema.events_errors_summary_global_by_error_name 
WHERE ERROR_NAME = 'DEADLOCK';

-- 应用层避免死锁策略:
-- 1. 固定加锁顺序
-- 2. 减少事务持有时间
-- 1. 重试机制
// 死锁重试机制
public <T> T executeWithRetry(TransactionalCallback<T> callback) {
    int maxRetries = 3;
    int retryCount = 0;
    
    while (retryCount < maxRetries) {
        try {
            return callback.doInTransaction();
        } catch (DeadlockException e) {
            retryCount++;
            if (retryCount >= maxRetries) {
                throw e;
            }
            // 指数退避
            try {
                Thread.sleep((long) (Math.random() * Math.pow(2, retryCount) * 100));
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ie);
            }
        }
    }
    throw new RuntimeException("Max retries exceeded");
}

7.2 批量操作优化

7.2.1 批量插入优化

-- 1. 使用LOAD DATA INFILE(最快)
LOAD DATA LOCAL INFILE '/tmp/data.csv'
INTO TABLE orders
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(user_id, amount, status);

-- 2. 批量INSERT(合并多条)
INSERT INTO orders (user_id, amount, status) VALUES 
(1, 100, 'paid'),
(2, 200, 'paid'),
-- ... 1000条
(1000, 100000, 'paid');

-- 3. 使用INSERT DELAYED(MySQL 5.7,已废弃)
-- 改用异步队列

-- 4. 关闭自动提交
SET autocommit = 0;
-- 批量插入
COMMIT;
SET autocommit = 1;

7.2.2 批量更新优化

-- 1. CASE WHEN批量更新
UPDATE orders 
SET status = CASE id
    WHEN 1 THEN 'paid'
    WHEN 2 THEN 'paid'
    WHEN 3 THEN 'completed'
END,
amount = CASE id
    WHEN 1 THEN 100
    WHEN 2 THEN 200
    WHEN 3 THEN 300
END
WHERE id IN (1, 2, 3);

-- 2. INSERT ... ON DUPLICATE KEY UPDATE
INSERT INTO orders (id, user_id, amount, status) VALUES 
(1, 1, 100, 'paid'),
(2, 2, 200, 'paid')
ON DUPLICATE KEY UPDATE
status = VALUES(status),
amount = VALUES(amount),
update_time = NOW();

-- 3. REPLACE INTO(先删除后插入)
REPLACE INTO orders (id, user_id, amount) VALUES (1, 1, 100);

7.3 字符集与排序规则优化

-- 1. 选择合适的字符集(utf8mb4)
ALTER DATABASE your_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

-- 2. 表级别设置
ALTER TABLE users CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

-- 3. 列级别设置
ALTER TABLE users MODIFY name VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

-- 4. 排序规则选择
-- utf8mb4_unicode_ci: 通用排序,支持多语言
-- utf8mb4_bin: 二进制排序,区分大小写
-- utf8mb4_general_ci: 更快但准确性稍低

-- 5. 连接级别设置
SET NAMES utf8mb4;

第八部分:MySQL 8.0 新特性应用

8.1 窗口函数

-- 1. 排名函数(替代复杂的自连接)
-- 查询每个用户的订单排名
SELECT 
    user_id,
    order_id,
    amount,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY amount DESC) as rank
FROM orders;

-- 2. 聚合窗口函数
-- 计算累计销售额
SELECT 
    order_date,
    daily_sales,
    SUM(daily_sales) OVER (ORDER BY order_date) as cumulative_sales
FROM (
    SELECT DATE(create_time) as order_date, SUM(amount) as daily_sales
    FROM orders
    GROUP BY DATE(create_time)
) daily;

-- 3. LAG/LEAD函数(访问前后行数据)
SELECT 
    user_id,
    order_date,
    amount,
    LAG(amount, 1) OVER (PARTITION BY user_id ORDER BY order_date) as prev_amount,
    LEAD(amount, 1) OVER (PARTITION BY user_id ORDER BY order_date) as next_amount
FROM orders;

8.2 CTE(公用表表达式)

-- 1. 简单CTE
WITH user_stats AS (
    SELECT user_id, COUNT(*) as order_count, SUM(amount) as total_amount
    FROM orders
    GROUP BY user_id
)
SELECT u.name, us.order_count, us.total_amount
FROM users u
JOIN user_stats us ON u.id = us.user_id
WHERE us.total_amount > 1000;

-- 2. 递归CTE(树形结构查询)
WITH RECURSIVE org_tree AS (
    -- 锚点:根部门
    SELECT id, name, parent_id, 0 as level
    FROM departments
    WHERE parent_id IS NULL
    
    UNION ALL
    
    -- 递归:子部门
    SELECT d.id, d.name, d.parent_id, t.level + 1
    FROM departments d
    JOIN org_tree t ON d.parent_id = t.id
)
SELECT * FROM org_tree ORDER BY level;

8.3 原子DDL

-- MySQL 8.0+ 支持原子DDL,失败自动回滚
-- 以下操作都是原子的:
ALTER TABLE orders ADD COLUMN new_col INT;
CREATE INDEX idx_user ON orders(user_id);
DROP TABLE orders;

-- 查看DDL日志
SHOW VARIABLES LIKE 'ddl_%';

8.4 自增主键持久化

-- MySQL 8.0 自增主键持久化,重启后不会重复
-- 查看当前自增值
SELECT AUTO_INCREMENT 
FROM information_schema.tables 
WHERE table_name = 'orders' AND table_schema = 'your_db';

-- 重置自增值(谨慎使用)
ALTER TABLE orders AUTO_INCREMENT = 1;

第九部分:总结与最佳实践

9.1 高并发优化检查清单

9.1.1 索引优化检查

  • [ ] 所有查询都使用了合适的索引
  • [ ] 索引选择性>0.1
  • [ ] 避免了索引失效(函数、隐式转换)
  • [ ] 覆盖索引减少回表
  • [ ] 定期清理未使用索引

9.1.2 SQL优化检查

  • [ ] 避免SELECT *
  • [ ] 使用LIMIT限制结果集
  • [ ] 避免深度分页
  • [ ] 批量操作替代循环
  • [ ] 使用EXPLAIN分析执行计划

9.1.3 配置优化检查

  • [ ] innodb_buffer_pool_size设置合理(物理内存50-70%)
  • [ ] innodb_log_file_size足够大(1-2G)
  • [ ] 连接池大小适当
  • [ ] 慢查询日志开启
  • [ ] 并发参数调优

9.1.4 架构优化检查

  • [ ] 读写分离实施
  • [ ] 缓存层引入(Redis)
  • [ ] 分库分表策略
  • [ ] 高可用方案(MHA/MGR)
  • [ ] 监控告警体系

9.2 性能优化原则

9.2.1 80/20原则

  • 80%的性能问题由20%的SQL导致
  • 优先优化最慢的SQL
  • 优先优化最高频的查询

9.2.2 三层优化法

  1. 应用层:缓存、批量、异步
  2. 数据库层:索引、SQL、配置
  3. 架构层:读写分离、分库分表

9.2.3 数据驱动优化

  • 基于监控数据决策
  • A/B测试优化效果
  • 持续迭代改进

9.3 常见误区

9.3.1 过早优化

  • 不要凭感觉优化
  • 先测量,再优化
  • 优化前建立性能基线

9.3.2 过度索引

  • 索引越多越好 ❌
  • 索引会降低写性能
  • 每个索引都要有明确的查询场景

9.3.3 忽视硬件

  • 内存不足导致频繁I/O
  • 磁盘性能差(建议SSD)
  • 网络延迟影响主从同步

9.4 持续优化建议

9.4.1 建立性能基线

-- 定期记录关键指标
CREATE TABLE performance_baseline (
    id INT AUTO_INCREMENT PRIMARY KEY,
    record_time DATETIME NOT NULL,
    qps INT,
    tps INT,
    slow_queries INT,
    buffer_hit_rate DECIMAL(5,2),
    connection_count INT,
    INDEX idx_time (record_time)
);

-- 每小时记录一次
INSERT INTO performance_baseline 
SELECT NULL, NOW(), 
    (SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Queries'),
    (SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Com_commit') + 
    (SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Com_rollback'),
    (SELECT COUNT(*) FROM performance_schema.events_statements_summary_by_digest WHERE AVG_TIMER_WAIT > 1000000000000),
    (SELECT (1 - (SUM(VARIABLE_VALUE) / @@innodb_buffer_pool_size)) * 100 
     FROM performance_schema.global_status 
     WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads'),
    (SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Threads_connected');

9.4.2 自动化监控告警

# 监控脚本示例
#!/bin/bash
# check_mysql_health.sh

MYSQL_CMD="mysql -u root -p'password' -e"
THRESHOLD_QPS=10000
THRESHOLD_SLOW=10

# 检查QPS
QPS=$($MYSQL_CMD "SHOW GLOBAL STATUS LIKE 'Queries';" | awk 'NR==2{print $2}')
if [ $QPS -gt $THRESHOLD_QPS ]; then
    echo "ALERT: QPS is $QPS, exceeds threshold $THRESHOLD_QPS" | mail -s "MySQL Alert" admin@example.com
fi

# 检查慢查询
SLOW=$($MYSQL_CMD "SHOW GLOBAL STATUS LIKE 'Slow_queries';" | awk 'NR==2{print $2}')
if [ $SLOW -gt $THRESHOLD_SLOW ]; then
    echo "ALERT: Slow queries is $SLOW, exceeds threshold $THRESHOLD_SLOW" | mail -s "MySQL Alert" admin@example.com
fi

结语

MySQL高并发处理是一个系统工程,需要从索引优化、SQL调优、配置调整到架构升级的全方位优化。本文从基础到高级,从理论到实践,提供了完整的优化策略和实战案例。

核心要点总结

  1. 索引是基础:合理的索引设计能带来数量级的性能提升
  2. SQL是关键:避免索引失效,优化查询模式
  3. 配置是保障:合理配置发挥硬件最大性能
  4. 架构是根本:单机瓶颈时必须考虑架构升级
  5. 监控是眼睛:没有监控的优化是盲目的

优化是一个持续的过程,需要根据业务发展、数据增长和性能监控不断调整策略。建议建立性能基线,定期review,形成闭环的优化流程。

记住:先测量,再优化;先简单,再复杂;先应用,再架构


本文档基于MySQL 5.78.0版本,结合多年生产环境经验编写。实际应用中请根据具体业务场景调整优化策略。