High concurrency has become the norm for modern internet applications. Whether you are building e-commerce flash sales, a social platform, or a financial trading system, knowing how to make MySQL, one of the most popular relational databases, cope with high concurrency is a skill every developer and DBA needs. This article walks through a practical, end-to-end approach covering index optimization, query optimization, architecture design, and more.

1. Analyzing Performance Bottlenecks Under High Concurrency

1.1 What Is High Concurrency?

High concurrency generally means the system has to handle a large number of requests within a short time window. For example:

  • E-commerce flash sales: 100,000 users competing for 1,000 items at the same moment
  • Social media hotspots: millions of likes/comments after a celebrity posts
  • Financial trading: thousands of transactions processed per second

1.2 Common MySQL Bottlenecks Under High Concurrency

  1. CPU: complex queries and heavy sorting/grouping
  2. I/O: frequent disk reads and writes, especially random I/O
  3. Lock contention: waits caused by row locks and table locks
  4. Connection limits: max_connections set too low
  5. Memory pressure: a low buffer pool hit rate

A quick way to tell which of these you are hitting is to sample a few server status counters, as in the sketch below.
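
A minimal sketch, assuming PyMySQL and a monitoring account (the host/user/password values are placeholders), that reads a few global status counters mapping roughly to the bottlenecks above:

import pymysql

def snapshot_bottleneck_counters(host="localhost", user="monitor", password="password"):
    """Sample status counters that hint at the bottleneck categories above."""
    conn = pymysql.connect(host=host, user=user, password=password)
    try:
        with conn.cursor() as cursor:
            cursor.execute("SHOW GLOBAL STATUS")
            status = {name: value for name, value in cursor.fetchall()}

        reads = int(status["Innodb_buffer_pool_reads"])             # reads that missed the buffer pool
        requests = int(status["Innodb_buffer_pool_read_requests"])  # all logical read requests
        hit_rate = 100.0 * (1 - reads / max(requests, 1))

        return {
            "threads_connected": int(status["Threads_connected"]),   # connection pressure
            "threads_running": int(status["Threads_running"]),       # CPU / concurrency pressure
            "row_lock_waits": int(status["Innodb_row_lock_waits"]),  # lock contention
            "buffer_pool_hit_rate_pct": round(hit_rate, 2),          # memory / random I/O pressure
        }
    finally:
        conn.close()

print(snapshot_bottleneck_counters())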

2. Index Optimization: The Foundation of High Concurrency

2.1 Index Design Principles

2.1.1 The Leftmost-Prefix Rule

-- Create a composite index
CREATE INDEX idx_user_order ON orders(user_id, order_date, status);

-- Queries that can use the index
SELECT * FROM orders WHERE user_id = 1001; -- ✅ uses the index
SELECT * FROM orders WHERE user_id = 1001 AND order_date = '2023-10-01'; -- ✅ uses the index
SELECT * FROM orders WHERE user_id = 1001 AND status = 'paid'; -- ✅ uses the index (user_id prefix only; status cannot be used because order_date is skipped)

-- Queries that cannot use the index
SELECT * FROM orders WHERE order_date = '2023-10-01'; -- ❌ cannot use the index
SELECT * FROM orders WHERE status = 'paid'; -- ❌ cannot use the index

2.1.2 Covering Indexes

-- Create a covering index
CREATE INDEX idx_cover ON orders(user_id, order_date, status, amount);

-- The query is satisfied entirely from the index, no lookup back to the table
SELECT user_id, order_date, status, amount 
FROM orders 
WHERE user_id = 1001 AND order_date >= '2023-10-01';

2.1.3 Index Selectivity

-- Check a column's selectivity (distinct values / total rows)
SELECT 
    COUNT(DISTINCT status) / COUNT(*) AS selectivity,
    COUNT(DISTINCT status) AS distinct_count
FROM orders;

-- As a rule of thumb, columns with selectivity below 0.1 are poor candidates for a standalone index
-- Prefer indexing highly selective columns (such as user_id); the sketch below runs this check over several candidate columns
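
A rough sketch of automating that check, assuming PyMySQL and that the table/column names passed in are trusted (they are interpolated directly into the SQL):

import pymysql

def column_selectivity(conn, table, columns):
    """Return {column: selectivity} for candidate index columns of one table."""
    result = {}
    with conn.cursor() as cursor:
        for col in columns:  # table/column names are assumed trusted, not user input
            cursor.execute(f"SELECT COUNT(DISTINCT {col}) / COUNT(*) FROM {table}")
            result[col] = float(cursor.fetchone()[0] or 0)
    return result

conn = pymysql.connect(host="localhost", user="root", password="password", database="app_db")
print(column_selectivity(conn, "orders", ["user_id", "status", "order_date"]))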

2.2 Index Optimization in Practice

Case 1: Optimizing an e-commerce order query

-- Original query (no index: full table scan)
SELECT * FROM orders 
WHERE user_id = 1001 
  AND order_date BETWEEN '2023-01-01' AND '2023-12-31'
  AND status IN ('paid', 'shipped')
ORDER BY order_date DESC
LIMIT 10;

-- Option 1: create a composite index
-- (the DESC key part requires MySQL 8.0+; MySQL 5.7 parses DESC but ignores it)
CREATE INDEX idx_user_date_status ON orders(user_id, order_date DESC, status);

-- Option 2: when paginating, avoid the deep-paging problem with a boundary predicate
SELECT * FROM orders 
WHERE user_id = 1001 
  AND order_date BETWEEN '2023-01-01' AND '2023-12-31'
  AND status IN ('paid', 'shipped')
  AND order_date < '2023-06-01'  -- boundary value taken from the last row of the previous page
ORDER BY order_date DESC
LIMIT 10;

Case 2: Multi-condition combined queries

-- Scenario: users search their orders with flexible filter combinations
-- Filters: user ID, order status, date range, amount range

-- Before: several single-column indexes; MySQL will typically pick only one (or fall back to index merge)
CREATE INDEX idx_user ON orders(user_id);
CREATE INDEX idx_status ON orders(status);
CREATE INDEX idx_date ON orders(order_date);

-- After: composite indexes designed around the query patterns
-- Analyze the common filter combinations and create a few composite indexes that cover them
CREATE INDEX idx_user_status_date ON orders(user_id, status, order_date);
CREATE INDEX idx_user_date_amount ON orders(user_id, order_date, amount);
CREATE INDEX idx_status_date ON orders(status, order_date);

-- Use EXPLAIN to verify the execution plan (the sketch after this query runs the same check from application code)
EXPLAIN SELECT * FROM orders 
WHERE user_id = 1001 
  AND status = 'paid' 
  AND order_date >= '2023-10-01';
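
A small sketch, assuming PyMySQL and the table above, that runs EXPLAIN from application code and flags full table scans:

import pymysql

def explain_query(conn, sql, params=None):
    """Run EXPLAIN on a query and report the chosen index and estimated rows."""
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute("EXPLAIN " + sql, params)
        for row in cursor.fetchall():
            print(f"table={row['table']} type={row['type']} key={row['key']} rows={row['rows']}")
            if row["type"] == "ALL":  # full table scan
                print("  warning: full table scan, consider adding or adjusting an index")

conn = pymysql.connect(host="localhost", user="root", password="password", database="app_db")
explain_query(
    conn,
    "SELECT * FROM orders WHERE user_id = %s AND status = %s AND order_date >= %s",
    (1001, "paid", "2023-10-01"),
)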

2.3 Index Maintenance and Monitoring

-- Inspect index statistics (InnoDB persistent statistics)
SELECT 
    table_name,
    index_name,
    stat_value,
    stat_description
FROM mysql.innodb_index_stats 
WHERE database_name = 'your_database'
  AND table_name = 'orders';

-- Find indexes that have never been used (requires performance_schema; the sys view is available from MySQL 5.7)
SELECT
    object_schema,
    object_name,
    index_name
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE object_schema = 'your_database'
  AND index_name IS NOT NULL
  AND index_name <> 'PRIMARY'
  AND count_star = 0
ORDER BY object_schema, object_name;

-- Or, more simply, query the sys schema view
SELECT * FROM sys.schema_unused_indexes;

3. Query Optimization: Making Execution Faster

3.1 Avoiding Full Table Scans

3.1.1 Optimizing WHERE Conditions

-- ❌ Avoid functions on indexed columns
SELECT * FROM orders WHERE DATE(order_date) = '2023-10-01';
-- ✅ Rewrite as a range predicate (note that BETWEEN is inclusive; a half-open range covers exactly one day)
SELECT * FROM orders WHERE order_date >= '2023-10-01' AND order_date < '2023-10-02';

-- ❌ Avoid LIKE with a leading wildcard
SELECT * FROM users WHERE username LIKE '%john%';
-- ✅ If substring search is required, consider a full-text index
ALTER TABLE users ADD FULLTEXT INDEX ft_username (username);
SELECT * FROM users WHERE MATCH(username) AGAINST('john');

3.1.2 JOIN Optimization

-- ❌ Avoid the implicit comma-join syntax (easy to turn into an accidental Cartesian product if the WHERE clause is forgotten)
SELECT * FROM orders o, users u WHERE o.user_id = u.id;

-- ✅ Make the JOIN type and condition explicit
SELECT o.*, u.username 
FROM orders o 
INNER JOIN users u ON o.user_id = u.id
WHERE o.status = 'paid';

-- ✅ Let the small table drive the large one
-- orders has 1,000,000 rows, users has 100,000 rows
-- users should be the driving table (for an INNER JOIN the optimizer usually chooses this itself; verify with EXPLAIN)
SELECT o.*, u.username 
FROM users u 
INNER JOIN orders o ON u.id = o.user_id
WHERE u.status = 'active';

3.2 Pagination Optimization

3.2.1 The Deep Pagination Problem

-- ❌ Traditional OFFSET pagination (gets slower the deeper the page)
SELECT * FROM orders ORDER BY order_date DESC LIMIT 1000000, 10;

-- ✅ Option 1: use a subquery to locate the boundary value first
SELECT * FROM orders 
WHERE order_date <= (
    SELECT order_date FROM orders 
    ORDER BY order_date DESC 
    LIMIT 1000000, 1
)
ORDER BY order_date DESC 
LIMIT 10;

-- ✅ Option 2: keyset (cursor) pagination, recommended
-- First page
SELECT * FROM orders 
WHERE order_date >= '2023-01-01'
ORDER BY order_date DESC 
LIMIT 10;

-- Next page (pass down the order_date of the last row from the previous page)
SELECT * FROM orders 
WHERE order_date < '2023-01-01 12:00:00'  -- order_date of the last row on the previous page
ORDER BY order_date DESC 
LIMIT 10;
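
A keyset pagination helper sketched in Python, assuming PyMySQL and an id column used as a tie-breaker (many orders may share the same order_date), ideally backed by an index on (order_date, id):

import pymysql

def fetch_orders_page(conn, last_order_date=None, last_id=None, page_size=10):
    """Fetch one page of orders using keyset pagination on (order_date, id)."""
    with conn.cursor(pymysql.cursors.DictCursor) as cursor:
        if last_order_date is None:
            cursor.execute(
                "SELECT id, user_id, order_date, amount, status FROM orders "
                "ORDER BY order_date DESC, id DESC LIMIT %s",
                (page_size,),
            )
        else:
            # Strictly "before" the last row seen, with id breaking ties
            cursor.execute(
                "SELECT id, user_id, order_date, amount, status FROM orders "
                "WHERE (order_date, id) < (%s, %s) "
                "ORDER BY order_date DESC, id DESC LIMIT %s",
                (last_order_date, last_id, page_size),
            )
        return cursor.fetchall()

conn = pymysql.connect(host="localhost", user="root", password="password", database="app_db")
page1 = fetch_orders_page(conn)
if page1:
    page2 = fetch_orders_page(conn, page1[-1]["order_date"], page1[-1]["id"])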

3.3 Batch Operation Optimization

-- ❌ Row-by-row inserts (1,000 rows)
INSERT INTO orders (user_id, amount, status) VALUES (1, 100, 'paid');
INSERT INTO orders (user_id, amount, status) VALUES (2, 200, 'paid');
-- ... repeated 1,000 times

-- ✅ Multi-row insert
INSERT INTO orders (user_id, amount, status) VALUES 
(1, 100, 'paid'),
(2, 200, 'paid'),
(3, 300, 'paid'),
-- ... 1,000 rows
(1000, 1000, 'paid');

-- ✅ Use LOAD DATA INFILE (fastest; LOCAL requires local_infile to be enabled on both server and client)
LOAD DATA LOCAL INFILE '/tmp/orders.csv'
INTO TABLE orders
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(user_id, amount, status);
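
In application code the multi-row form can be produced with executemany; a sketch assuming PyMySQL, which rewrites the VALUES list into a single multi-row INSERT:

import pymysql

rows = [(i, i * 100, "paid") for i in range(1, 1001)]  # (user_id, amount, status)

conn = pymysql.connect(host="localhost", user="root", password="password", database="app_db")
try:
    with conn.cursor() as cursor:
        # executemany batches the rows into one multi-row INSERT statement
        cursor.executemany(
            "INSERT INTO orders (user_id, amount, status) VALUES (%s, %s, %s)",
            rows,
        )
    conn.commit()
finally:
    conn.close()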

4. Architecture Optimization: From a Single Server to a Distributed Setup

4.1 Read/Write Splitting

4.1.1 Read/Write Splitting with an Application-side Proxy

# Read/write splitting implemented with Python + PyMySQL
import pymysql

class ReadWriteSplittingProxy:
    def __init__(self):
        # Primary (master) configuration, used for writes
        self.master_config = {
            'host': 'master.example.com',
            'user': 'root',
            'password': 'password',
            'database': 'app_db'
        }
        
        # Replica (slave) configurations, used for reads
        self.slave_configs = [
            {'host': 'slave1.example.com', 'user': 'root', 'password': 'password', 'database': 'app_db'},
            {'host': 'slave2.example.com', 'user': 'root', 'password': 'password', 'database': 'app_db'}
        ]
        
        self.current_slave_index = 0
    
    def get_connection(self, is_write=False):
        """获取数据库连接"""
        if is_write:
            # Writes go to the primary
            return pymysql.connect(**self.master_config)
        else:
            # Reads are round-robined across the replicas
            slave_config = self.slave_configs[self.current_slave_index]
            self.current_slave_index = (self.current_slave_index + 1) % len(self.slave_configs)
            return pymysql.connect(**slave_config)
    
    def execute_query(self, sql, params=None, is_write=False):
        """执行查询"""
        conn = self.get_connection(is_write)
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                if is_write:
                    conn.commit()
                    return cursor.rowcount
                else:
                    return cursor.fetchall()
        finally:
            conn.close()

# Usage example
proxy = ReadWriteSplittingProxy()

# Write (routed to the primary)
proxy.execute_query(
    "INSERT INTO orders (user_id, amount) VALUES (%s, %s)",
    (1001, 150.00),
    is_write=True
)

# Read (round-robined across the replicas)
results = proxy.execute_query(
    "SELECT * FROM orders WHERE user_id = %s",
    (1001,),
    is_write=False
)

4.1.2 Middleware-based Read/Write Splitting (ShardingSphere)

# Example ShardingSphere read/write splitting configuration (Spring Boot YAML; exact property names vary between ShardingSphere versions)
spring:
  shardingsphere:
    datasource:
      names: master,slave1,slave2
      master:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://master.example.com:3306/app_db
        username: root
        password: password
      slave1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave1.example.com:3306/app_db
        username: root
        password: password
      slave2:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave2.example.com:3306/app_db
        username: root
        password: password
    
    rules:
      readwrite-splitting:
        data-sources:
          ds0:
            type: Static
            props:
              write-data-source-name: master
              read-data-source-names: slave1,slave2
              load-balancer-name: round_robin
    
    props:
      sql-show: true

4.2 Sharding: Splitting Databases and Tables

4.2.1 Horizontal Sharding

-- Shard the table by user_id modulo 4
-- orders_0, orders_1, orders_2, orders_3

-- Helper function that maps a user to its shard table
DELIMITER $$
CREATE FUNCTION get_shard_table(user_id INT) RETURNS VARCHAR(50)
DETERMINISTIC
BEGIN
    DECLARE shard_num INT;
    SET shard_num = user_id % 4;
    RETURN CONCAT('orders_', shard_num);
END$$
DELIMITER ;

-- Pick the table dynamically at query time
SET @table_name = get_shard_table(1001);
SET @sql = CONCAT('SELECT * FROM ', @table_name, ' WHERE user_id = 1001');
PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
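
In practice the shard is usually computed in the application rather than in a stored function; a minimal sketch assuming PyMySQL and the four orders_N tables above:

import pymysql

SHARD_COUNT = 4

def orders_table_for(user_id):
    """Map a user_id to its shard table, mirroring the user_id % 4 rule above."""
    return f"orders_{user_id % SHARD_COUNT}"

def get_user_orders_sharded(conn, user_id):
    table = orders_table_for(user_id)  # table name comes from our own rule, not user input
    with conn.cursor() as cursor:
        cursor.execute(f"SELECT * FROM {table} WHERE user_id = %s", (user_id,))
        return cursor.fetchall()

conn = pymysql.connect(host="localhost", user="root", password="password", database="app_db")
print(get_user_orders_sharded(conn, 1001))  # routes to orders_1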

4.2.2 Vertical Splitting

-- Original wide table
CREATE TABLE orders (
    id BIGINT PRIMARY KEY,
    user_id INT,
    amount DECIMAL(10,2),
    status VARCHAR(20),
    -- ... 20 more columns ...
    created_at DATETIME,
    updated_at DATETIME
);

-- Vertical split: order basics + order details
CREATE TABLE orders_base (
    id BIGINT PRIMARY KEY,
    user_id INT,
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at DATETIME,
    updated_at DATETIME
);

CREATE TABLE orders_detail (
    order_id BIGINT PRIMARY KEY,
    shipping_address TEXT,
    payment_method VARCHAR(50),
    -- ... other detail columns ...
    FOREIGN KEY (order_id) REFERENCES orders_base(id)
);

4.3 Cache Layer Optimization

4.3.1 Redis Caching Strategy

import redis
import json
import pymysql
from functools import wraps

class RedisCache:
    def __init__(self):
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            db=0,
            decode_responses=True
        )
    
    def cache_query(self, ttl=300):
        """Decorator that caches query results in Redis."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Build the cache key from the function name and arguments
                cache_key = f"query:{func.__name__}:{str(args)}:{str(kwargs)}"
                
                # Try the cache first
                cached = self.redis_client.get(cache_key)
                if cached:
                    return json.loads(cached)
                
                # Cache miss: run the underlying query
                result = func(*args, **kwargs)
                
                # Write the result back to the cache
                self.redis_client.setex(
                    cache_key,
                    ttl,
                    json.dumps(result, default=str)  # default=str handles datetime/Decimal values
                )
                
                return result
            return wrapper
        return decorator

# Usage example
cache = RedisCache()

@cache.cache_query(ttl=60)
def get_user_orders(user_id, limit=10):
    """Fetch a user's orders (with caching)."""
    conn = pymysql.connect(**db_config)  # db_config holds your connection settings
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM orders WHERE user_id = %s ORDER BY created_at DESC LIMIT %s",
                (user_id, limit)
            )
            return cursor.fetchall()
    finally:
        conn.close()

# The first call executes the SQL; later calls within the TTL are served straight from Redis
orders = get_user_orders(1001, 10)
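
One thing the decorator does not cover is invalidation. A minimal cache-aside write path, assuming the same key scheme as above, deletes the affected keys right after the database write so the next read repopulates fresh data:

def update_order_status(conn, cache, order_id, user_id, new_status):
    """Update an order and invalidate the cached query results for that user."""
    with conn.cursor() as cursor:
        cursor.execute(
            "UPDATE orders SET status = %s WHERE id = %s",
            (new_status, order_id),
        )
    conn.commit()

    # The key pattern must match the one built in cache_query above
    for key in cache.redis_client.scan_iter(f"query:get_user_orders:({user_id},*"):
        cache.redis_client.delete(key)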

4.3.2 Cache Penetration / Avalanche Protection

class SafeRedisCache(RedisCache):
    def get_with_fallback(self, key, fallback_func, ttl=300, lock_ttl=10):
        """Cache lookup with penetration and breakdown protection."""
        # 1. Try the cache first
        cached = self.redis_client.get(key)
        if cached:
            return json.loads(cached)
        
        # 2. Acquire a distributed lock to prevent cache breakdown (many requests rebuilding the same hot key)
        lock_key = f"lock:{key}"
        lock_acquired = self.redis_client.set(
            lock_key, 
            "1", 
            ex=lock_ttl, 
            nx=True  # set only if the key does not already exist
        )
        
        if lock_acquired:
            try:
                # 3. Double-check after taking the lock (another worker may have filled the cache)
                cached = self.redis_client.get(key)
                if cached:
                    return json.loads(cached)
                
                # 4. Run the real query
                result = fallback_func()
                
                # 5. Write to the cache (cache empty results too, to block penetration)
                if result is None:
                    # Cache the empty value with a short TTL
                    self.redis_client.setex(key, 60, json.dumps(None))
                else:
                    self.redis_client.setex(key, ttl, json.dumps(result, default=str))
                
                return result
            finally:
                # 6. Release the lock
                self.redis_client.delete(lock_key)
        else:
            # 7. Wait briefly and retry
            import time
            time.sleep(0.1)
            return self.get_with_fallback(key, fallback_func, ttl, lock_ttl)
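
The avalanche case (many keys expiring at the same moment) is usually handled by adding random jitter to the TTL; a small sketch of the idea:

import random

def jittered_ttl(base_ttl=300, jitter_ratio=0.2):
    """Spread expirations out by up to +/- jitter_ratio of the base TTL."""
    jitter = int(base_ttl * jitter_ratio)
    return base_ttl + random.randint(-jitter, jitter)

# Example: instead of setex(key, 300, value), use a jittered expiration
# self.redis_client.setex(key, jittered_ttl(300), json.dumps(result, default=str))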

5. MySQL Configuration Optimization

5.1 Key Parameters

# Example my.cnf (MySQL 8.0)
[mysqld]
# Basics
port = 3306
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql

# Memory (adjust to the server's RAM)
innodb_buffer_pool_size = 8G  # typically 50-70% of total memory on a dedicated DB server
innodb_buffer_pool_instances = 8  # multiple buffer pool instances reduce mutex contention
innodb_log_file_size = 2G  # redo log file size
innodb_log_buffer_size = 16M  # redo log buffer

# Connections
max_connections = 1000  # maximum concurrent connections
max_connect_errors = 100000
thread_cache_size = 100  # threads kept cached for reuse

# Query cache (removed in MySQL 8.0; only relevant for 5.7 and earlier)
# query_cache_type = 0  # recommended: keep it off

# InnoDB
innodb_flush_log_at_trx_commit = 2  # 1: flush to disk on every commit (safest); 2: write on commit, flush once per second (faster, may lose up to ~1s of transactions on a crash)
innodb_flush_method = O_DIRECT  # direct I/O, avoids double buffering through the OS cache
innodb_file_per_table = ON  # one tablespace file per table
innodb_read_io_threads = 8  # background read threads
innodb_write_io_threads = 8  # background write threads

# Logging
slow_query_log = ON
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2  # slow query threshold in seconds
log_queries_not_using_indexes = ON

# Replication (source/primary)
server_id = 1  # unique server ID
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW  # row-based replication
binlog_expire_logs_seconds = 604800  # keep binlogs for 7 days (expire_logs_days is deprecated in 8.0)

5.2 Dynamic Parameter Tuning

-- Inspect current settings
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'max_connections';

-- Adjust dynamically (no restart required)
SET GLOBAL innodb_buffer_pool_size = 8589934592;  -- 8GB
SET GLOBAL max_connections = 1000;

-- Check runtime status
SHOW GLOBAL STATUS LIKE 'Threads_connected';
SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_read_requests';
SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_reads';

-- Compute the buffer pool hit rate
SELECT 
    (1 - (Innodb_buffer_pool_reads / Innodb_buffer_pool_read_requests)) * 100 AS hit_rate
FROM (
    SELECT 
        MAX(CASE WHEN VARIABLE_NAME = 'Innodb_buffer_pool_reads' THEN VARIABLE_VALUE END) AS Innodb_buffer_pool_reads,
        MAX(CASE WHEN VARIABLE_NAME = 'Innodb_buffer_pool_read_requests' THEN VARIABLE_VALUE END) AS Innodb_buffer_pool_read_requests
    FROM performance_schema.global_status
) AS stats;

6. Monitoring and Alerting

6.1 Key Performance Metrics

-- 1. Connection monitoring
SELECT 
    COUNT(*) AS total_connections,
    SUM(CASE WHEN COMMAND = 'Sleep' THEN 1 ELSE 0 END) AS idle_connections,
    SUM(CASE WHEN COMMAND != 'Sleep' THEN 1 ELSE 0 END) AS active_connections
FROM information_schema.PROCESSLIST;

-- 2. Slow query monitoring
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    AVG_TIMER_WAIT/1000000000000 AS avg_time_sec,
    SUM_ROWS_EXAMINED,
    SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
WHERE SCHEMA_NAME = 'your_database'
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;

-- 3. Lock wait monitoring (MySQL 5.7: these INFORMATION_SCHEMA tables were removed in 8.0, where sys.innodb_lock_waits or performance_schema.data_lock_waits should be used instead)
SELECT 
    r.trx_id waiting_trx_id,
    r.trx_mysql_thread_id waiting_thread,
    r.trx_query waiting_query,
    b.trx_id blocking_trx_id,
    b.trx_mysql_thread_id blocking_thread,
    b.trx_query blocking_query
FROM information_schema.INNODB_LOCK_WAITS w
INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.requesting_trx_id;

6.2 An Automated Monitoring Script

#!/usr/bin/env python3
"""
MySQL performance monitoring script
"""
import pymysql
import time
import smtplib
from email.mime.text import MIMEText

class MySQLMonitor:
    def __init__(self, db_config, alert_thresholds):
        self.db_config = db_config
        self.thresholds = alert_thresholds
    
    def check_connections(self):
        """Check connection usage."""
        conn = pymysql.connect(**self.db_config)
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT 
                        VARIABLE_VALUE as max_conn,
                        (SELECT COUNT(*) FROM information_schema.PROCESSLIST) as current_conn
                    FROM performance_schema.global_variables 
                    WHERE VARIABLE_NAME = 'max_connections'
                """)
                result = cursor.fetchone()
                max_conn, current_conn = result
                max_conn = int(max_conn)  # VARIABLE_VALUE comes back as a string
                
                # Compute the usage rate
                usage_rate = (current_conn / max_conn) * 100
                
                if usage_rate > self.thresholds['connection_usage']:
                    self.send_alert(
                        f"Connection alert: {current_conn}/{max_conn} ({usage_rate:.1f}%)",
                        f"Current connections: {current_conn}\nMax connections: {max_conn}\nUsage: {usage_rate:.1f}%"
                    )
                
                return usage_rate
        finally:
            conn.close()
    
    def check_slow_queries(self):
        """Check for slow statements."""
        conn = pymysql.connect(**self.db_config)
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT 
                        COUNT(*) as slow_count,
                        AVG(AVG_TIMER_WAIT/1000000000000) as avg_time
                    FROM performance_schema.events_statements_summary_by_digest
                    WHERE SCHEMA_NAME = 'your_database'
                    AND AVG_TIMER_WAIT > %s
                """, (self.thresholds['slow_query_time'] * 1000000000000,))
                
                result = cursor.fetchone()
                slow_count, avg_time = result
                
                if slow_count > self.thresholds['slow_query_count']:
                    self.send_alert(
                        f"Slow query alert: {slow_count} statements",
                        f"Average execution time: {avg_time:.2f}s\nThreshold: {self.thresholds['slow_query_time']}s"
                    )
                
                return slow_count
        finally:
            conn.close()
    
    def send_alert(self, subject, body):
        """Send an alert e-mail."""
        # Mail server settings
        smtp_server = "smtp.example.com"
        smtp_port = 587
        sender = "monitor@example.com"
        password = "password"
        receiver = "dba@example.com"
        
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = sender
        msg['To'] = receiver
        
        try:
            server = smtplib.SMTP(smtp_server, smtp_port)
            server.starttls()
            server.login(sender, password)
            server.send_message(msg)
            server.quit()
            print(f"Alert sent: {subject}")
        except Exception as e:
            print(f"Failed to send alert: {e}")

# Usage example
if __name__ == "__main__":
    db_config = {
        'host': 'localhost',
        'user': 'monitor',
        'password': 'password',
        'database': 'mysql'
    }
    
    thresholds = {
        'connection_usage': 80,  # alert when connection usage exceeds 80%
        'slow_query_time': 2,    # slow query threshold in seconds
        'slow_query_count': 10   # alert when more than 10 slow statements are found
    }
    
    monitor = MySQLMonitor(db_config, thresholds)
    
    # Run periodically (example: check every 5 minutes)
    while True:
        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] running checks...")
        monitor.check_connections()
        monitor.check_slow_queries()
        time.sleep(300)  # 5 minutes

7. Case Study: An E-commerce Flash Sale System

7.1 System Architecture

Client request → Nginx load balancer → Application server cluster → Redis cache layer → MySQL primary/replica cluster
                    ↓
                Message queue (asynchronous processing)

7.2 Core Implementation

7.2.1 The Flash Sale Order Endpoint

import redis
import pymysql
import json
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from dbutils.pooled_db import PooledDB  # PyMySQL has no built-in pool; DBUtils provides one

class SeckillService:
    def __init__(self):
        # Redis connection
        self.redis_client = redis.Redis(
            host='localhost',
            port=6379,
            db=0,
            decode_responses=True
        )
        
        # Database connection pool (DBUtils PooledDB on top of PyMySQL)
        self.db_pool = PooledDB(
            creator=pymysql,
            mincached=1, maxconnections=20,  # min cached / max connections
            host='localhost',
            user='root',
            password='password',
            database='seckill_db',
            charset='utf8mb4'
        )
        
        # Thread pool for asynchronous order creation
        self.thread_pool = ThreadPoolExecutor(max_workers=10)
    
    def seckill(self, user_id, product_id, quantity):
        """Place a flash sale order."""
        # 1. Validate parameters
        if quantity <= 0:
            return {'success': False, 'message': 'quantity must be greater than 0'}
        
        # 2. Has this user already taken part in this flash sale?
        user_key = f"seckill:user:{user_id}:product:{product_id}"
        if self.redis_client.exists(user_key):
            return {'success': False, 'message': 'you have already participated in this flash sale'}
        
        # 3. Check stock (atomic Redis decrement)
        stock_key = f"seckill:stock:{product_id}"
        stock = self.redis_client.decr(stock_key)
        
        if stock < 0:
            # Out of stock: give the decrement back
            self.redis_client.incr(stock_key)
            return {'success': False, 'message': 'out of stock'}
        
        # 4. Create the order asynchronously (do not block the request)
        future = self.thread_pool.submit(
            self._create_order_async,
            user_id, product_id, quantity
        )
        
        # 5. Mark the user as having participated
        self.redis_client.setex(user_key, 3600, '1')
        
        return {
            'success': True,
            'message': 'success, the order is being processed',
            'order_id': f"SECKILL_{user_id}_{product_id}_{int(time.time())}"
        }
    
    def _create_order_async(self, user_id, product_id, quantity):
        """Create the order in MySQL asynchronously."""
        conn = None
        try:
            conn = self.db_pool.connection()
            conn.begin()  # start an explicit transaction
            cursor = conn.cursor()
            
            # 1. Deduct stock with a conditional update: the stock >= quantity predicate
            #    makes the decrement atomic, and version is bumped for optimistic readers
            cursor.execute("""
                UPDATE products 
                SET stock = stock - %s,
                    version = version + 1
                WHERE id = %s 
                AND stock >= %s
            """, (quantity, product_id, quantity))
            
            if cursor.rowcount == 0:
                conn.rollback()
                # Not enough stock: give the Redis stock back
                self.redis_client.incrby(
                    f"seckill:stock:{product_id}",
                    quantity
                )
                return False
            
            # 2. Create the order
            order_id = f"ORDER_{user_id}_{product_id}_{int(time.time())}"
            cursor.execute("""
                INSERT INTO orders 
                (id, user_id, product_id, quantity, status, created_at)
                VALUES (%s, %s, %s, %s, 'pending', NOW())
            """, (order_id, user_id, product_id, quantity))
            
            # 3. Record the order line items
            cursor.execute("""
                INSERT INTO order_items 
                (order_id, product_id, quantity, price)
                SELECT %s, id, %s, price 
                FROM products WHERE id = %s
            """, (order_id, quantity, product_id))
            
            conn.commit()
            
            # 4. Publish a message to the queue (payment, logistics, etc. are handled asynchronously)
            self._send_to_mq({
                'order_id': order_id,
                'user_id': user_id,
                'product_id': product_id,
                'quantity': quantity,
                'timestamp': datetime.now().isoformat()
            })
            
            return True
            
        except Exception as e:
            if conn:
                conn.rollback()
            # Give the Redis stock back on failure
            self.redis_client.incrby(
                f"seckill:stock:{product_id}",
                quantity
            )
            print(f"Order creation failed: {e}")
            return False
        finally:
            if conn:
                conn.close()
    
    def _send_to_mq(self, message):
        """Publish a message to the message queue."""
        # RabbitMQ, Kafka, etc. could be used here
        # Simplified example: push onto a Redis list
        self.redis_client.rpush('order_queue', json.dumps(message))
    
    def preload_cache(self, product_id, stock):
        """Warm up the cache before the flash sale starts."""
        # Preload the stock counter into Redis
        stock_key = f"seckill:stock:{product_id}"
        self.redis_client.set(stock_key, stock)
        
        # Expire the key after the sale window (cleanup)
        self.redis_client.expire(stock_key, 3600)  # 1 hour
        
        # Preload the product details
        product_key = f"seckill:product:{product_id}"
        conn = self.db_pool.connection()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT * FROM products WHERE id = %s",
                (product_id,)
            )
            product = cursor.fetchone()
            if product:
                self.redis_client.setex(
                    product_key,
                    3600,
                    json.dumps(product, default=str)  # default=str handles Decimal/datetime columns
                )
        finally:
            conn.close()
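
A minimal usage sketch (product 42 and a stock of 1000 are made-up values): warm the cache before the sale opens, then call seckill from the request handler:

service = SeckillService()

# Before the sale window opens: load stock and product info into Redis
service.preload_cache(product_id=42, stock=1000)

# Inside the request handler during the sale
result = service.seckill(user_id=1001, product_id=42, quantity=1)
print(result)  # e.g. {'success': True, 'message': 'success, the order is being processed', ...}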

7.2.2 Stock Preloading Script

#!/usr/bin/env python3
"""
Flash sale stock preloading script
"""
import pymysql
import redis
import time

def preload_seckill_stock():
    """Preload flash sale product stock into Redis."""
    # Database connection
    db_conn = pymysql.connect(
        host='localhost',
        user='root',
        password='password',
        database='seckill_db'
    )
    
    # Redis connection
    redis_client = redis.Redis(
        host='localhost',
        port=6379,
        db=0
    )
    
    try:
        with db_conn.cursor() as cursor:
            # Select products currently inside their flash sale window
            cursor.execute("""
                SELECT id, seckill_stock, seckill_price 
                FROM products 
                WHERE seckill_start <= NOW() 
                AND seckill_end >= NOW()
                AND seckill_stock > 0
            """)
            
            products = cursor.fetchall()
            
            for product in products:
                product_id, stock, price = product
                
                # Preload the stock counter
                stock_key = f"seckill:stock:{product_id}"
                redis_client.set(stock_key, stock)
                redis_client.expire(stock_key, 3600)  # 1 hour
                
                # Preload the product details
                product_key = f"seckill:product:{product_id}"
                redis_client.setex(
                    product_key,
                    3600,
                    f'{{"id":{product_id},"price":{price}}}'
                )
                
                print(f"Preloaded product {product_id}: stock {stock}, price {price}")
            
            print(f"Preload finished, {len(products)} products in total")
            
    finally:
        db_conn.close()

if __name__ == "__main__":
    preload_seckill_stock()

8. Summary and Best Practices

8.1 High-Concurrency Optimization Checklist

  1. Index optimization

    • [ ] Make sure every hot query has a suitable index
    • [ ] Periodically find and drop unused indexes
    • [ ] Use covering indexes to avoid lookups back to the table
    • [ ] Avoid functions on indexed columns
  2. Query optimization

    • [ ] Avoid SELECT *
    • [ ] Use EXPLAIN to verify execution plans
    • [ ] Optimize JOINs (small table drives the large one)
    • [ ] Avoid deep OFFSET pagination; use keyset pagination
  3. Architecture optimization

    • [ ] Implement read/write splitting
    • [ ] Consider sharding (along business dimensions)
    • [ ] Introduce a cache layer (Redis)
    • [ ] Use message queues for asynchronous processing
  4. Configuration optimization

    • [ ] Tune innodb_buffer_pool_size
    • [ ] Tune the connection pool settings
    • [ ] Set a sensible slow query threshold
    • [ ] Enable the slow query log
  5. Monitoring and alerting

    • [ ] Monitor connection usage
    • [ ] Monitor the number of slow queries
    • [ ] Monitor lock waits
    • [ ] Set up automatic alerting

8.2 Performance Optimization Principles

  1. The 80/20 rule: 80% of performance problems come from 20% of the queries
  2. Optimize in layers: application layer → cache layer → database layer
  3. Be data-driven: base optimization decisions on monitoring data
  4. Change one thing at a time: make a single change, then measure its effect
  5. Prevent rather than cure: design for high concurrency from the start

8.3 Recommended Tools

  1. Monitoring: Percona Monitoring and Management (PMM), Prometheus + Grafana
  2. Slow query analysis: pt-query-digest, MySQL Workbench
  3. Load testing: sysbench, JMeter
  4. Sharding and distributed architecture: ShardingSphere, Vitess

9. Common Questions and Solutions

Q1: How do you handle sudden traffic spikes?

A: Combine rate limiting, degradation, and circuit breaking.

# Rate limiting with a token bucket
import time

class TokenBucket:
    def __init__(self, capacity, refill_rate):
        self.capacity = capacity  # bucket capacity
        self.tokens = capacity    # tokens currently available
        self.refill_rate = refill_rate  # tokens added per second
        self.last_refill = time.time()
    
    def try_acquire(self):
        now = time.time()
        # Refill tokens based on the time elapsed
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# Usage example
bucket = TokenBucket(capacity=100, refill_rate=10)  # refills 10 tokens per second

def handle_request(user_id):
    if bucket.try_acquire():
        # Handle the request
        return process_seckill(user_id)  # process_seckill: the business handler defined elsewhere
    else:
        # Rejected by the rate limiter
        return {'success': False, 'message': 'too many requests, please try again later'}

Q2: How do you guarantee data consistency?

A: Use an eventual-consistency approach.

-- 1. Record operations in a message (outbox) table
CREATE TABLE operation_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    operation_type VARCHAR(50),
    business_id VARCHAR(100),
    operation_data JSON,
    status ENUM('pending', 'processing', 'completed', 'failed'),
    created_at DATETIME,
    updated_at DATETIME,
    INDEX idx_status (status),
    INDEX idx_business (business_id)
);
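
A sketch of how the application might write to this table, assuming PyMySQL and that the business change and the log row are committed in the same transaction (the essence of the outbox pattern):

import json
import pymysql

def deduct_stock_with_log(conn, product_id, quantity):
    """Deduct stock and record the operation in operation_log inside one transaction."""
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE products SET stock = stock - %s WHERE id = %s AND stock >= %s",
            (quantity, product_id, quantity),
        )
        if cursor.rowcount == 0:
            conn.rollback()
            return False
        cursor.execute(
            "INSERT INTO operation_log "
            "(operation_type, business_id, operation_data, status, created_at, updated_at) "
            "VALUES ('update_stock', %s, %s, 'pending', NOW(), NOW())",
            (str(product_id), json.dumps({"product_id": product_id, "quantity": quantity})),
        )
        conn.commit()  # both changes commit together; a downstream worker marks the row completed/failed
        return True
    except Exception:
        conn.rollback()
        raise

conn = pymysql.connect(host="localhost", user="root", password="password", database="app_db")
deduct_stock_with_log(conn, product_id=42, quantity=1)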

-- 2. Scheduled compensation job (run via cron or the MySQL event scheduler)
DELIMITER $$
CREATE PROCEDURE compensate_operations()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE op_id BIGINT;
    DECLARE op_type VARCHAR(50);
    DECLARE v_business_id VARCHAR(100);  -- prefixed to avoid shadowing the business_id column in the cursor query
    DECLARE op_data JSON;
    
    DECLARE cur CURSOR FOR 
        SELECT id, operation_type, business_id, operation_data 
        FROM operation_log 
        WHERE status = 'failed' 
        AND created_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
        ORDER BY created_at ASC
        LIMIT 100;
    
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
    
    OPEN cur;
    
    read_loop: LOOP
        FETCH cur INTO op_id, op_type, v_business_id, op_data;
        IF done THEN
            LEAVE read_loop;
        END IF;
        
        -- Retry / compensation logic
        BEGIN
            DECLARE EXIT HANDLER FOR SQLEXCEPTION
            BEGIN
                -- Record the failure
                UPDATE operation_log 
                SET status = 'failed', 
                    updated_at = NOW() 
                WHERE id = op_id;
            END;
            
            -- Apply the compensating action
            IF op_type = 'update_stock' THEN
                -- Compensate the stock update
                UPDATE products 
                SET stock = stock + JSON_EXTRACT(op_data, '$.quantity')
                WHERE id = JSON_EXTRACT(op_data, '$.product_id');
            END IF;
            
            -- Mark the operation as completed
            UPDATE operation_log 
            SET status = 'completed', 
                updated_at = NOW() 
            WHERE id = op_id;
        END;
    END LOOP;
    
    CLOSE cur;
END$$
DELIMITER ;

With the optimization strategies and worked examples above, you can systematically improve MySQL's performance under high concurrency. Remember that performance optimization is a continuous process: keep adjusting your approach based on the actual workload and on what your monitoring data tells you.