引言:理解高并发场景下的MySQL挑战

在现代互联网应用中,高并发访问已经成为常态。无论是电商平台的秒杀活动,还是社交媒体的热点事件,MySQL数据库都面临着前所未有的压力。当并发连接数达到数千甚至上万时,简单的查询可能变得缓慢,写入操作可能产生锁等待,严重时甚至导致数据库服务不可用。

高并发对MySQL的主要挑战包括:

  • CPU资源竞争:大量查询同时执行,CPU调度压力剧增
  • 内存资源耗尽:连接数过多导致内存不足
  • 磁盘I/O瓶颈:频繁的读写操作使磁盘成为瓶颈
  • 锁竞争:行锁、表锁导致的等待和死锁
  • 网络延迟:大量数据传输导致网络拥堵

本文将从索引优化查询优化架构优化读写分离四个层面,系统性地讲解MySQL高并发处理策略,并提供完整的实战代码示例。

一、索引优化:高并发的基石

1.1 索引的基本原理与类型选择

索引是MySQL性能优化的核心。在高并发场景下,合理的索引设计可以将查询性能提升100倍甚至更多。

B-Tree索引(默认索引类型)

B-Tree索引是最常用的索引类型,适用于全值匹配、范围查询和排序操作。

-- 创建用户表的复合索引
CREATE TABLE users (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    status TINYINT DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_username_email (username, email),
    INDEX idx_status_created (status, created_at)
);

-- 查看索引使用情况
EXPLAIN SELECT * FROM users WHERE username = 'john_doe' AND email = 'john@example.com';

索引选择建议

  • 高选择性列优先:选择性高的列(唯一值多)索引效果更好
  • 最左前缀原则:复合索引必须遵循最左匹配
  • 覆盖索引:查询列全部在索引中,避免回表

Hash索引(精确匹配场景)

Hash索引适用于等值查询,不支持范围查询。

-- Memory引擎表使用Hash索引
CREATE TABLE cache_data (
    cache_key VARCHAR(100) PRIMARY KEY,
    cache_value TEXT,
    INDEX idx_hash USING HASH (cache_key)
) ENGINE=MEMORY;

全文索引(文本搜索场景)

-- 文章表的全文索引
CREATE TABLE articles (
    id INT PRIMARY KEY AUTO_INCREMENT,
    title VARCHAR(200),
    content TEXT,
    FULLTEXT INDEX idx_fulltext (title, content)
) ENGINE=InnoDB;

-- 全文搜索查询
SELECT * FROM articles 
WHERE MATCH(title, content) AGAINST('database optimization' IN NATURAL LANGUAGE MODE);

1.2 高并发场景下的索引设计实战

场景1:电商订单查询优化

-- 原始表结构(无优化)
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    status TINYINT,
    amount DECIMAL(10,2),
    create_time DATETIME,
    update_time DATETIME
);

-- 高并发查询需求:
-- 1. 查询用户未完成订单(最频繁)
-- 2. 查询某时间段订单
-- 3. 查询某状态订单统计

-- 优化后的索引设计
ALTER TABLE orders ADD INDEX idx_user_status (user_id, status);
ALTER TABLE orders ADD INDEX idx_create_time (create_time);
ALTER TABLE orders ADD INDEX idx_status_amount (status, amount);

-- 查询性能对比
-- 优化前:全表扫描,扫描100万行,耗时2.3秒
EXPLAIN SELECT * FROM orders WHERE user_id = 12345 AND status = 0;

-- 优化后:索引扫描,扫描10行,耗时0.002秒
-- 执行计划:type=ref, key=idx_user_status, rows=10

场景2:社交平台消息流优化

-- 用户消息表
CREATE TABLE user_messages (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    sender_id BIGINT NOT NULL,
    message_type TINYINT NOT NULL,
    content TEXT,
    is_read TINYINT DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_user_type_read (user_id, message_type, is_read, created_at DESC)
);

-- 高并发查询:获取用户未读消息列表
SELECT * FROM user_messages 
WHERE user_id = 10001 
  AND message_type = 1 
  AND is_read = 0 
ORDER BY created_at DESC 
LIMIT 20;

-- 索引覆盖优化(避免回表)
CREATE INDEX idx_covering ON user_messages (
    user_id, message_type, is_read, created_at DESC
) INCLUDE (id, content);  -- MySQL 8.0+支持INCLUDE语法

1.3 索引维护与监控

监控索引使用情况

-- 查看索引使用统计(MySQL 8.0+)
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_FETCH,
    COUNT_INSERT,
    COUNT_UPDATE,
    COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE OBJECT_SCHEMA = 'your_database'
ORDER BY COUNT_FETCH DESC;

-- 查找未使用的索引(可能浪费写入性能)
SELECT 
    t.TABLE_SCHEMA,
    t.TABLE_NAME,
    t.INDEX_NAME,
    t.ROWS_READ
FROM performance_schema.table_io_waits_summary_by_index_usage t
LEFT JOIN information_schema.STATISTICS s 
    ON t.TABLE_SCHEMA = s.TABLE_SCHEMA 
    AND t.TABLE_NAME = s.TABLE_NAME 
    AND t.INDEX_NAME = s.INDEX_NAME
WHERE t.ROWS_READ = 0 
  AND t.INDEX_NAME IS NOT NULL;

索引碎片整理

-- 查看表碎片率
SELECT 
    TABLE_NAME,
    ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2) AS total_mb,
    ROUND(DATA_FREE / 1024 / 1024, 2) AS free_mb,
    ROUND(DATA_FREE / (DATA_LENGTH + INDEX_LENGTH) * 100, 2) AS fragment_ratio
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'your_database'
  AND DATA_FREE > 1024 * 1024;  -- 超过1MB碎片

-- 重建索引(在线DDL,MySQL 5.6+)
ALTER TABLE orders ENGINE=InnoDB;  -- 重建表和索引

-- 或者单独重建索引(MySQL 8.0+)
ALTER TABLE orders DROP INDEX idx_old, ADD INDEX idx_new (column1, column2);

二、查询优化:让SQL飞起来

2.1 高效SQL编写原则

避免SELECT *

-- 反例:查询所有列,导致大量不必要的数据传输和内存占用
SELECT * FROM users WHERE id = 10001;

-- 正例:只查询需要的列
SELECT id, username, email FROM users WHERE id = 10001;

-- 更优:使用覆盖索引
CREATE INDEX idx_cover ON users (id, username, email);
SELECT id, username, email FROM users WHERE id = 10001;  -- 索引覆盖,无需回表

合理使用LIMIT

-- 分页查询优化(避免深度分页问题)
-- 传统方式(越往后越慢)
SELECT * FROM orders WHERE user_id = 12345 ORDER BY id LIMIT 1000000, 20;

-- 优化方式1:使用子查询(先定位ID,再关联)
SELECT o.* FROM orders o
INNER JOIN (
    SELECT id FROM orders 
    WHERE user_id = 12345 
    ORDER BY id 
    LIMIT 1000000, 20
) AS tmp ON o.id = tmp.id;

-- 优化方式2:使用位置记录(业务层优化)
-- 记录上一页最后一条记录的ID
SELECT * FROM orders 
WHERE user_id = 12345 AND id > 1000000 
ORDER BY id LIMIT 20;

避免不必要的排序

-- 反例:不必要的ORDER BY(索引已排序)
SELECT * FROM users ORDER BY created_at DESC LIMIT 10;

-- 正例:利用索引排序(索引包含created_at DESC)
CREATE INDEX idx_created ON users (created_at DESC);
SELECT * FROM users LIMIT 10;  -- 直接读取索引,无需额外排序

2.2 高级查询优化技巧

使用EXPLAIN分析执行计划

-- 示例:分析复杂查询
EXPLAIN 
SELECT 
    u.username,
    COUNT(o.order_id) AS order_count,
    SUM(o.amount) AS total_amount
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.status = 1
  AND o.create_time >= '2024-01-01'
GROUP BY u.id
HAVING order_count > 5
ORDER BY total_amount DESC;

-- 关键指标解读:
-- type: ALL(全表扫描)→ index(索引扫描)→ range(范围扫描)→ ref(索引查找)→ const(常量查找)
-- rows: 预计扫描行数(越小越好)
-- Extra: Using index(覆盖索引)→ Using where(条件过滤)→ Using filesort(文件排序,需优化)

优化子查询

-- 反例:相关子查询(性能差)
SELECT * FROM orders o
WHERE o.amount > (
    SELECT AVG(amount) FROM orders WHERE user_id = o.user_id
);

-- 正例:使用JOIN优化
SELECT o.* FROM orders o
INNER JOIN (
    SELECT user_id, AVG(amount) AS avg_amount
    FROM orders
    GROUP BY user_id
) AS stats ON o.user_id = stats.user_id
WHERE o.amount > stats.avg_amount;

-- 更优:使用窗口函数(MySQL 8.0+)
SELECT * FROM (
    SELECT *,
           AVG(amount) OVER (PARTITION BY user_id) AS avg_amount
    FROM orders
) AS tmp
WHERE amount > avg_amount;

优化OR条件

-- 反例:OR条件导致索引失效
SELECT * FROM users WHERE username = 'alice' OR email = 'alice@example.com';

-- 正例:使用UNION ALL
SELECT * FROM users WHERE username = 'alice'
UNION ALL
SELECT * FROM users WHERE email = 'alice@example.com';

-- 或者使用IN(如果列有索引)
SELECT * FROM users WHERE username IN ('alice', 'bob');

2.3 高并发场景下的查询模式优化

批量操作替代循环单条操作

# Python示例:批量插入 vs 循环插入
import mysql.connector

# 反例:循环单条插入(1000次网络往返)
def insert_slow(data_list):
    conn = mysql.connector.connect(...)
    cursor = conn.cursor()
    for item in data_list:
        cursor.execute(
            "INSERT INTO logs (user_id, action, timestamp) VALUES (%s, %s, %s)",
            (item['user_id'], item['action'], item['timestamp'])
        )
    conn.commit()

# 正例:批量插入(1次网络往返)
def insert_fast(data_list):
    conn = mysql.connector.connect(...)
    cursor = conn.cursor()
    sql = "INSERT INTO logs (user_id, action, timestamp) VALUES (%s, %s, %s)"
    values = [(item['user_id'], item['action'], item['timestamp']) for item in data_list]
    cursor.executemany(sql, values)  # 批量执行
    conn.commit()

# 性能对比:1000条记录,批量插入快50-100倍

使用存储过程减少网络往返

-- 存储过程:批量更新库存
DELIMITER $$

CREATE PROCEDURE batch_update_stock(
    IN order_items JSON  -- 订单项JSON数组
)
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE item_count INT;
    DECLARE product_id INT;
    DECLARE quantity INT;
    
    SET item_count = JSON_LENGTH(order_items);
    
    -- 开启事务
    START TRANSACTION;
    
    WHILE i < item_count DO
        SET product_id = JSON_EXTRACT(order_items, CONCAT('$[', i, '].product_id'));
        SET quantity = JSON_EXTRACT(order_items, CONCAT('$[', i, '].quantity'));
        
        -- 更新库存(带乐观锁)
        UPDATE product_stock 
        SET stock = stock - quantity,
            version = version + 1
        WHERE product_id = product_id 
          AND stock >= quantity
          AND version = JSON_EXTRACT(order_items, CONCAT('$[', i, '].version'));
        
        IF ROW_COUNT() = 0 THEN
            -- 库存不足或版本冲突
            ROLLBACK;
            SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Stock update failed';
            RETURN;
        END IF;
        
        SET i = i + 1;
    END WHILE;
    
    COMMIT;
END$$

DELIMITER ;

-- 调用示例
CALL batch_update_stock('[{"product_id": 101, "quantity": 2, "version": 5}, {"product_id": 102, "quantity": 1, "version": 3}]');

三、架构优化:突破单机瓶颈

3.1 连接池优化

连接池参数调优

# Python SQLAlchemy连接池配置
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mysql+pymysql://user:pass@host/db",
    poolclass=QueuePool,
    pool_size=20,           # 核心连接数(根据CPU核心数调整)
    max_overflow=10,        # 超出核心连接数时允许的最大连接数
    pool_timeout=30,        # 获取连接超时时间(秒)
    pool_recycle=3600,      # 连接回收时间(秒),避免长连接导致的超时
    pool_pre_ping=True,     # 连接健康检查(防止连接断开)
    echo=False              # 生产环境关闭SQL日志
)

# 连接池监控
from sqlalchemy import event
from sqlalchemy.pool import Pool

@event.listens_for(Pool, 'checkout')
def checkout(dbapi_con, con_record, con_proxy):
    print("获取连接")

@event.listens_for(Pool, 'checkin')
def checkin(dbapi_con, con_record):
    print("归还连接")

MySQL服务器连接参数

-- 查看当前连接数
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Max_used_connections';

-- 调整最大连接数(my.cnf)
[mysqld]
max_connections = 1000      # 最大连接数(根据内存调整,每个连接约10MB)
thread_cache_size = 100     # 线程缓存,减少线程创建开销
wait_timeout = 600          # 非交互连接超时时间(秒)
interactive_timeout = 3600  # 交互连接超时时间(秒)

-- 查看连接状态
SHOW PROCESSLIST;  -- 查看当前所有连接
KILL 12345;        -- 杀掉指定连接

3.2 缓存策略

应用层缓存(Redis)

import redis
import json
from functools import wraps

# Redis连接配置
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True,
    socket_timeout=5,
    socket_connect_timeout=5
)

def cache_query(ttl=300):
    """查询结果缓存装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 生成缓存key
            key = f"cache:{func.__name__}:{str(args)}:{str(kwargs)}"
            
            # 尝试从缓存获取
            cached = redis_client.get(key)
            if cached:
                return json.loads(cached)
            
            # 执行查询
            result = func(*args, **kwargs)
            
            # 写入缓存
            redis_client.setex(key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator

# 使用示例
@cache_query(ttl=60)
def get_user_profile(user_id):
    # 模拟数据库查询
    conn = mysql.connector.connect(...)
    cursor = conn.cursor(dictionary=True)
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    return cursor.fetchone()

# 缓存更新策略(写操作时清除缓存)
def update_user_profile(user_id, data):
    # 更新数据库
    conn = mysql.connector.connect(...)
    cursor = conn.cursor()
    cursor.execute("UPDATE users SET ... WHERE id = %s", (user_id,))
    conn.commit()
    
    # 清除缓存
    redis_client.delete(f"cache:get_user_profile:({user_id},)")

MySQL查询缓存(已废弃,推荐使用外部缓存)

-- MySQL 5.7及之前版本的查询缓存(MySQL 8.0已移除)
-- 查看查询缓存状态
SHOW VARIABLES LIKE 'query_cache%';

-- 在my.cnf中配置(不推荐在高并发场景使用)
[mysqld]
query_cache_type = 0  # 关闭(高并发下缓存失效开销大)
query_cache_size = 0

3.3 分库分表(垂直拆分与水平拆分)

垂直拆分:按业务模块拆分

-- 原始单表(字段过多,访问频繁)
CREATE TABLE user_all (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    phone VARCHAR(20),
    password_hash VARCHAR(255),
    profile_text TEXT,
    avatar_url VARCHAR(255),
    last_login DATETIME,
    login_count INT,
    -- ... 更多字段
    INDEX idx_username (username)
);

-- 垂直拆分:核心信息 + 扩展信息
-- 表1:用户核心信息(高频访问)
CREATE TABLE user_core (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    status TINYINT DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_username (username),
    INDEX idx_email (email)
) ENGINE=InnoDB;

-- 表2:用户资料信息(低频访问)
CREATE TABLE user_profile (
    user_id BIGINT PRIMARY KEY,
    phone VARCHAR(20),
    profile_text TEXT,
    avatar_url VARCHAR(255),
    last_login DATETIME,
    login_count INT DEFAULT 0,
    FOREIGN KEY (user_id) REFERENCES user_core(id) ON DELETE CASCADE
) ENGINE=InnoDB;

-- 表3:用户统计信息(写入频繁)
CREATE TABLE user_stats (
    user_id BIGINT PRIMARY KEY,
    login_count INT DEFAULT 0,
    order_count INT DEFAULT 0,
    last_order_time DATETIME,
    total_spent DECIMAL(12,2) DEFAULT 0,
    FOREIGN KEY (user_id) REFERENCES user_core(id) ON DELETE CASCADE
) ENGINE=InnoDB;

水平拆分:按用户ID哈希分片

-- 分片表结构(4个分片)
-- 分片0:user_id % 4 = 0
CREATE TABLE orders_0 (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT NOT NULL,
    amount DECIMAL(10,2),
    status TINYINT,
    create_time DATETIME,
    INDEX idx_user_id (user_id)
) ENGINE=InnoDB;

-- 分片1:user_id % 4 = 1
CREATE TABLE orders_1 (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT NOT NULL,
    amount DECIMAL(10,2),
    status TINYINT,
    create_time DATETIME,
    INDEX idx_user_id (user_id)
) ENGINE=InnoDB;

-- 分片2:user_id % 4 = 2
CREATE TABLE orders_2 (...);

-- 分片3:user_id % 4 = 3
CREATE TABLE orders_3 (...);

-- 分片路由函数(应用层实现)
def get_shard_table(user_id):
    shard_index = user_id % 4
    return f"orders_{shard_index}"

def get_orders_by_user(user_id):
    table_name = get_shard_table(user_id)
    sql = f"SELECT * FROM {table_name} WHERE user_id = %s"
    # 执行查询...

四、读写分离:架构级高并发解决方案

4.1 读写分离原理与架构

读写分离的核心思想是将写操作(INSERT/UPDATE/DELETE)路由到主库(Master),将读操作(SELECT)路由到从库(Slave),从而分担主库压力。

架构图

┌─────────────────────────────────────────┐
│           应用服务层                    │
│  ┌───────────────────────────────────┐  │
│  │   读写分离中间件/应用层路由        │  │
│  │   - 写操作 → Master               │  │
│  │   - 读操作 → Slave                │  │
│  └───────────────────────────────────┘  │
└─────────────────────────────────────────┘
           │                  │
           ▼                  ▼
   ┌─────────────┐    ┌─────────────┐
   │  Master     │    │   Slave1    │
   │  (写+读)    │    │   (只读)    │
   └─────────────┘    └─────────────┘
           │                  │
           └────────┬─────────┘
                    │
           ┌────────▼─────────┐
           │  Binlog复制      │
           │  (异步/半同步)   │
           └──────────────────┘

4.2 MySQL主从复制配置

步骤1:主库配置(Master)

-- 1. 创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'ReplPassword123!';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;

-- 2. 查看主库状态(记录File和Position)
SHOW MASTER STATUS;
-- 示例输出:
-- +------------------+----------+--------------+------------------+-------------------+
-- | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
-- +------------------+----------+--------------+------------------+-------------------+
-- | mysql-bin.000012 |      154 |              |                  |                   |
-- +------------------+----------+--------------+------------------+-------------------+

-- 3. 锁表(确保数据一致性,可选)
FLUSH TABLES WITH READ LOCK;

-- 4. 在my.cnf中配置主库
[mysqld]
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW          # 推荐ROW格式,确保数据一致性
expire_logs_days = 7         # 自动清理7天前的binlog
max_binlog_size = 100M       # 单个binlog文件大小
binlog_cache_size = 4M       # binlog缓存大小

-- 5. 解锁(数据导出完成后)
UNLOCK TABLES;

步骤2:从库配置(Slave)

-- 1. 在my.cnf中配置从库
[mysqld]
server-id = 2  # 必须与主库不同
relay_log = /var/log/mysql/mysql-relay-bin.log
log_bin = /var/log/mysql/mysql-bin.log
read_only = 1  # 强制从库只读,防止误写
slave_parallel_workers = 4  # 并行复制线程数(MySQL 5.7+)

-- 2. 从库启动复制
CHANGE MASTER TO
    MASTER_HOST='192.168.1.100',      -- 主库IP
    MASTER_USER='repl',               -- 复制用户名
    MASTER_PASSWORD='ReplPassword123!', -- 复制密码
    MASTER_LOG_FILE='mysql-bin.000012', -- 主库binlog文件名
    MASTER_LOG_POS=154,               -- 主库binlog位置
    MASTER_SSL=1;                     -- 启用SSL加密(生产环境推荐)

-- 3. 启动从库复制
START SLAVE;

-- 4. 查看复制状态
SHOW SLAVE STATUS\G
-- 关键指标:
-- Slave_IO_Running: Yes      # IO线程是否正常
-- Slave_SQL_Running: Yes     # SQL线程是否正常
-- Seconds_Behind_Master: 0   # 延迟秒数(0表示无延迟)
-- Last_Error:                # 错误信息(如果有)

4.3 应用层实现读写分离

方案1:基于AOP的读写分离(Java Spring Boot)

// 1. 定义数据源注解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface DataSource {
    DataSourceType value() default DataSourceType.SLAVE;
}

public enum DataSourceType {
    MASTER, SLAVE
}

// 2. 动态数据源上下文
public class DataSourceContextHolder {
    private static final ThreadLocal<DataSourceType> contextHolder = 
        new ThreadLocal<>();
    
    public static void setDataSource(DataSourceType type) {
        contextHolder.set(type);
    }
    
    public static DataSourceType getDataSource() {
        return contextHolder.getSlaveFirst() != null ? 
            contextHolder.get() : DataSourceType.SLAVE;
    }
    
    public static void clear() {
        contextHolder.remove();
    }
}

// 3. AOP拦截器
@Aspect
@Component
public class DataSourceAspect {
    
    @Before("execution(* com.example.service.*.*(..))")
    public void before(JoinPoint joinPoint) {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        
        // 优先使用方法上的注解
        if (method.isAnnotationPresent(DataSource.class)) {
            DataSource annotation = method.getAnnotation(DataSource.class);
            DataSourceContextHolder.setDataSource(annotation.value());
        } else {
            // 默认策略:写操作走主库,读操作走从库
            String methodName = method.getName();
            if (methodName.startsWith("get") || methodName.startsWith("list") || 
                methodName.startsWith("query")) {
                DataSourceContextHolder.setDataSource(DataSourceType.SLAVE);
            } else {
                DataSourceContextHolder.setDataSource(DataSourceType.MASTER);
            }
        }
    }
    
    @After("execution(* com.example.service.*.*(..))")
    public void after() {
        DataSourceContextHolder.clear();
    }
}

// 4. 动态数据源路由
public class DynamicDataSource extends AbstractRoutingDataSource {
    
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSource();
    }
}

// 5. 服务层使用
@Service
public class UserService {
    
    @Autowired
    private UserRepository userRepository;
    
    // 强制走主库(写操作)
    @DataSource(DataSourceType.MASTER)
    public User createUser(User user) {
        return userRepository.save(user);
    }
    
    // 强制走从库(读操作)
    @DataSource(DataSourceType.SLAVE)
    public User getUserById(Long id) {
        return userRepository.findById(id).orElse(null);
    }
    
    // 默认走从库(读操作)
    public List<User> listUsers() {
        return userRepository.findAll();
    }
}

方案2:使用ShardingSphere中间件

# ShardingSphere配置(application.yml)
spring:
  shardingsphere:
    datasource:
      names: master,slave0,slave1
      master:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://192.168.1.100:3306/mydb
        username: root
        password: master_password
      slave0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://192.168.1.101:3306/mydb
        username: root
        password: slave_password
      slave1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://192.168.1.102:3306/mydb
        username: root
        password: slave_password
    
    # 读写分离规则
    rules:
      readwrite-splitting:
        data-sources:
          mydb:
            type: Static
            props:
              write-data-source-name: master
              read-data-source-names: slave0,slave1
              load-balance-algorithm-name: round_robin  # 轮询负载均衡
    
    # 负载均衡算法
    algorithms:
      round_robin:
        type: ROUND_ROBIN
    
    # 全局配置
    props:
      sql-show: true  # 显示实际路由的SQL(开发环境)
      check-table-metadata-enabled: true

方案3:使用ProxySQL(MySQL协议代理)

-- ProxySQL配置(通过Admin接口)
-- 连接ProxySQL管理端口
mysql -u admin -padmin -h 127.0.0.1 -P 6032

-- 配置主库和从库
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight) VALUES
(10, '192.168.1.100', 3306, 100),  -- 主库(hostgroup 10)
(20, '192.168.1.101', 3306, 100),  -- 从库1(hostgroup 20)
(20, '192.168.1.102', 3306, 100);  -- 从库2(hostgroup 20)

-- 配置读写分离规则
INSERT INTO mysql_query_rules (rule_id, active, match_digest, destination_hostgroup, apply) VALUES
(1, 1, '^SELECT.*FOR UPDATE', 10, 1),  -- SELECT FOR UPDATE走主库
(2, 1, '^SELECT', 20, 1);              -- 普通SELECT走从库

-- 配置健康检查
UPDATE global_variables SET variable_value='SELECT 1' WHERE variable_name='mysql-ping_interval_server_msec';
LOAD MYSQL VARIABLES TO RUNTIME;
LOAD MYSQL SERVERS TO RUNTIME;

-- 保存配置
SAVE MYSQL SERVERS TO DISK;
SAVE MYSQL QUERY RULES TO DISK;

4.4 主从延迟监控与处理

监控延迟

-- 在从库执行
SHOW SLAVE STATUS\G
-- 关注:Seconds_Behind_Master

-- 更精确的延迟监控(在主库和从库分别执行相同查询)
-- 主库:
SELECT UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(LAST_UPDATE_TIME) AS delay 
FROM information_schema.GLOBAL_STATUS 
WHERE VARIABLE_NAME = 'LAST_QUERY_COST';

-- 创建监控表(记录延迟)
CREATE TABLE replication_delay (
    id INT PRIMARY KEY AUTO_INCREMENT,
    check_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    delay_seconds INT,
    slave_host VARCHAR(100)
);

-- 定时任务监控(Event Scheduler)
CREATE EVENT monitor_replication
ON SCHEDULE EVERY 10 SECOND
DO
    INSERT INTO replication_delay (delay_seconds, slave_host)
    SELECT 
        COALESCE(SECONDS_BEHIND_MASTER, 0),
        VARIABLE_VALUE
    FROM performance_schema.global_status
    WHERE VARIABLE_NAME = 'SLAVE_LAG';

处理延迟的策略

# 应用层处理延迟的几种策略

# 策略1:强制读主库(关键业务)
def get_user_with_consistency(user_id):
    # 检查业务是否需要强一致性
    if need_strong_consistency():
        return get_from_master(user_id)  # 强制读主库
    else:
        return get_from_slave(user_id)   # 读从库

# 策略2:延迟检测与降级
def get_data_with_fallback(key):
    try:
        # 先尝试读从库
        data = redis_client.get(f"slave:{key}")
        if data:
            return json.loads(data)
        
        # 从库无数据或延迟过高,读主库
        data = get_from_master(key)
        # 写入缓存
        redis_client.setex(f"slave:{key}", 60, json.dumps(data))
        return data
    except Exception as e:
        # 从库故障,读主库
        logger.error(f"Slave error: {e}, fallback to master")
        return get_from_master(key)

# 策略3:业务层补偿(最终一致性)
def create_order_with_sync(order_data):
    # 1. 写主库
    order_id = insert_to_master(order_data)
    
    # 2. 等待从库同步(可选)
    if need_immediate_read():
        time.sleep(0.5)  # 等待500ms
    
    # 3. 返回结果
    return order_id

4.5 读写分离的高级优化

半同步复制(减少数据丢失风险)

-- 主库配置半同步复制
-- 安装插件(MySQL 5.7+)
INSTALL PLUGIN rpl_semi_sync_master SONAME 'semisync_master.so';

-- 配置参数(my.cnf)
[mysqld]
rpl_semi_sync_master_enabled = 1
rpl_semi_sync_master_timeout = 1000  # 1秒超时,退化为异步
rpl_semi_sync_master_wait_point = AFTER_SYNC  # MySQL 5.7+推荐

-- 查看状态
SHOW VARIABLES LIKE 'rpl_semi_sync_master_status';
SHOW STATUS LIKE 'Rpl_semi_sync_master_status';

-- 从库配置
INSTALL PLUGIN rpl_semi_sync_slave SONAME 'semisync_slave.so';
[mysqld]
rpl_semi_sync_slave_enabled = 1

多级复制(缓解主库压力)

主库 (Master)
  │
  ├─► 一级从库 (Slave1) - 承担部分读流量
  │    │
  │    └─► 二级从库 (Slave2) - 承担报表查询
  │
  └─► 一级从库 (Slave3) - 承担读流量
       │
       └─► 二级从库 (Slave4) - 承担备份
-- 一级从库配置(开启binlog,作为二级从库的主库)
[mysqld]
server-id = 2
log_bin = /var/log/mysql/mysql-bin.log
read_only = 1
log_slave_updates = 1  -- 重要:将从主库接收的更新写入自己的binlog

-- 二级从库配置
CHANGE MASTER TO
    MASTER_HOST='192.168.1.101',  -- 一级从库IP
    MASTER_USER='repl',
    MASTER_PASSWORD='password';

五、高并发综合实战案例

5.1 秒杀系统设计

数据库设计

-- 商品表
CREATE TABLE products (
    id BIGINT PRIMARY KEY,
    name VARCHAR(200),
    stock INT NOT NULL,  -- 库存
    version INT DEFAULT 0,  -- 乐观锁版本号
    INDEX idx_stock (stock)  -- 库存索引
);

-- 订单表(分表)
CREATE TABLE orders_0 (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    product_id BIGINT,
    quantity INT,
    create_time DATETIME,
    INDEX idx_user_product (user_id, product_id)
) ENGINE=InnoDB;

-- 秒杀记录表(防重复)
CREATE TABLE seckill_records (
    user_id BIGINT,
    product_id BIGINT,
    order_id BIGINT,
    create_time DATETIME,
    PRIMARY KEY (user_id, product_id)
) ENGINE=InnoDB;

应用层实现

@Service
public class SeckillService {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    // 秒杀主方法
    @Transactional
    public SeckillResult seckill(Long userId, Long productId, int quantity) {
        String lockKey = "seckill:lock:" + productId;
        String stockKey = "seckill:stock:" + productId;
        
        try {
            // 1. Redis预减库存(减少数据库访问)
            Long stock = redisTemplate.opsForValue().decrement(stockKey, quantity);
            if (stock < 0) {
                redisTemplate.opsForValue().increment(stockKey, quantity); // 回滚
                return SeckillResult.fail("库存不足");
            }
            
            // 2. 分布式锁(防止超卖)
            boolean locked = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, "1", 5, TimeUnit.SECONDS);
            if (!locked) {
                return SeckillResult.fail("系统繁忙,请重试");
            }
            
            // 3. 检查是否已秒杀
            String existKey = "seckill:record:" + userId + ":" + productId;
            if (redisTemplate.hasKey(existKey)) {
                return SeckillResult.fail("您已参与过秒杀");
            }
            
            // 4. 数据库操作(走主库)
            // 乐观锁更新库存
            int updated = jdbcTemplate.update(
                "UPDATE products SET stock = stock - ?, version = version + 1 " +
                "WHERE id = ? AND stock >= ? AND version = ?",
                quantity, productId, quantity, getCurrentVersion(productId)
            );
            
            if (updated == 0) {
                return SeckillResult.fail("库存不足或已被抢购");
            }
            
            // 5. 创建订单
            Long orderId = createOrder(userId, productId, quantity);
            
            // 6. 记录到Redis(防重复)
            redisTemplate.opsForValue().set(existKey, orderId.toString(), 30, TimeUnit.MINUTES);
            
            // 7. 发送MQ消息(异步处理后续流程)
            sendOrderMessage(orderId, userId, productId);
            
            return SeckillResult.success(orderId);
            
        } finally {
            // 释放锁
            redisTemplate.delete(lockKey);
        }
    }
    
    private Long createOrder(Long userId, Long productId, int quantity) {
        Long orderId = generateOrderId();
        int shardIndex = (int) (userId % 4);
        String table = "orders_" + shardIndex;
        
        jdbcTemplate.update(
            String.format("INSERT INTO %s (order_id, user_id, product_id, quantity, create_time) VALUES (?, ?, ?, ?, NOW())", table),
            orderId, userId, productId, quantity
        );
        
        return orderId;
    }
}

数据库配置优化

# my.cnf 秒杀场景专用配置
[mysqld]
# 连接优化
max_connections = 2000
thread_cache_size = 200
back_log = 500

# InnoDB优化
innodb_buffer_pool_size = 16G  # 根据内存调整,建议70-80%内存
innodb_log_file_size = 2G
innodb_flush_log_at_trx_commit = 2  # 性能优先(可能丢失1秒数据)
innodb_flush_method = O_DIRECT
innodb_io_capacity = 2000

# 锁优化
innodb_lock_wait_timeout = 10
innodb_rollback_on_timeout = 1

# 查询缓存(关闭,高并发下无效)
query_cache_type = 0
query_cache_size = 0

# 临时表优化
tmp_table_size = 256M
max_heap_table_size = 256M

5.2 社交平台消息流优化

场景分析

  • 读多写少:用户查看消息流频率远高于发送消息
  • 实时性要求:新消息需要快速可见
  • 数据量大:单用户消息可能达到百万级

优化方案

-- 消息表(分表+分区)
CREATE TABLE messages (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    sender_id BIGINT NOT NULL,
    content TEXT,
    message_type TINYINT,
    is_read TINYINT DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_user_type_read (user_id, message_type, is_read, created_at DESC)
) PARTITION BY RANGE (YEAR(created_at)) (
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);

-- 消息计数器(缓存)
CREATE TABLE message_counters (
    user_id BIGINT PRIMARY KEY,
    unread_count INT DEFAULT 0,
    total_count INT DEFAULT 0,
    last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- 应用层读写分离策略
class MessageService:
    
    def send_message(self, sender_id, receiver_id, content):
        """发送消息(写主库)"""
        # 1. 插入消息表
        msg_id = self.insert_to_master(
            "INSERT INTO messages (user_id, sender_id, content, message_type) VALUES (?, ?, ?, ?)",
            (receiver_id, sender_id, content, 1)
        )
        
        # 2. 更新计数器(主库)
        self.insert_to_master(
            "INSERT INTO message_counters (user_id, unread_count, total_count) VALUES (?, 1, 1) "
            "ON DUPLICATE KEY UPDATE unread_count = unread_count + 1, total_count = total_count + 1",
            (receiver_id,)
        )
        
        # 3. 发送MQ通知(异步)
        self.send_notification(receiver_id, msg_id)
        
        return msg_id
    
    def get_unread_messages(self, user_id, limit=20):
        """获取未读消息(读从库)"""
        # 1. 先读计数器(从库)
        counter = self.query_from_slave(
            "SELECT unread_count FROM message_counters WHERE user_id = %s",
            (user_id,)
        )
        
        if not counter or counter['unread_count'] == 0:
            return [], 0
        
        # 2. 读取消息列表(从库)
        messages = self.query_from_slave(
            "SELECT * FROM messages WHERE user_id = %s AND is_read = 0 "
            "ORDER BY created_at DESC LIMIT %s",
            (user_id, limit)
        )
        
        return messages, counter['unread_count']
    
    def mark_as_read(self, user_id, message_ids):
        """标记已读(写主库)"""
        # 1. 批量更新消息状态
        placeholders = ','.join(['%s'] * len(message_ids))
        self.insert_to_master(
            f"UPDATE messages SET is_read = 1 WHERE id IN ({placeholders}) AND user_id = %s",
            message_ids + [user_id]
        )
        
        # 2. 更新计数器
        self.insert_to_master(
            "UPDATE message_counters SET unread_count = GREATEST(0, unread_count - %s) WHERE user_id = %s",
            (len(message_ids), user_id)
        )
        
        # 3. 清除缓存
        self.cache.delete(f"messages:{user_id}:unread")

六、监控与告警

6.1 关键性能指标监控

-- 1. 连接数监控
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Max_used_connections';

-- 2. 查询性能监控
SHOW STATUS LIKE 'Slow_queries';
SHOW STATUS LIKE 'Questions';

-- 3. InnoDB状态监控
SHOW ENGINE INNODB STATUS\G

-- 4. 锁等待监控
SELECT * FROM performance_schema.data_lock_waits;
SELECT * FROM performance_schema.data_locks;

-- 5. 慢查询日志分析
-- 在my.cnf中开启
[mysqld]
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1  # 记录超过1秒的查询
log_queries_not_using_indexes = 1

-- 分析慢日志
mysqldumpslow /var/log/mysql/slow.log
pt-query-digest /var/log/mysql/slow.log

6.2 监控脚本示例

#!/usr/bin/env python3
# MySQL监控脚本

import mysql.connector
import time
import smtplib
from email.mime.text import MIMEText

class MySQLMonitor:
    def __init__(self, host, user, password):
        self.config = {
            'host': host,
            'user': user,
            'password': password,
            'connect_timeout': 5
        }
    
    def check_connections(self):
        """检查连接数"""
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor()
        cursor.execute("SHOW STATUS LIKE 'Threads_connected'")
        connected = int(cursor.fetchone()[1])
        
        cursor.execute("SHOW VARIABLES LIKE 'max_connections'")
        max_conn = int(cursor.fetchone()[1])
        
        cursor.close()
        conn.close()
        
        usage = (connected / max_conn) * 100
        if usage > 80:
            self.send_alert(f"连接数过高: {connected}/{max_conn} ({usage:.1f}%)")
        
        return connected, max_conn
    
    def check_slow_queries(self):
        """检查慢查询"""
        conn = mysql.connector.connect(**self.config)
        cursor = conn.cursor()
        cursor.execute("SHOW STATUS LIKE 'Slow_queries'")
        slow_queries = int(cursor.fetchone()[1])
        cursor.close()
        conn.close()
        
        if slow_queries > 100:  # 1分钟内超过100个慢查询
            self.send_alert(f"慢查询过多: {slow_queries}个")
        
        return slow_queries
    
    def check_replication_lag(self):
        """检查主从延迟"""
        try:
            conn = mysql.connector.connect(**self.config)
            cursor = conn.cursor()
            cursor.execute("SHOW SLAVE STATUS")
            result = cursor.fetchone()
            
            if result:
                lag = result[32]  # Seconds_Behind_Master
                if lag and lag > 60:  # 延迟超过60秒
                    self.send_alert(f"主从延迟过高: {lag}秒")
                return lag
        except:
            pass
        finally:
            cursor.close()
            conn.close()
        
        return None
    
    def send_alert(self, message):
        """发送告警(邮件示例)"""
        msg = MIMEText(message)
        msg['Subject'] = 'MySQL监控告警'
        msg['From'] = 'monitor@example.com'
        msg['To'] = 'dba@example.com'
        
        try:
            server = smtplib.SMTP('smtp.example.com', 587)
            server.login('user', 'pass')
            server.send_message(msg)
            server.quit()
        except Exception as e:
            print(f"发送告警失败: {e}")
    
    def run_monitor(self):
        """持续监控"""
        while True:
            try:
                self.check_connections()
                self.check_slow_queries()
                self.check_replication_lag()
            except Exception as e:
                print(f"监控错误: {e}")
            
            time.sleep(60)  # 每分钟检查一次

if __name__ == '__main__':
    monitor = MySQLMonitor('localhost', 'monitor', 'password')
    monitor.run_monitor()

七、总结与最佳实践

7.1 高并发优化检查清单

索引优化

  • [ ] 所有查询都有合适的索引
  • [ ] 避免索引失效(如函数操作、隐式类型转换)
  • [ ] 定期检查未使用索引并清理
  • [ ] 复合索引遵循最左前缀原则

查询优化

  • [ ] 避免SELECT *
  • [ ] 合理使用LIMIT,避免深度分页
  • [ ] 批量操作替代循环单条
  • [ ] 使用EXPLAIN分析慢查询

架构优化

  • [ ] 配置合理的连接池参数
  • [ ] 实现读写分离
  • [ ] 使用Redis缓存热点数据
  • [ ] 考虑分库分表(数据量超过千万级)

监控告警

  • [ ] 开启慢查询日志
  • [ ] 监控连接数和CPU使用率
  • [ ] 监控主从延迟
  • [ ] 设置合理的告警阈值

7.2 不同并发量下的推荐方案

并发量 推荐方案 预期性能提升
< 100 QPS 索引优化 + 连接池 5-10倍
100-1000 QPS 查询优化 + Redis缓存 10-50倍
1000-5000 QPS 读写分离 + 缓存 50-200倍
> 5000 QPS 分库分表 + 读写分离 200倍以上

7.3 常见误区与避坑指南

  1. 过度索引:索引越多越好 ❌

    • 每个索引都会增加写入开销,建议单表索引不超过5个
  2. 盲目分库分表:数据量不大就分表 ❌

    • 分表增加复杂度,数据量超过1000万再考虑
  3. 忽视主从延迟:认为读写分离就完美 ❌

    • 必须处理延迟问题,关键业务读主库
  4. 缓存与数据库不一致:只更新缓存不更新数据库 ❌

    • 必须先更新数据库,再删除缓存(Cache Aside模式)
  5. 长事务:事务中包含远程调用 ❌

    • 事务应短小精悍,避免持有锁过长时间

7.4 性能测试基准

# 使用sysbench进行压力测试
# 安装sysbench
sudo apt-get install sysbench

# 准备测试数据
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=pass \
  --mysql-db=testdb --tables=10 --table-size=100000 \
  /usr/share/sysbench/oltp_read_write.lua prepare

# 执行测试(100并发,60秒)
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=pass \
  --mysql-db=testdb --tables=10 --table-size=100000 \
  --threads=100 --time=60 --report-interval=10 \
  /usr/share/sysbench/oltp_read_write.lua run

# 清理数据
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=pass \
  --mysql-db=testdb /usr/share/sysbench/oltp_read_write.lua cleanup

通过本文的系统性优化,您可以将MySQL的并发处理能力提升10-100倍。记住,优化是一个持续的过程,需要根据业务变化和监控数据不断调整策略。建议建立性能基线,定期进行压力测试,确保系统始终处于最佳状态。