Introduction: Understanding MySQL's Challenges Under High Concurrency

High-concurrency access is now the norm for Internet applications. Whether it is a flash-sale event on an e-commerce platform, a trending topic on social media, or peak trading hours in a financial system, MySQL faces enormous pressure from massive numbers of simultaneous users, and the database frequently becomes the bottleneck that caps overall system throughput. This article takes a deep look at how to handle high concurrency in MySQL, from architecture design down to concrete tuning techniques, to help you build a database tier that can stand up to massive user traffic.

The main challenges high concurrency poses for MySQL are:

  • Connection storms: large numbers of concurrent connections exhaust server resources
  • Lock contention: row locks and table locks cause waits and deadlocks
  • I/O bottlenecks: disk throughput cannot keep up with request volume
  • CPU overload: complex queries and sorts consume excessive CPU
  • Memory pressure: the buffer pool cannot hold the hot working set

1. MySQL High-Concurrency Architecture Strategies

1.1 Read/Write Splitting

Read/write splitting is one of the most effective strategies for scaling read-heavy workloads. Routing reads to replicas and writes to the primary takes most of the load off the primary.

Architecture example

Master  ←→ writes
    ↓ (replication)
Slave1  ←→ reads
Slave2  ←→ reads
Slave3  ←→ reads

Implementation options

  • Application-layer routing: pick the data source in application code based on the SQL type (see the sketch after this list)
  • Middleware routing: let middleware such as ShardingSphere or MyCat route statements automatically
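
For application-layer routing, Spring's AbstractRoutingDataSource is a common building block. Below is a minimal sketch, assuming a ThreadLocal holder that service code (or an AOP aspect) sets before each call; the class names here are illustrative, not from any specific library.

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

// Routes each connection request to "master" or "slave" based on a ThreadLocal flag.
public class ReadWriteRoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DbContextHolder.get();
    }
}

final class DbContextHolder {
    private static final ThreadLocal<String> CTX = ThreadLocal.withInitial(() -> "master");

    static void useMaster() { CTX.set("master"); }
    static void useSlave()  { CTX.set("slave"); }
    static String get()     { return CTX.get(); }
    static void clear()     { CTX.remove(); }
}

The routing data source is registered with its target data sources keyed by "master" and "slave"; read-only service methods switch the holder to "slave" and restore it afterwards.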

Configuration example (Spring Boot + ShardingSphere)

# application.yml
spring:
  shardingsphere:
    datasource:
      names: master,slave0,slave1
      master:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://master-host:3306/db
        username: root
        password: password
      slave0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave0-host:3306/db
        username: root
        password: password
      slave1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://slave1-host:3306/db
        username: root
        password: password
    rules:
      replica-query:
        data-sources:
          ds0:
            primary-data-source-name: master
            replica-data-source-names: slave0,slave1
            load-balance-algorithm-type: ROUND_ROBIN

1.2 Sharding (Splitting Databases and Tables)

When a single table passes the tens-of-millions-of-rows mark, or concurrency is extreme, splitting the data across databases and tables becomes necessary.

Horizontal table-sharding example

-- Base order table (shard 0)
CREATE TABLE order_0 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    create_time DATETIME,
    INDEX idx_user_id (user_id)
);

-- Shard by user_id modulo
CREATE TABLE order_1 LIKE order_0;
CREATE TABLE order_2 LIKE order_0;
CREATE TABLE order_3 LIKE order_0;
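
With this layout, routing is plain modular arithmetic. An illustrative sketch that mirrors the MOD strategies configured below (2 databases x 4 tables; the class itself is hypothetical):

// Maps a user_id to its physical database and table, mirroring the
// sharding-count values in the ShardingSphere configuration that follows.
public class OrderShardRouter {
    public static String route(long userId) {
        long dbIndex = userId % 2;     // ds0 or ds1
        long tableIndex = userId % 4;  // order_0 .. order_3
        return "ds" + dbIndex + ".order_" + tableIndex;
    }

    public static void main(String[] args) {
        System.out.println(route(123L)); // prints ds1.order_3
    }
}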

Sharding middleware configuration (ShardingSphere)

spring:
  shardingsphere:
    rules:
      sharding:
        tables:
          order:
            actual-data-nodes: ds${0..1}.order_${0..3}
            table-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: order-table-inline
            database-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: order-db-inline
        sharding-algorithms:
          order-table-inline:
            type: MOD
            props:
              sharding-count: 4
          order-db-inline:
            type: MOD
            props:
              sharding-count: 2

1.3 Connection Pool Tuning

The connection pool is the buffer layer between the application and the database; configuring it well is critical under high concurrency.

HikariCP configuration example

HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/db");
config.setUsername("root");
config.setPassword("password");

// Core pool settings
config.setMaximumPoolSize(50);          // maximum pool size
config.setMinimumIdle(10);              // minimum idle connections
config.setConnectionTimeout(30000);     // connection timeout: 30 s
config.setIdleTimeout(600000);          // idle timeout: 10 min
config.setMaxLifetime(1800000);         // max connection lifetime: 30 min
config.setLeakDetectionThreshold(60000); // leak-detection threshold: 60 s

// Driver-level settings that help under high concurrency
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
config.addDataSourceProperty("useLocalSessionState", "true");
config.addDataSourceProperty("rewriteBatchedStatements", "true");
config.addDataSourceProperty("cacheResultSetMetadata", "true");
config.addDataSourceProperty("cacheServerConfiguration", "true");
config.addDataSourceProperty("elideSetAutoCommits", "true");
config.addDataSourceProperty("maintainTimeStats", "false");

HikariDataSource dataSource = new HikariDataSource(config);

2. MySQL Configuration Tuning

2.1 Core InnoDB Parameters

InnoDB is MySQL's default storage engine, and its configuration directly determines high-concurrency performance.

Key settings

# my.cnf or my.ini

[mysqld]
# Connections
max_connections = 2000
max_user_connections = 1800
thread_cache_size = 100

# InnoDB buffer pool (the single most important setting)
innodb_buffer_pool_size = 16G  # typically 50-70% of physical RAM
innodb_buffer_pool_instances = 16  # multiple instances reduce mutex contention

# Redo log
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M
innodb_flush_log_at_trx_commit = 2  # 2 favors throughput over strict durability under high concurrency

# I/O
innodb_flush_method = O_DIRECT
innodb_io_capacity = 2000
innodb_io_capacity_max = 4000

# Locks and transactions
innodb_lock_wait_timeout = 50
innodb_rollback_on_timeout = OFF

# Concurrency control
innodb_thread_concurrency = 0  # 0 = unlimited
innodb_read_io_threads = 8
innodb_write_io_threads = 8

# Query cache (removed in MySQL 8.0; these lines apply only to 5.7 and earlier)
query_cache_type = 0
query_cache_size = 0

# Temporary tables
tmp_table_size = 256M
max_heap_table_size = 256M

# Sort and join buffers (allocated per session)
sort_buffer_size = 4M
join_buffer_size = 4M

# Bulk insert
bulk_insert_buffer_size = 256M

# Open files and table caches
open_files_limit = 65535
table_open_cache = 2000
table_definition_cache = 1400

2.2 Taking Advantage of MySQL 8.0

MySQL 8.0 introduced several features that help under high concurrency:

1. Descending indexes

-- A true descending index speeds up ORDER BY ... DESC
CREATE TABLE user_scores (
    user_id BIGINT,
    score INT,
    create_time DATETIME,
    INDEX idx_score_desc (score DESC)
);

-- This query now reads the index in its stored order
SELECT * FROM user_scores ORDER BY score DESC LIMIT 100;

2. Online and atomic DDL

-- Online DDL lets many ALTERs run without blocking DML, and 8.0 additionally
-- makes DDL atomic (8.0.12+ can even ADD COLUMN instantly with ALGORITHM=INSTANT)
ALTER TABLE user_scores ADD COLUMN level INT DEFAULT 1, ALGORITHM=INPLACE, LOCK=NONE;

3. Invisible indexes

-- Before dropping an index, make it invisible and watch the workload
ALTER TABLE user_scores ALTER INDEX idx_score_desc INVISIBLE;
-- If nothing regresses, drop it for real
ALTER TABLE user_scores DROP INDEX idx_score_desc;

3. SQL Optimization

3.1 Index Techniques

1. Covering indexes

-- Original query (needs a lookup back to the clustered index)
SELECT user_id, username, email FROM users WHERE username = 'john';

-- Optimization: create a covering index
CREATE INDEX idx_username_email ON users(username, email);

-- The same query is now satisfied entirely from the index
-- (InnoDB secondary indexes implicitly include the primary key, so user_id is covered too)
SELECT user_id, username, email FROM users WHERE username = 'john';

2. The leftmost-prefix rule

-- Composite index
CREATE INDEX idx_a_b_c ON table_name(a, b, c);

-- These queries can use the index
SELECT * FROM table_name WHERE a = 1;
SELECT * FROM table_name WHERE a = 1 AND b = 2;
SELECT * FROM table_name WHERE a = 1 AND b = 2 AND c = 3;

-- These cannot (they skip the leftmost column a)
SELECT * FROM table_name WHERE b = 2;
SELECT * FROM table_name WHERE c = 3;
SELECT * FROM table_name WHERE b = 2 AND c = 3;

3. Index Condition Pushdown (ICP)

-- MySQL 5.6+ enables ICP automatically
-- for predicates such as a = 1 AND b LIKE 'test%'
CREATE INDEX idx_a_b ON table_name(a, b);

-- ICP filters on b inside the index, cutting back-to-table lookups
SELECT * FROM table_name WHERE a = 1 AND b LIKE 'test%';

3.2 Common Ways to Disable an Index by Accident

1. Implicit type conversion

-- Wrong: phone is VARCHAR; passing a number forces a conversion and disables the index
SELECT * FROM users WHERE phone = 13800138000;

-- Right
SELECT * FROM users WHERE phone = '13800138000';

2. Functions applied to indexed columns

-- Wrong: wrapping the column in a function disables the index
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';

-- Right: rewrite as a range predicate
-- (MySQL 8.0.13+ alternatively supports functional indexes)
SELECT * FROM orders WHERE create_time >= '2024-01-01 00:00:00' 
  AND create_time < '2024-01-02 00:00:00';

3. LIKE with a leading wildcard

-- Wrong: a leading % disables the index
SELECT * FROM users WHERE email LIKE '%@gmail.com';

-- Right: if you must match this way, consider a full-text index
-- (note that the default full-text parser tokenizes on punctuation)
ALTER TABLE users ADD FULLTEXT INDEX ft_email (email);
SELECT * FROM users WHERE MATCH(email) AGAINST('@gmail.com');

3.3 Batch Operations

Batch insert optimization

// Naive approach: one INSERT per row (slow)
for (Order order : orders) {
    jdbcTemplate.update("INSERT INTO order_0 (id, user_id, amount) VALUES (?, ?, ?)",
        order.getId(), order.getUserId(), order.getAmount());
}

// Better: JDBC batching
String sql = "INSERT INTO order_0 (id, user_id, amount) VALUES (?, ?, ?)";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
    @Override
    public void setValues(PreparedStatement ps, int i) throws SQLException {
        Order order = orders.get(i);
        ps.setLong(1, order.getId());
        ps.setLong(2, order.getUserId());
        ps.setBigDecimal(3, order.getAmount());
    }

    @Override
    public int getBatchSize() {
        return orders.size();
    }
});

// Further optimization: append ?rewriteBatchedStatements=true to the JDBC URL.
// Connector/J then rewrites the batch above into multi-row statements on the wire
// (INSERT ... VALUES (?, ?, ?), (?, ?, ?), ...), so no manual string
// concatenation is needed; batches of around 1000 rows work well.

Batch update optimization

-- Naive: one UPDATE per row
UPDATE order_0 SET status = 'PAID' WHERE id = 1;
UPDATE order_0 SET status = 'PAID' WHERE id = 2;
UPDATE order_0 SET status = 'PAID' WHERE id = 3;

-- Better: one CASE WHEN update
UPDATE order_0 
SET status = CASE id
    WHEN 1 THEN 'PAID'
    WHEN 2 THEN 'PAID'
    WHEN 3 THEN 'PAID'
END
WHERE id IN (1, 2, 3);
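
In application code the CASE WHEN statement is usually generated dynamically. A hedged sketch of such a builder (the table and column names follow the example above; the method itself is illustrative):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.springframework.jdbc.core.JdbcTemplate;

// Builds and runs "UPDATE order_0 SET status = CASE id WHEN ? THEN ? ... END
// WHERE id IN (...)" from a map of id -> new status, binding every value as a parameter.
public static int batchUpdateStatus(JdbcTemplate jdbcTemplate, Map<Long, String> statusById) {
    if (statusById.isEmpty()) return 0;
    StringBuilder sql = new StringBuilder("UPDATE order_0 SET status = CASE id ");
    List<Object> params = new ArrayList<>();
    for (Map.Entry<Long, String> e : statusById.entrySet()) {
        sql.append("WHEN ? THEN ? ");
        params.add(e.getKey());
        params.add(e.getValue());
    }
    sql.append("END WHERE id IN (")
       .append(String.join(",", Collections.nCopies(statusById.size(), "?")))
       .append(")");
    params.addAll(statusById.keySet());
    return jdbcTemplate.update(sql.toString(), params.toArray());
}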

3.4 Avoid Large and Long Transactions

The problem with large transactions

-- Wrong: a huge transaction holds its locks for far too long
BEGIN;
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
UPDATE account SET balance = balance + 100 WHERE user_id = 2;
-- ... possibly 1000 more updates
COMMIT;

-- Better: split the work into small transactions
BEGIN;
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
COMMIT;

BEGIN;
UPDATE account SET balance = balance + 100 WHERE user_id = 2;
COMMIT;
-- ... loop over the remaining work (see the Java sketch below)
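
One way to drive such small transactions from Java is Spring's TransactionTemplate. A minimal sketch, assuming a list of (user_id, delta) adjustments to apply; the chunk size and class name are illustrative:

import java.util.List;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

public class BalanceBatchJob {
    private final JdbcTemplate jdbcTemplate;
    private final TransactionTemplate txTemplate;

    public BalanceBatchJob(JdbcTemplate jdbcTemplate, PlatformTransactionManager txManager) {
        this.jdbcTemplate = jdbcTemplate;
        this.txTemplate = new TransactionTemplate(txManager);
    }

    // Applies adjustments in chunks of 100, so no single transaction
    // holds row locks for long.
    public void applyAdjustments(List<long[]> adjustments) { // each entry: {userId, delta}
        int chunk = 100;
        for (int from = 0; from < adjustments.size(); from += chunk) {
            List<long[]> slice =
                adjustments.subList(from, Math.min(from + chunk, adjustments.size()));
            txTemplate.executeWithoutResult(status -> {
                for (long[] adj : slice) {
                    jdbcTemplate.update(
                        "UPDATE account SET balance = balance + ? WHERE user_id = ?",
                        adj[1], adj[0]);
                }
            });
        }
    }
}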

Monitoring long transactions

-- Find statements that have been running for more than 60 seconds
SELECT * FROM information_schema.processlist 
WHERE time > 60 AND command != 'Sleep';

-- Find transactions that have been open for more than 60 seconds
SELECT * FROM information_schema.innodb_trx 
WHERE trx_started < NOW() - INTERVAL 60 SECOND;

4. Caching and Data Preheating

4.1 Multi-Level Cache Architecture

Architecture

Request → CDN → Nginx cache → application cache (Redis) → database

Redis cache-aside example

@Service
public class UserService {
    private static final String USER_CACHE_PREFIX = "user:";
    private static final long CACHE_TTL = 3600; // 1 hour
    
    @Autowired
    private UserRepository userRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    public User getUserById(Long id) {
        String key = USER_CACHE_PREFIX + id;
        
        // 1. Try the cache first
        User user = (User) redisTemplate.opsForValue().get(key);
        if (user != null) {
            return user;
        }
        
        // 2. Cache miss: hit the database
        user = userRepository.findById(id).orElse(null);
        if (user != null) {
            // 3. Populate the cache
            redisTemplate.opsForValue().set(key, user, CACHE_TTL, TimeUnit.SECONDS);
        }
        
        return user;
    }
    
    public void updateUser(User user) {
        // Update the database first
        userRepository.save(user);
        
        // Then invalidate the cache (avoids serving stale data)
        String key = USER_CACHE_PREFIX + user.getId();
        redisTemplate.delete(key);
    }
}

Guarding against cache penetration

public User getUserById(Long id) {
    String key = USER_CACHE_PREFIX + id;
    
    // 1. Check the cache
    User user = (User) redisTemplate.opsForValue().get(key);
    if (user != null) {
        return user;
    }
    
    // 2. A cached null marker stops repeated misses from hammering the database
    String nullKey = key + ":null";
    if (Boolean.TRUE.equals(redisTemplate.hasKey(nullKey))) {
        return null;
    }
    
    // 3. Query the database
    user = userRepository.findById(id).orElse(null);
    
    if (user != null) {
        redisTemplate.opsForValue().set(key, user, CACHE_TTL, TimeUnit.SECONDS);
    } else {
        // Cache the miss with a deliberately short TTL
        redisTemplate.opsForValue().set(nullKey, "", 60, TimeUnit.SECONDS);
    }
    
    return user;
}

4.2 Cache Preheating

Preheat script example

#!/usr/bin/env python3
import redis
import mysql.connector
import json

def preload_hot_data():
    # Connect to Redis
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    # Connect to MySQL
    db = mysql.connector.connect(
        host="localhost",
        user="root",
        password="password",
        database="business"
    )
    cursor = db.cursor(dictionary=True)
    
    # Preheat hot data (e.g. popular products)
    cursor.execute("""
        SELECT product_id, name, price, stock 
        FROM products 
        WHERE is_hot = 1 AND stock > 0
    """)
    
    for product in cursor.fetchall():
        key = f"product:{product['product_id']}"
        redis_client.setex(
            key,
            3600,  # expire after 1 hour
            json.dumps(product, default=str)  # default=str handles Decimal values
        )
    
    cursor.close()
    db.close()

if __name__ == "__main__":
    preload_hot_data()

5. Lock Optimization Under High Concurrency

5.1 InnoDB Row Locks

1. Indexes and row locks

-- Wrong: with no index on user_id, InnoDB locks every row it scans,
-- which effectively locks the whole table
UPDATE orders SET status = 'PAID' WHERE user_id = 123; -- user_id is not indexed

-- Right: make sure the WHERE column is indexed
CREATE INDEX idx_user_id ON orders(user_id);
UPDATE orders SET status = 'PAID' WHERE user_id = 123;

2. Gap locks

-- Under REPEATABLE READ, range scans can take gap locks
SELECT * FROM orders WHERE id BETWEEN 1000 AND 2000 FOR UPDATE;

-- Mitigations: lock exact rows, or lower the isolation level
-- Option 1: an IN list of exact ids
SELECT * FROM orders WHERE id IN (1000, 1001, ..., 2000) FOR UPDATE;

-- Option 2: READ COMMITTED disables most gap locking (weigh the business risk)
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

5.2 Choosing Between Optimistic and Pessimistic Locking

Optimistic locking

-- Add a version column to the table
CREATE TABLE product (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    stock INT,
    version INT DEFAULT 0
);

-- Check the version on every update
UPDATE product 
SET stock = stock - 1, version = version + 1 
WHERE id = 100 AND version = 2; -- assuming the current version is 2

-- Then check the affected-row count:
-- 0 rows means another session won the race, so retry

Optimistic locking in Java

@Transactional
public boolean deductStock(Long productId, int quantity) {
    int maxRetries = 3;
    int retryCount = 0;
    
    while (retryCount < maxRetries) {
        Product product = productRepository.findById(productId);
        int currentVersion = product.getVersion();
        
        int updated = productRepository.updateStock(productId, quantity, currentVersion);
        
        if (updated > 0) {
            return true;
        }
        
        retryCount++;
        try {
Thread.sleep(50); // back off briefly before retrying
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
    
    return false;
}
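
The updateStock method called above is assumed rather than shown. With Spring Data JPA it could be sketched like this (a hypothetical repository; note it mirrors the version check and stock guard from the SQL above):

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

public interface ProductRepository extends JpaRepository<Product, Long> {

    // Returns 1 only if the version still matches and stock suffices; 0 means retry.
    @Modifying
    @Query("UPDATE Product p SET p.stock = p.stock - :qty, p.version = p.version + 1 " +
           "WHERE p.id = :id AND p.version = :version AND p.stock >= :qty")
    int updateStock(@Param("id") Long id, @Param("qty") int qty, @Param("version") int version);
}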

Pessimistic locking

-- Take the lock explicitly
BEGIN;
SELECT stock FROM product WHERE id = 100 FOR UPDATE;
-- ... business logic ...
UPDATE product SET stock = stock - 1 WHERE id = 100;
COMMIT;

5.3 Deadlock Detection and Handling

Analyzing deadlock logs

-- Log every deadlock, not just the most recent one
SET GLOBAL innodb_print_all_deadlocks = ON;

-- Inspect the most recent deadlock
SHOW ENGINE INNODB STATUS\G

Deadlock prevention

// 1. Always acquire locks in a fixed order
public void transfer(Long fromId, Long toId, BigDecimal amount) {
    // Lock accounts in ascending id order, regardless of transfer direction
    Long firstId = Math.min(fromId, toId);
    Long secondId = Math.max(fromId, toId);
    
    // The cast disambiguates between the ConnectionCallback and
    // StatementCallback overloads of execute()
    jdbcTemplate.execute((ConnectionCallback<Void>) connection -> {
        // Lock the first account
        PreparedStatement ps1 = connection.prepareStatement(
            "SELECT balance FROM account WHERE id = ? FOR UPDATE");
        ps1.setLong(1, firstId);
        ps1.executeQuery();
        
        // Then the second
        PreparedStatement ps2 = connection.prepareStatement(
            "SELECT balance FROM account WHERE id = ? FOR UPDATE");
        ps2.setLong(1, secondId);
        ps2.executeQuery();
        
        // Transfer logic goes here
        // ...
        return null;
    });
}

6. Monitoring and Diagnostics

6.1 Key MySQL Performance Metrics

Key metrics (a sampling sketch follows the list)

  • QPS/TPS: queries / transactions per second
  • Connections: Threads_connected vs max_connections
  • Cache hit rate: InnoDB buffer pool hit rate
  • Lock waits: Innodb_row_lock_waits, Innodb_row_lock_time_avg
  • Slow queries: Slow_queries
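
A quick way to sample QPS from application code is to diff the Queries counter over a fixed interval. A minimal JDBC sketch (the connection details are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class QpsSampler {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/db", "root", "password")) {
            long q1 = readQueries(conn);
            Thread.sleep(10_000); // 10-second sampling window
            long q2 = readQueries(conn);
            System.out.printf("QPS ~= %.1f%n", (q2 - q1) / 10.0);
        }
    }

    // Reads the cumulative Queries counter from SHOW GLOBAL STATUS.
    private static long readQueries(Connection conn) throws Exception {
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SHOW GLOBAL STATUS LIKE 'Queries'")) {
            rs.next();
            return rs.getLong(2); // column 2 holds the value
        }
    }
}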

Monitoring SQL examples

-- Live status counters
SHOW GLOBAL STATUS LIKE 'Threads_connected';
SHOW GLOBAL STATUS LIKE 'Queries';
SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_read%';

-- Buffer pool hit rate:
-- hit rate = (1 - Innodb_buffer_pool_reads / Innodb_buffer_pool_read_requests) * 100
-- (on MySQL 5.7+/8.0 these counters live in performance_schema.global_status;
-- information_schema.GLOBAL_STATUS only exists in older versions)
SELECT 
    (1 - (SELECT VARIABLE_VALUE FROM performance_schema.global_status 
          WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads') /
     (SELECT VARIABLE_VALUE FROM performance_schema.global_status 
      WHERE VARIABLE_NAME = 'Innodb_buffer_pool_read_requests')) * 100 
    AS buffer_pool_hit_rate;

6.2 Slow Query Log Analysis

Configuring the slow query log

# my.cnf
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1  # log queries that take longer than 1 second
log_queries_not_using_indexes = 1
min_examined_row_limit = 1000

Analyzing with pt-query-digest

# Install Percona Toolkit
sudo apt-get install percona-toolkit

# Summarize the slow log
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# Only look at queries from the last hour
pt-query-digest --since='1h' /var/log/mysql/slow.log

6.3 Using Performance Schema

Enabling Performance Schema instruments

-- Check which instruments are enabled
SELECT * FROM performance_schema.setup_instruments 
WHERE NAME LIKE '%wait/io/table/sql/handler%';

-- Enable everything (noticeable overhead; enable selectively in production)
UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES';

Diagnosing SQL performance

-- The most expensive statement digests
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    AVG_TIMER_WAIT/1000000000000 AS avg_seconds,
    SUM_ROWS_EXAMINED
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;

-- Table I/O hotspots
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    COUNT_READ,
    COUNT_WRITE,
    SUM_TIMER_WAIT/1000000000000 AS total_seconds
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;

7. Case Studies

7.1 Designing a Flash-Sale (Seckill) System

Schema design

-- Flash-sale product table
CREATE TABLE seckill_product (
    id BIGINT PRIMARY KEY,
    name VARCHAR(200),
    total_stock INT,
    start_time DATETIME,
    end_time DATETIME,
    version INT DEFAULT 0
);

-- Flash-sale order table (the unique key blocks duplicate orders per user)
CREATE TABLE seckill_order (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    product_id BIGINT,
    quantity INT,
    create_time DATETIME,
    UNIQUE KEY uk_user_product (user_id, product_id)
);

-- Stock split into its own table to reduce lock contention
CREATE TABLE seckill_stock (
    product_id BIGINT PRIMARY KEY,
    stock INT,
    version INT
);

Flash-sale service implementation

@Service
public class SeckillService {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private MqProducer mqProducer; // MQ client assumed by the code below
    
    private static final String STOCK_KEY = "seckill:stock:";
    private static final String ORDER_KEY = "seckill:order:";
    
    /**
     * Place a flash-sale order.
     */
    @Transactional
    public boolean seckill(Long userId, Long productId, int quantity) {
        // 1. Pre-deduct stock in Redis
        String stockKey = STOCK_KEY + productId;
        Long stock = redisTemplate.opsForValue().decrement(stockKey, quantity);
        
        if (stock == null || stock < 0) {
            // Out of stock: roll the decrement back
            redisTemplate.opsForValue().increment(stockKey, quantity);
            return false;
        }
        
        // 2. Reject duplicate orders (the DB order is created asynchronously)
        String orderKey = ORDER_KEY + userId + ":" + productId;
        if (Boolean.TRUE.equals(redisTemplate.hasKey(orderKey))) {
            // Duplicate order: roll the stock back
            redisTemplate.opsForValue().increment(stockKey, quantity);
            return false;
        }
        
        // 3. Send an MQ message to create the order asynchronously
        OrderMessage message = new OrderMessage(userId, productId, quantity);
        mqProducer.send("seckill-order", message);
        
        // 4. Mark this user as having ordered
        redisTemplate.opsForValue().set(orderKey, 1, 30, TimeUnit.MINUTES);
        
        return true;
    }
    
    /**
     * Create the database order asynchronously (MQ consumer side).
     */
    @Transactional
    public void createOrder(OrderMessage message) {
        // Re-check stock in the database (eventual consistency)
        Integer stock = jdbcTemplate.queryForObject(
            "SELECT stock FROM seckill_stock WHERE product_id = ?",
            Integer.class, message.getProductId());
        
        if (stock == null || stock < message.getQuantity()) {
            // Out of stock: roll Redis back
            String stockKey = STOCK_KEY + message.getProductId();
            redisTemplate.opsForValue().increment(stockKey, message.getQuantity());
            return;
        }
        
        // Deduct database stock
        int updated = jdbcTemplate.update(
            "UPDATE seckill_stock SET stock = stock - ?, version = version + 1 " +
            "WHERE product_id = ? AND stock >= ?",
            message.getQuantity(), message.getProductId(), message.getQuantity());
        
        if (updated == 0) {
            // Lost the race: roll Redis back
            String stockKey = STOCK_KEY + message.getProductId();
            redisTemplate.opsForValue().increment(stockKey, message.getQuantity());
            return;
        }
        
        // Insert the order row
        jdbcTemplate.update(
            "INSERT INTO seckill_order (id, user_id, product_id, quantity, create_time) " +
            "VALUES (?, ?, ?, ?, NOW())",
            generateOrderId(), message.getUserId(), message.getProductId(), message.getQuantity());
    }
}
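
OrderMessage, MqProducer and generateOrderId above are referenced but not defined. The message payload, for instance, can be as simple as:

// Hypothetical payload carried on the "seckill-order" topic.
public class OrderMessage {
    private final Long userId;
    private final Long productId;
    private final int quantity;

    public OrderMessage(Long userId, Long productId, int quantity) {
        this.userId = userId;
        this.productId = productId;
        this.quantity = quantity;
    }

    public Long getUserId() { return userId; }
    public Long getProductId() { return productId; }
    public int getQuantity() { return quantity; }
}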

Stock preheat script

#!/usr/bin/env python3
import redis

def preload_stock():
    r = redis.Redis(host='localhost', port=6379, db=0)
    
    # Load flash-sale stock from MySQL into Redis.
    # Here we simulate 1000 products with 100 units each.
    for product_id in range(1, 1001):
        stock_key = f"seckill:stock:{product_id}"
        r.set(stock_key, 100)
        r.expire(stock_key, 3600)  # expire after 1 hour

if __name__ == "__main__":
    preload_stock()

7.2 Optimizing a Social-Media Feed

Schema design

-- Users
CREATE TABLE users (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    avatar VARCHAR(200)
);

-- Posts (sharded)
CREATE TABLE posts_0 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    content TEXT,
    create_time DATETIME,
    INDEX idx_user_time (user_id, create_time DESC)
);
-- posts_1, posts_2, posts_3 ... 16 tables in total

-- Follow relationships
CREATE TABLE follows (
    user_id BIGINT,
    follow_id BIGINT,
    create_time DATETIME,
    PRIMARY KEY (user_id, follow_id),
    INDEX idx_follow_id (follow_id)
);

Feed query optimization

@Service
public class FeedService {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * Fetch a user's feed (posts from everyone they follow).
     */
    public List<Post> getFeed(Long userId, int offset, int limit) {
        String cacheKey = "feed:" + userId + ":" + offset + ":" + limit;
        
        // 1. Try the cache
        List<Post> cached = (List<Post>) redisTemplate.opsForValue().get(cacheKey);
        if (cached != null) {
            return cached;
        }
        
        // 2. Load the follow list
        List<Long> followIds = getFollowIds(userId);
        if (followIds.isEmpty()) {
            return Collections.emptyList();
        }
        
        // 3. Group followed users by shard (user_id mod 16)
        Map<Long, List<Long>> tableMap = new HashMap<>();
        for (Long followId : followIds) {
            int tableIndex = (int) (followId % 16);
            tableMap.computeIfAbsent((long) tableIndex, k -> new ArrayList<>()).add(followId);
        }
        
        // 4. Query the shards in parallel, then merge and re-sort
        // (note: applying OFFSET per shard is only approximate for deep pages)
        List<Post> allPosts = tableMap.entrySet().parallelStream()
            .flatMap(entry -> {
                String tableName = "posts_" + entry.getKey();
                List<Long> userIds = entry.getValue();
                return getPostsFromTable(tableName, userIds, offset, limit).stream();
            })
            .sorted(Comparator.comparing(Post::getCreateTime).reversed())
            .limit(limit)
            .collect(Collectors.toList());
        
        // 5. Cache the merged page briefly
        redisTemplate.opsForValue().set(cacheKey, allPosts, 60, TimeUnit.SECONDS);
        
        return allPosts;
    }
    
    private List<Long> getFollowIds(Long userId) {
        return jdbcTemplate.queryForList(
            "SELECT follow_id FROM follows WHERE user_id = ?",
            Long.class, userId);
    }
    
    private List<Post> getPostsFromTable(String tableName, List<Long> userIds, int offset, int limit) {
        String sql = String.format(
            "SELECT p.*, u.username, u.avatar " +
            "FROM %s p JOIN users u ON p.user_id = u.id " +
            "WHERE p.user_id IN (%s) " +
            "ORDER BY p.create_time DESC " +
            "LIMIT ? OFFSET ?",
            tableName,
            userIds.stream().map(String::valueOf).collect(Collectors.joining(",")));
        
        return jdbcTemplate.query(sql, new Object[]{limit, offset}, (rs, rowNum) -> {
            Post post = new Post();
            post.setId(rs.getLong("id"));
            post.setUserId(rs.getLong("user_id"));
            post.setContent(rs.getString("content"));
            post.setCreateTime(rs.getTimestamp("create_time"));
            post.setUsername(rs.getString("username"));
            post.setAvatar(rs.getString("avatar"));
            return post;
        });
    }
}

8. Summary and Best Practices

8.1 High-Concurrency Checklist

Architecture

  • [ ] Read/write splitting in place
  • [ ] Sharding evaluated where data volume demands it
  • [ ] Connection pooling in use and properly sized
  • [ ] Multi-level caching in front of the database

Configuration

  • [ ] innodb_buffer_pool_size set to 50-70% of RAM
  • [ ] max_connections large enough, but not excessive
  • [ ] Slow query log enabled
  • [ ] innodb_flush_log_at_trx_commit chosen deliberately

SQL

  • [ ] Every hot query backed by a suitable index
  • [ ] No SELECT *
  • [ ] No large transactions
  • [ ] Batch operations instead of row-by-row work
  • [ ] No index-disabling constructs

Monitoring

  • [ ] Monitoring stack deployed (Prometheus + Grafana)
  • [ ] Alert rules configured
  • [ ] Slow query log reviewed regularly
  • [ ] Lock waits and deadlocks tracked

8.2 Ongoing Optimization

  1. Load-test regularly: use sysbench or JMeter to find bottlenecks before production traffic does
  2. Roll out carefully: test SQL changes against a replica first, then apply them to the primary
  3. Archive old data: keep single-table row counts in a sane range (under roughly 10 million rows is a common rule of thumb); see the sketch after this list
  4. Keep tuning: adjust MySQL parameters as the workload evolves, and record the effect of every change
  5. Build shared knowledge: maintain team SQL standards and share optimization case studies regularly
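
A common archiving pattern is copy-then-delete in small batches (Percona Toolkit's pt-archiver automates exactly this). A minimal JDBC sketch, assuming an orders_archive table with the same schema as orders:

import org.springframework.jdbc.core.JdbcTemplate;

public class OrderArchiver {
    // Moves rows older than the cutoff in batches of 1000, so each statement
    // stays small; ideally run each copy+delete pair inside one transaction.
    public void archiveBefore(JdbcTemplate jdbcTemplate, String cutoff) {
        int deleted;
        do {
            // INSERT IGNORE makes retries after a partial failure idempotent
            jdbcTemplate.update(
                "INSERT IGNORE INTO orders_archive " +
                "SELECT * FROM orders WHERE create_time < ? ORDER BY id LIMIT 1000",
                cutoff);
            deleted = jdbcTemplate.update(
                "DELETE FROM orders WHERE create_time < ? ORDER BY id LIMIT 1000",
                cutoff);
        } while (deleted > 0);
    }
}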

Applied together, these strategies will let your MySQL deployment absorb massive concurrent traffic while remaining fast and available. Remember that optimization is a continuous process: keep adjusting and refining as the business grows and the technology evolves.