Introduction: Understanding MySQL's Challenges Under High Concurrency

High-concurrency traffic is now the norm for Internet applications. Whether the workload is an e-commerce flash sale, a trending event on a social network, or a financial trading system, MySQL faces unprecedented pressure. Once queries per second (QPS) reach the thousands or tens of thousands, the database often becomes the bottleneck that determines overall system stability.

The main challenges high concurrency poses for MySQL include:

  • Connection exhaustion: a flood of concurrent connections prevents new ones from being established
  • CPU contention: frequent context switches and lock contention
  • I/O bottlenecks: disk throughput cannot keep pace with in-memory operations
  • Lock waits: long waits caused by row locks and table locks
  • Cache invalidation: frequent updates to hot data invalidate cache entries, sending bursts of misses straight to the database

This article analyzes MySQL's strategies for surviving traffic peaks at three levels: index optimization, query optimization, and architecture design.

1. Index Optimization: The First Line of Defense

1.1 Core Principles of Index Design

Indexes are the foundation of MySQL performance tuning. A well-designed index can improve query performance by orders of magnitude.

1.1.1 The Leftmost-Prefix Rule

For a composite index, MySQL follows the leftmost-prefix rule. For example, given the index:

CREATE INDEX idx_user_name_age ON users(name, age);

the following queries can use it effectively:

-- ✅ uses the index
SELECT * FROM users WHERE name = '张三';
SELECT * FROM users WHERE name = '张三' AND age = 25;

-- ❌ cannot use the index (skips the leftmost column)
SELECT * FROM users WHERE age = 25;

1.1.2 Covering-Index Optimization

A covering index contains every column a query needs, so the lookup can be satisfied from the index alone, with no back-to-table lookup against the clustered index.

Example: optimizing an order query

-- original query (requires a back-to-table lookup)
SELECT order_id, user_id, amount FROM orders 
WHERE user_id = 1001 AND status = 'paid';

-- option 1: create a covering index
CREATE INDEX idx_user_status_amount ON orders(user_id, status, amount);

-- option 2: deferred join (especially effective on large tables)
SELECT o.* FROM orders o
INNER JOIN (
    SELECT order_id FROM orders 
    WHERE user_id = 1001 AND status = 'paid' 
    LIMIT 100
) AS tmp ON o.order_id = tmp.order_id;

1.2 Common Ways Indexes Stop Being Used

1.2.1 Implicit Type Conversion

-- assume user_id is a VARCHAR column
-- ❌ index is not used (the column is implicitly cast for the comparison)
SELECT * FROM users WHERE user_id = 12345;

-- ✅ index is used
SELECT * FROM users WHERE user_id = '12345';

1.2.2 Functions on Indexed Columns

-- ❌ index is not used (function applied to the indexed column)
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';

-- ✅ index is used (rewritten as a range predicate)
SELECT * FROM orders 
WHERE create_time >= '2024-01-01 00:00:00' 
  AND create_time < '2024-01-02 00:00:00';

1.2.3 LIKE Queries

-- ❌ index is not used (leading-wildcard match)
SELECT * FROM users WHERE name LIKE '%三';

-- ✅ index is used (prefix match)
SELECT * FROM users WHERE name LIKE '张%';

-- ✅ full-text index (suitable for searching large volumes of text)
ALTER TABLE users ADD FULLTEXT INDEX ft_name (name);
SELECT * FROM users WHERE MATCH(name) AGAINST('三' IN BOOLEAN MODE);

1.3 Index-Maintenance Best Practices

1.3.1 Measuring Index Selectivity

-- compute a column's selectivity (the closer to 1, the better)
SELECT 
    COUNT(DISTINCT status) / COUNT(*) AS selectivity
FROM orders;

-- inspect how each index on a table is actually used
SELECT 
    object_schema,
    object_name,
    index_name,
    count_star,
    count_read,
    count_write
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE object_schema = 'your_database'
ORDER BY count_star DESC;

1.3.2 Dropping Unused Indexes

-- find indexes that have never been used
SELECT 
    t.TABLE_SCHEMA,
    t.TABLE_NAME,
    t.INDEX_NAME
FROM information_schema.STATISTICS t
LEFT JOIN performance_schema.table_io_waits_summary_by_index_usage u
    ON t.TABLE_SCHEMA = u.OBJECT_SCHEMA 
    AND t.TABLE_NAME = u.OBJECT_NAME 
    AND t.INDEX_NAME = u.INDEX_NAME
WHERE u.INDEX_NAME IS NULL 
  AND t.TABLE_SCHEMA = 'your_database';

2. Query Optimization: Making SQL Fly

2.1 Execution-Plan Analysis

2.1.1 EXPLAIN in Detail

-- basic usage
EXPLAIN SELECT * FROM orders WHERE user_id = 1001;

-- detailed JSON format
EXPLAIN FORMAT=JSON SELECT * FROM orders WHERE user_id = 1001;

Key fields in the execution plan:

  • type: access type, from best to worst: system > const > eq_ref > ref > range > index > ALL
  • key: the index actually chosen
  • rows: estimated number of rows scanned
  • Extra: additional information (Using index, Using where, Using filesort, etc.)

2.1.2 Slow-Query-Log Analysis

-- enable the slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1;  -- log queries slower than 1 second
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';

# analyze the slow query log (shell command)
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

2.2 Efficient Query Patterns

2.2.1 Pagination

-- ❌ naive pagination (deep offsets perform poorly)
SELECT * FROM orders WHERE user_id = 1001 
ORDER BY create_time DESC 
LIMIT 1000000, 20;

-- ✅ option 1: deferred join
SELECT o.* FROM orders o
INNER JOIN (
    SELECT order_id FROM orders 
    WHERE user_id = 1001 
    ORDER BY create_time DESC 
    LIMIT 1000000, 20
) AS tmp ON o.order_id = tmp.order_id;

-- ✅ option 2: keyset pagination (requires remembering the last row of the previous page)
SELECT * FROM orders 
WHERE user_id = 1001 
  AND create_time < '2024-01-01 00:00:00'  -- timestamp of the last row on the previous page
ORDER BY create_time DESC 
LIMIT 20;

2.2.2 IN vs EXISTS

-- ❌ can perform poorly when the subquery returns many rows
SELECT * FROM users WHERE id IN (
    SELECT user_id FROM orders WHERE amount > 1000
);

-- ✅ EXISTS (semi-join optimization)
SELECT * FROM users u WHERE EXISTS (
    SELECT 1 FROM orders o WHERE o.user_id = u.id AND o.amount > 1000
);

-- ✅ often better still: a JOIN
SELECT DISTINCT u.* FROM users u
INNER JOIN orders o ON u.id = o.user_id
WHERE o.amount > 1000;

2.2.3 UNION vs UNION ALL

-- ❌ UNION deduplicates, which costs extra work
SELECT * FROM orders WHERE status = 'paid'
UNION
SELECT * FROM orders WHERE status = 'completed';

-- ✅ UNION ALL skips deduplication and is faster (when duplicates are acceptable)
SELECT * FROM orders WHERE status = 'paid'
UNION ALL
SELECT * FROM orders WHERE status = 'completed';

2.3 Large-Transaction Optimization

2.3.1 Splitting Large Transactions

-- ❌ one large transaction (can cause lock-wait timeouts)
START TRANSACTION;
UPDATE users SET balance = balance - 100 WHERE id = 1;
UPDATE users SET balance = balance + 100 WHERE id = 2;
UPDATE orders SET status = 'completed' WHERE order_id = 1001;
-- ... possibly many more statements
COMMIT;

-- ✅ split into small transactions
-- note: splitting sacrifices atomicity across the steps; if the operations
-- must succeed or fail together (as in a transfer), pair this with a
-- compensation mechanism such as the Saga pattern in section 3.5
START TRANSACTION;
UPDATE users SET balance = balance - 100 WHERE id = 1;
COMMIT;

START TRANSACTION;
UPDATE users SET balance = balance + 100 WHERE id = 2;
COMMIT;

START TRANSACTION;
UPDATE orders SET status = 'completed' WHERE order_id = 1001;
COMMIT;

2.3.2 Batch Operations

-- ❌ inserting rows one at a time in a loop (slow)
INSERT INTO logs (user_id, action, create_time) VALUES (1, 'login', NOW());
INSERT INTO logs (user_id, action, create_time) VALUES (2, 'login', NOW());
-- ... repeated 1000 times

-- ✅ multi-row insert
INSERT INTO logs (user_id, action, create_time) VALUES 
(1, 'login', NOW()),
(2, 'login', NOW()),
(3, 'login', NOW()),
-- ... up to 1000 rows in one statement
(1000, 'login', NOW());

-- ✅ LOAD DATA INFILE (for very large volumes)
LOAD DATA LOCAL INFILE '/tmp/logs.csv'
INTO TABLE logs
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(user_id, action, create_time);

2.4 Concurrency-Control Strategies

2.4.1 Pessimistic vs Optimistic Locking

Pessimistic locking (suited to workloads with frequent write conflicts)

-- take an explicit row lock
SELECT * FROM inventory WHERE product_id = 1001 FOR UPDATE;
-- run the business logic
UPDATE inventory SET stock = stock - 1 WHERE product_id = 1001;

Optimistic locking (suited to read-heavy, write-light workloads)

-- add a version column
ALTER TABLE inventory ADD COLUMN version INT DEFAULT 0;

-- check the version on update (#{oldVersion} is a MyBatis-style placeholder)
UPDATE inventory 
SET stock = stock - 1, version = version + 1
WHERE product_id = 1001 AND version = #{oldVersion};

-- if the affected row count is 0, re-read and retry

2.4.2 Deadlock Prevention

-- acquire locks in a consistent order (breaks the circular wait)
-- scenario: money transfers
-- ❌ wrong: the two transactions lock rows in opposite orders
-- txn 1: UPDATE users SET balance = balance - 100 WHERE id = 1; UPDATE users SET balance = balance + 100 WHERE id = 2;
-- txn 2: UPDATE users SET balance = balance - 100 WHERE id = 2; UPDATE users SET balance = balance + 100 WHERE id = 1;

-- ✅ right: both transactions lock rows in ascending id order
-- txn 1: UPDATE users SET balance = balance - 100 WHERE id = 1; UPDATE users SET balance = balance + 100 WHERE id = 2;
-- txn 2: UPDATE users SET balance = balance + 100 WHERE id = 1; UPDATE users SET balance = balance - 100 WHERE id = 2;
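The ordering rule can be enforced in application code by sorting the two account ids before issuing the updates. A sketch — the returned list of (sql, params) pairs stands in for real cursor calls:

```python
# Always update account rows in ascending id order, regardless of the
# transfer direction, so concurrent transfers cannot wait on each other
# in a cycle.
def transfer_updates(from_id: int, to_id: int, amount: int):
    delta = {from_id: -amount, to_id: +amount}   # sign depends on direction
    first, second = sorted((from_id, to_id))     # lock order does not
    sql = "UPDATE users SET balance = balance + %s WHERE id = %s"
    return [
        (sql, (delta[first], first)),
        (sql, (delta[second], second)),
    ]
```

Both a 1→2 and a 2→1 transfer now touch row 1 before row 2, so the circular wait that produces the deadlock can never form.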

3. Architecture Design: From Single Node to Distributed

3.1 Read/Write Splitting

3.1.1 How Replication Works

MySQL replication is based on the binlog, which has three formats:

  • STATEMENT: logs SQL statements (can cause primary/replica divergence)
  • ROW: logs row changes (recommended; safer)
  • MIXED: a hybrid of the two

3.1.2 Replication Configuration Example

# primary configuration (my.cnf)
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7

# replica configuration
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
read_only = 1

3.1.3 Setting Up Replication

-- on the primary: create a replication user
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';

-- on the primary: lock tables and record the binlog position
FLUSH TABLES WITH READ LOCK;
SHOW MASTER STATUS;  -- note the File and Position values

-- on the replica: point it at the primary
CHANGE MASTER TO
MASTER_HOST='master_ip',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=1234;

-- on the replica: start replication
START SLAVE;
SHOW SLAVE STATUS\G  -- verify Slave_IO_Running and Slave_SQL_Running are both Yes

3.1.4 Read/Write Splitting in the Application Layer

# Python example: read/write splitting with SQLAlchemy
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

class RoutingSession:
    def __init__(self):
        self.master_engine = create_engine('mysql://master/db')
        self.slave_engine = create_engine('mysql://slave/db')
        self.master_session = sessionmaker(bind=self.master_engine)
        self.slave_session = sessionmaker(bind=self.slave_engine)

    def get_session(self, write=False):
        # route writes to the primary, reads to a replica
        return self.master_session() if write else self.slave_session()

# usage
routing_session = RoutingSession()

session = routing_session.get_session(write=True)
session.execute(text("INSERT INTO users (name) VALUES ('张三')"))

session = routing_session.get_session(write=False)
result = session.execute(text("SELECT * FROM users WHERE id = 1"))

3.2 Sharding Architecture

3.2.1 Vertical Partitioning

Split by business domain

-- original single database
-- user tables: users, user_profiles, user_settings
-- order tables: orders, order_items, order_logs
-- product tables: products, product_categories, product_inventory

-- after the split
-- user_db.users
-- user_db.user_profiles
-- order_db.orders
-- order_db.order_items
-- product_db.products
-- product_db.product_categories

3.2.2 Horizontal Partitioning

Shard by user ID modulo

-- sharding rule: user_id % 4
-- shard 0: user_id % 4 = 0
-- shard 1: user_id % 4 = 1
-- shard 2: user_id % 4 = 2
-- shard 3: user_id % 4 = 3

-- shard table structure
CREATE TABLE users_0 (
    id BIGINT PRIMARY KEY,
    name VARCHAR(50),
    email VARCHAR(100),
    -- other columns
    INDEX idx_email (email)
) ENGINE=InnoDB;

CREATE TABLE users_1 LIKE users_0;
CREATE TABLE users_2 LIKE users_0;
CREATE TABLE users_3 LIKE users_0;
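The modulo rule above translates directly into a routing helper in application code (a sketch; the table names match the DDL above):

```python
# Route a user_id to its physical shard table under the user_id % 4 rule.
NUM_SHARDS = 4

def shard_table(user_id: int) -> str:
    return f"users_{user_id % NUM_SHARDS}"

# e.g. shard_table(7) routes to users_3, shard_table(8) to users_0
```

Note that a plain modulo makes resharding painful (changing NUM_SHARDS moves most keys); consistent hashing or a range-based directory avoids that, at the cost of extra machinery.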

3.2.3 Sharding Middleware

ShardingSphere-JDBC configuration example

# application.yml
spring:
  shardingsphere:
    datasource:
      names: ds0, ds1, ds2, ds3
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/db0
        username: root
        password: password
      # ds1, ds2, ds3 are configured the same way
    
    rules:
      sharding:
        tables:
          users:
            actual-data-nodes: ds${0..3}.users_${0..3}
            table-strategy:
              standard:
                sharding-column: id
                sharding-algorithm-name: user-mod
            database-strategy:
              standard:
                sharding-column: id
                sharding-algorithm-name: db-mod
        
        sharding-algorithms:
          user-mod:
            type: MOD
            props:
              sharding-count: 4
          db-mod:
            type: MOD
            props:
              sharding-count: 4

3.3 Caching Strategies

3.3.1 Multi-Level Cache Architecture

client request → CDN → Nginx cache → application cache (Redis) → MySQL

3.3.2 Redis Caching Example

import redis
import json

class CacheService:
    def __init__(self):
        # decode_responses=True so get() returns str instead of bytes
        self.redis_client = redis.Redis(host='localhost', port=6379,
                                        decode_responses=True)
    
    def get_user(self, user_id):
        cache_key = f"user:{user_id}"
        
        # 1. try the cache first
        cached = self.redis_client.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # 2. cache miss: query the database (db is the data-access layer)
        user = db.query("SELECT * FROM users WHERE id = %s", user_id)
        
        # 3. populate the cache with a TTL
        if user:
            self.redis_client.setex(
                cache_key, 
                3600,  # expire after 1 hour
                json.dumps(user)
            )
        
        return user
    
    def update_user(self, user_id, data):
        # update the database
        db.execute("UPDATE users SET name = %s WHERE id = %s", 
                   data['name'], user_id)
        
        # delete the cache entry (cache-aside pattern)
        self.redis_client.delete(f"user:{user_id}")

3.3.3 Defending Against Cache Penetration, Breakdown, and Avalanche

Cache-penetration defense (queries for data that does not exist)

def get_user(self, user_id):
    cache_key = f"user:{user_id}"
    cached = self.redis_client.get(cache_key)
    if cached:
        return None if cached == "null" else json.loads(cached)
    
    user = db.query("SELECT * FROM users WHERE id = %s", user_id)
    
    # cache a sentinel even when the row does not exist, so repeated
    # lookups for missing ids stop hitting the database
    if user is None:
        self.redis_client.setex(cache_key, 60, "null")
        return None
    
    self.redis_client.setex(cache_key, 3600, json.dumps(user))
    return user

Cache-breakdown defense (a hot key expires)

import time

def get_hot_data(self, key):
    cache_key = f"hot:{key}"
    lock_key = f"lock:{key}"
    
    while True:
        # re-check the cache on every pass; waiters return as soon as
        # the lock holder has repopulated it
        cached = self.redis_client.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # SET NX acts as a distributed lock with a 10s safety timeout
        lock = self.redis_client.set(lock_key, "1", nx=True, ex=10)
        
        if lock:
            try:
                # only the lock holder queries the database
                data = db.query("SELECT * FROM hot_table WHERE key = %s", key)
                self.redis_client.setex(cache_key, 3600, json.dumps(data))
                return data
            finally:
                self.redis_client.delete(lock_key)
        
        # lock not acquired: back off briefly, then re-check the cache
        time.sleep(0.1)

Cache-avalanche defense (many keys expiring at once)

import random

def set_with_jitter(self, key, value, base_expire=3600):
    # add random jitter (±300 seconds) so expirations are spread out
    jitter = random.randint(-300, 300)
    expire = base_expire + jitter
    self.redis_client.setex(key, expire, value)

3.4 High-Availability Architecture

3.4.1 MHA

MHA (Master High Availability) is a classic MySQL high-availability solution.

Architecture:

Master (writes) → Slave1 (reads) → Slave2 (reads)
    ↓
MHA Manager (monitoring and failover)

MHA configuration example

# manager configuration file /etc/mha/app1.cnf
[server default]
user=mha
password=password
manager_workdir=/var/log/mha/app1
manager_log=/var/log/mha/app1/manager.log
master_binlog_dir=/var/lib/mysql
ping_interval=3
repl_password=password
repl_user=repl

[server1]
hostname=master1
candidate_master=1

[server2]
hostname=slave1
candidate_master=1

[server3]
hostname=slave2
no_master=1

3.4.2 InnoDB Cluster

The recommended high-availability solution for MySQL 8.0+, built on Group Replication.

-- 1. configure Group Replication (on each node, in the SQL client)
SET GLOBAL group_replication_bootstrap_group=ON;
SET GLOBAL group_replication_local_address="192.168.1.101:33061";
SET GLOBAL group_replication_group_seeds="192.168.1.101:33061,192.168.1.102:33061,192.168.1.103:33061";

# 2. create the cluster via MySQL Shell
mysqlsh --uri root@localhost:3306 --password=password
dba.createCluster('mycluster');

3.5 Distributed Transactions

3.5.1 TCC (Try-Confirm-Cancel)

# transfer scenario: A transfers money to B
class TransferService:
    def transfer(self, from_id, to_id, amount):
        # Try phase: reserve (freeze) the funds
        try:
            self.try_reduce_balance(from_id, amount)
            self.try_increase_balance(to_id, amount)
            
            # Confirm phase
            self.confirm_reduce_balance(from_id, amount)
            self.confirm_increase_balance(to_id, amount)
        except Exception as e:
            # Cancel phase
            self.cancel_reduce_balance(from_id, amount)
            self.cancel_increase_balance(to_id, amount)
            raise e
    
    def try_reduce_balance(self, user_id, amount):
        # freeze the funds without actually deducting them
        db.execute("""
            UPDATE users 
            SET frozen_balance = frozen_balance + %s
            WHERE id = %s AND balance >= %s
        """, amount, user_id, amount)
    
    def confirm_reduce_balance(self, user_id, amount):
        # confirm the deduction
        db.execute("""
            UPDATE users 
            SET balance = balance - %s,
                frozen_balance = frozen_balance - %s
            WHERE id = %s
        """, amount, amount, user_id)
    
    def cancel_reduce_balance(self, user_id, amount):
        # release the frozen funds
        db.execute("""
            UPDATE users 
            SET frozen_balance = frozen_balance - %s
            WHERE id = %s
        """, amount, user_id)

3.5.2 The Saga Pattern

Suited to long-running transactions: a large transaction is broken into a sequence of small local transactions, each paired with a compensating action.

# order-creation Saga
class OrderSaga:
    def create_order(self, user_id, product_id, quantity):
        order_id = None
        try:
            # step 1: create the order record
            order_id = self.create_order_record(user_id, product_id, quantity)
            
            # step 2: deduct stock
            self.reduce_stock(product_id, quantity)
            
            # step 3: deduct the user's balance
            self.reduce_balance(user_id, product_id, quantity)
            
            # step 4: mark the order as successful
            self.update_order_status(order_id, 'SUCCESS')
            
        except Exception:
            # run the compensating transactions (skip if step 1 never completed)
            if order_id is not None:
                self.compensate(order_id)
    
    def compensate(self, order_id):
        # compensate according to how far the order progressed
        order = self.get_order(order_id)
        if order.status == 'CREATED':
            # compensation: restore stock
            self.restore_stock(order.product_id, order.quantity)
            # compensation: restore the balance
            self.restore_balance(order.user_id, order.amount)
            # mark the order as cancelled
            self.update_order_status(order_id, 'CANCELLED')

4. Monitoring and Tuning: Sustaining the Gains

4.1 Performance Metrics

4.1.1 Key Indicators

-- current connection count
SHOW STATUS LIKE 'Threads_connected';

-- active (running) connection count
SHOW STATUS LIKE 'Threads_running';

-- QPS: sample the counter twice and divide the delta by the interval
SHOW GLOBAL STATUS LIKE 'Queries';
SHOW GLOBAL STATUS LIKE 'Queries';  -- run again 1 second later and subtract

-- TPS: derived the same way from the commit and rollback counters
SHOW GLOBAL STATUS LIKE 'Com_commit';
SHOW GLOBAL STATUS LIKE 'Com_rollback';
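Since Queries, Com_commit, and Com_rollback are cumulative counters, the rates come from differencing two samples (a sketch; the sample values below are hypothetical):

```python
# Convert two samples of a cumulative status counter into a per-second rate.
def rate_per_sec(sample1: int, sample2: int, interval_sec: float) -> float:
    return (sample2 - sample1) / interval_sec

# QPS from two readings of Queries taken 1 second apart:
# rate_per_sec(1_052_000, 1_055_000, 1.0) -> 3000.0
```

TPS is computed the same way from Com_commit + Com_rollback deltas.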

4.1.2 InnoDB Engine Status

SHOW ENGINE INNODB STATUS\G

Sections worth watching:

  • TRANSACTIONS: lock waits and deadlock information
  • SEMAPHORES: semaphore waits
  • FILE I/O: I/O performance
  • BUFFER POOL AND MEMORY: buffer-pool hit rate
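The buffer-pool hit rate called out above can also be derived from two status counters, Innodb_buffer_pool_read_requests (logical reads) and Innodb_buffer_pool_reads (reads that had to go to disk) — a sketch:

```python
# Hit rate = 1 - disk_reads / logical_read_requests.
def buffer_pool_hit_rate(read_requests: int, disk_reads: int) -> float:
    return 1.0 - disk_reads / read_requests
```

A healthy OLTP workload usually sits well above 0.95, which is why the checklist later in this article uses 95% as its threshold.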

4.2 A Slow-Query Monitoring Setup

4.2.1 Percona Toolkit

# install
sudo apt-get install percona-toolkit

# analyze the slow query log
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# analyze live traffic from the process list
pt-query-digest --processlist h=localhost,u=root,p=password --interval 10

4.2.2 A Custom Monitoring Script

#!/usr/bin/env python3
import mysql.connector
import time

def monitor_slow_queries():
    conn = mysql.connector.connect(
        host='localhost',
        user='monitor',
        password='password',
        database='performance_schema'
    )
    cursor = conn.cursor()
    
    # statements seen in the last 5 minutes averaging over 1 second
    cursor.execute("""
        SELECT 
            DIGEST_TEXT,
            COUNT_STAR,
            AVG_TIMER_WAIT/1000000000000 as avg_time_sec
        FROM events_statements_summary_by_digest
        WHERE LAST_SEEN > NOW() - INTERVAL 5 MINUTE
          AND AVG_TIMER_WAIT > 1000000000000  -- timer units are picoseconds; this is 1 second
        ORDER BY COUNT_STAR DESC
        LIMIT 10
    """)
    
    slow_queries = cursor.fetchall()
    
    if slow_queries:
        # fire an alert
        send_alert(slow_queries)
    
    conn.close()

def send_alert(queries):
    # alert-delivery logic goes here (email, DingTalk, WeCom, etc.)
    subject = "MySQL slow-query alert"
    body = "\n".join([f"{q[0]}: {q[1]} executions, avg {q[2]:.2f}s" for q in queries])
    
    print(f"ALERT: {subject}\n{body}")

if __name__ == '__main__':
    while True:
        monitor_slow_queries()
        time.sleep(60)  # check once a minute

4.3 Tuning Tools

4.3.1 sys schema (MySQL 5.7+)

-- the most expensive statements
SELECT * FROM sys.statement_analysis ORDER BY avg_latency DESC LIMIT 10;

-- unused indexes
SELECT * FROM sys.schema_unused_indexes;

-- redundant indexes
SELECT * FROM sys.schema_redundant_indexes;

-- per-table I/O
SELECT * FROM sys.schema_table_statistics ORDER BY total_latency DESC;

4.3.2 MySQLTuner

# download and run
wget https://raw.githubusercontent.com/major/MySQLTuner-perl/master/mysqltuner.pl
perl mysqltuner.pl --user root --password password

5. Case Study: Designing a Flash-Sale (Seckill) System

5.1 What Makes Flash Sales Hard

  • Traffic spikes: QPS can exceed 100x the normal load
  • Stock deduction: overselling must be impossible
  • Hot data: a handful of products attract most of the traffic
  • Abuse prevention: scalpers and bots must be kept out

5.2 Architecture

5.2.1 Overall Layout

client → CDN → Nginx → seckill service → Redis → MySQL
                ↓
          message queue → async processing

5.2.2 Core Implementation

Pre-deducting stock in Redis

import time
import redis

class SeckillService:
    def __init__(self):
        # decode_responses=True so counters come back as str, not bytes
        self.redis = redis.Redis(host='localhost', port=6379, db=0,
                                 decode_responses=True)
        self.db = create_db_connection()
    
    def seckill(self, user_id, product_id):
        # 1. rate limiting
        if not self.rate_limit(user_id):
            return {"code": 429, "msg": "too many requests"}
        
        # 2. eligibility check (anti-abuse)
        if not self.check_user_eligibility(user_id, product_id):
            return {"code": 403, "msg": "not eligible to purchase"}
        
        # 3. pre-deduct stock (atomic Redis operation)
        stock_key = f"seckill:stock:{product_id}"
        user_key = f"seckill:user:{product_id}"
        
        # has this user already bought?
        if self.redis.sismember(user_key, user_id):
            return {"code": 400, "msg": "you have already participated"}
        
        # atomically decrement the stock counter
        stock = self.redis.decr(stock_key)
        if stock < 0:
            self.redis.incr(stock_key)  # roll back the decrement
            return {"code": 500, "msg": "out of stock"}
        
        # 4. record the buyer
        self.redis.sadd(user_key, user_id)
        
        # 5. enqueue a message (the order is created asynchronously)
        self.send_to_queue({
            "user_id": user_id,
            "product_id": product_id,
            "stock": stock,
            "timestamp": time.time()
        })
        
        return {"code": 200, "msg": "seckill succeeded", "order_id": None}
    
    def rate_limit(self, user_id):
        # simplified token bucket: a per-user counter that refills only when
        # the 60-second window expires (a true bucket refills continuously)
        key = f"rate_limit:{user_id}"
        capacity = 10  # bucket capacity
        
        current = self.redis.get(key)
        if current is None:
            self.redis.setex(key, 60, capacity - 1)
            return True
        
        if int(current) > 0:
            self.redis.decr(key)
            return True
        
        return False
    
    def check_user_eligibility(self, user_id, product_id):
        # blacklist check
        if self.redis.sismember("blacklist:users", user_id):
            return False
        
        # purchase-frequency check (at most once per hour)
        freq_key = f"user_freq:{user_id}:{product_id}"
        if self.redis.exists(freq_key):
            return False
        
        self.redis.setex(freq_key, 3600, "1")
        return True
    
    def send_to_queue(self, message):
        # publish to RabbitMQ/Kafka
        pass
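The rate_limit method above approximates a token bucket with a fixed 60-second window. A continuously refilling bucket looks like the sketch below — an in-process version for illustration only; a production seckill limiter would keep this state in Redis, typically behind a Lua script so the read-refill-consume step stays atomic:

```python
import time

# In-process token bucket: tokens accumulate at refill_per_sec up to
# capacity; each allowed request consumes one token.
class TokenBucket:
    def __init__(self, capacity: float, refill_per_sec: float):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # refill proportionally to the time elapsed since the last call
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.refill_per_sec)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

Unlike the fixed window, this shape has no boundary burst: a client cannot spend a full window's budget twice in quick succession by straddling the window edge.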

Asynchronous order processing

def process_seckill_order(message):
    """
    Consume a queue message and create the order.
    `db`, `redis_client`, and `logger` come from the surrounding module.
    """
    user_id = message['user_id']
    product_id = message['product_id']
    
    try:
        # 1. open a transaction
        with db.transaction():
            # 2. re-check stock under a row lock (guards against
            #    Redis/DB divergence)
            stock = db.query(
                "SELECT stock FROM products WHERE id = %s FOR UPDATE",
                product_id
            )
            if stock <= 0:
                raise Exception("out of stock")
            
            # 3. deduct stock in the database
            result = db.execute(
                "UPDATE products SET stock = stock - 1 WHERE id = %s AND stock > 0",
                product_id
            )
            if result.rowcount == 0:
                raise Exception("database stock exhausted")
            
            # 4. create the order
            order_id = db.execute("""
                INSERT INTO orders (user_id, product_id, status, create_time)
                VALUES (%s, %s, 'SUCCESS', NOW())
            """, user_id, product_id)
            
            # 5. record the order id for later lookups
            redis_client.setex(f"seckill:order:{user_id}:{product_id}", 3600, order_id)
            
    except Exception as e:
        # on failure, restore the Redis stock and the user's eligibility
        redis_client.incr(f"seckill:stock:{product_id}")
        redis_client.srem(f"seckill:user:{product_id}", user_id)
        logger.error(f"order processing failed: {e}")

5.2.3 Table Design

-- product table (seckill stock tracked separately)
CREATE TABLE products (
    id BIGINT PRIMARY KEY,
    name VARCHAR(200),
    total_stock INT,
    seckill_stock INT,  -- stock reserved for the flash sale
    seckill_start DATETIME,
    seckill_end DATETIME,
    INDEX idx_seckill_time (seckill_start, seckill_end)
) ENGINE=InnoDB;

-- order table (sharded)
CREATE TABLE orders_0 (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    product_id BIGINT,
    amount DECIMAL(10,2),
    status ENUM('PENDING', 'SUCCESS', 'FAILED'),
    create_time DATETIME,
    INDEX idx_user (user_id),
    INDEX idx_product (product_id)
) ENGINE=InnoDB;

-- seckill record table (successful orders only)
CREATE TABLE seckill_records (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT,
    product_id BIGINT,
    order_id BIGINT,
    create_time DATETIME,
    INDEX idx_user_product (user_id, product_id)
) ENGINE=InnoDB;

5.3 Load Testing and Optimization

5.3.1 Load Testing with sysbench

# prepare test data
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
         --mysql-db=test --tables=10 --table-size=100000 \
         /usr/share/sysbench/oltp_read_write.lua prepare

# run the benchmark
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
         --mysql-db=test --tables=10 --table-size=100000 \
         --threads=100 --time=60 --report-interval=10 \
         /usr/share/sysbench/oltp_read_write.lua run

# clean up
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=password \
         --mysql-db=test \
         /usr/share/sysbench/oltp_read_write.lua cleanup

5.3.2 Performance Checklist

  • [ ] Do indexes cover the hot queries?
  • [ ] Are there any full table scans?
  • [ ] Is the connection pool sized sensibly?
  • [ ] Have the slow queries been optimized?
  • [ ] Is the cache hit rate above 95%?
  • [ ] Is replication lag within acceptable bounds (ideally under a second)?
  • [ ] Has any reliance on the query cache been removed (it was dropped in MySQL 8.0)?
  • [ ] Is the InnoDB buffer pool sized sensibly (a common guideline: 70-80% of RAM on a dedicated host)?
  • [ ] Are the redo log files large enough to avoid frequent checkpoint flushing?
  • [ ] Would compressed tables help?

6. Summary and Best Practices

6.1 The Optimization Pyramid

        architecture design (sharding, caching)
            ↓
        query optimization (execution plans, large transactions)
            ↓
        index optimization (covering indexes, leftmost prefix)
            ↓
        configuration tuning (buffer pool, connection pool)
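The configuration layer at the base of the pyramid can be sketched as a my.cnf fragment. The values below are illustrative assumptions for a dedicated 32 GB host, not recommendations — tune them against your own measurements:

```ini
# my.cnf sketch (illustrative; assumes a dedicated 32 GB server)
[mysqld]
innodb_buffer_pool_size = 24G      # roughly 70-80% of RAM on a dedicated host
innodb_log_file_size = 1G          # larger redo logs reduce checkpoint pressure
innodb_flush_log_at_trx_commit = 1 # full durability; 2 trades safety for speed
max_connections = 1000             # pair with an application-side connection pool
```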

6.2 Golden Rules for High Concurrency

  1. Avoid the database when you can: cache first
  2. Go asynchronous when you can: decouple with message queues
  3. Throttle when you can: rate limiting and circuit breaking
  4. Split when you can: divide and conquer
  5. Stay simple when you can: avoid over-engineering

6.3 Keeping the Gains

  1. Build a monitoring system: know the database's health in real time
  2. Review slow queries regularly: once a week
  3. Make load testing routine: mandatory before every major promotion
  4. Plan capacity: forecast growth three months ahead
  5. Rehearse failures: regularly drill failover and crash recovery

6.4 Recommended Tools and Resources

  • Monitoring: Prometheus + Grafana + mysqld_exporter
  • Slow-query analysis: Percona Toolkit, pt-query-digest
  • Tuning: MySQLTuner, tuning-primer
  • Backup: Percona XtraBackup
  • High availability: Orchestrator, MHA
  • Sharding: ShardingSphere, Vitess

With these strategies applied together, MySQL can handle tens of thousands of QPS. The key principle is that prevention beats cure: design for scalability and performance from the start, rather than scrambling to optimize after problems appear.