Introduction: Understanding the Challenges High Concurrency Poses to MySQL

In modern Internet applications, high-concurrency traffic is the norm. Flash sales on e-commerce platforms, trending events on social media, and trading peaks in financial systems all put enormous pressure on the database. MySQL, the most widely used relational database, is prone to performance bottlenecks, lock contention, connection exhaustion, and even outright crashes under such load.

High concurrency challenges MySQL in several ways:

  • Connection surges: a flood of concurrent requests rapidly exhausts connection resources
  • High CPU load: constant query parsing, execution-plan generation, and result delivery consume substantial CPU
  • I/O bottlenecks: heavy concurrent reads and writes make disk I/O the limiting factor
  • Lock contention: row locks, table locks, and metadata locks contend severely under concurrent writes
  • Memory pressure: the buffer pool, connection buffers, and sort buffers all come under strain

This article examines MySQL high-concurrency strategies from multiple angles to help you build a stable, high-performance database tier.

1. Connection-Layer Optimization Strategies

1.1 Tuning Connection Parameters

Connection handling is the first gate in high-concurrency tuning. Misconfigured connection settings lead to refused connections or resource exhaustion.

Key parameters:

-- Inspect the current connection settings
SHOW VARIABLES LIKE 'max_connections';
SHOW VARIABLES LIKE 'wait_timeout';
SHOW VARIABLES LIKE 'interactive_timeout';

-- Suggested settings (adjust to the server's memory)
SET GLOBAL max_connections = 2000;  -- maximum concurrent connections
SET GLOBAL wait_timeout = 300;      -- non-interactive connection timeout (seconds)
SET GLOBAL interactive_timeout = 600; -- interactive connection timeout (seconds)
SET GLOBAL max_connect_errors = 100000; -- maximum connection errors per host

Notes on these settings:

  • max_connections: bounded by memory, since each connection costs roughly 10 MB; 2000 connections therefore implies on the order of 20 GB. Keep the number of simultaneously active connections far lower than this cap — a common rule of thumb is 2-4x the CPU core count, enforced at the connection pool rather than here
  • wait_timeout: too long ties up connection slots, too short causes constant reconnects; 300 seconds is a reasonable balance
  • max_connect_errors: guards against connection abuse; raise it so legitimate clients are not blocked after transient errors
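To make the memory bound concrete, here is a back-of-the-envelope helper using the ~10 MB-per-connection figure above. The class and method names are ours, and the constant is a rough approximation, not a measured value:

```java
public class MaxConnectionsEstimator {

    // Rough heuristic: connections that fit in the RAM left over
    // after reserving a fraction for the InnoDB buffer pool,
    // at ~10 MB per connection (the figure cited in the text).
    public static int estimate(int memoryMb, double bufferPoolFraction) {
        int perConnectionMb = 10; // approximate per-connection overhead
        int remainingMb = (int) (memoryMb * (1.0 - bufferPoolFraction));
        return Math.max(1, remainingMb / perConnectionMb);
    }

    public static void main(String[] args) {
        // 16 GB server, 60% reserved for the buffer pool
        System.out.println(estimate(16 * 1024, 0.6)); // 655
    }
}
```

For a 16 GB host reserving 60% for the buffer pool, this suggests only about 650 connections — well below the 2000 shown above, which presumes a much larger server.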

1.2 Applying Connection Pooling

Connection pooling is the core technique for handling high-concurrency connections: reusing connections eliminates the overhead of repeatedly opening and closing them.

Java JDBC connection pool configuration example (HikariCP):

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MySQLConnectionPool {
    
    private static HikariDataSource dataSource;
    
    static {
        HikariConfig config = new HikariConfig();
        // Database connection settings
        config.setJdbcUrl("jdbc:mysql://localhost:3306/your_database?useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true");
        config.setUsername("username");
        config.setPassword("password");
        
        // Core pool settings
        config.setMaximumPoolSize(50);           // maximum pool size
        config.setMinimumIdle(10);               // minimum idle connections
        config.setConnectionTimeout(30000);      // connection acquisition timeout (ms)
        config.setIdleTimeout(600000);           // idle timeout (ms)
        config.setMaxLifetime(1800000);          // maximum connection lifetime (ms)
        config.setLeakDetectionThreshold(60000); // leak-detection threshold (ms)
        
        // Performance-related driver properties
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("useLocalSessionState", "true");
        config.addDataSourceProperty("rewriteBatchedStatements", "true");
        config.addDataSourceProperty("cacheResultSetMetadata", "true");
        config.addDataSourceProperty("cacheServerConfiguration", "true");
        config.addDataSourceProperty("elideSetAutoCommits", "true");
        config.addDataSourceProperty("maintainTimeStats", "false");
        
        dataSource = new HikariDataSource(config);
    }
    
    // Expose the data source
    public static DataSource getDataSource() {
        return dataSource;
    }
    
    // Example: running a query through the pool
    public void executeQuery() {
        String sql = "SELECT * FROM users WHERE id = ?";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(sql)) {
            
            stmt.setInt(1, 123); // bind parameters before executing
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    System.out.println("User: " + rs.getString("username"));
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

Connection pool configuration essentials:

  • Maximum pool size: size for peak load, commonly 2-4x the CPU core count
  • Minimum idle connections: keep spares warm so traffic bursts don't wait on connection setup
  • Connection timeout: bounds how long callers wait to acquire a connection
  • Prepared statement caching: markedly reduces SQL parsing overhead
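A widely cited starting point for setMaximumPoolSize is the rule of thumb from the HikariCP project's pool-sizing guidance — connections ≈ cores × 2 + effective spindle count — which you then adjust under load testing. A tiny sketch of the arithmetic (the helper class is ours):

```java
public class PoolSizing {

    // HikariCP's pool-sizing rule of thumb: a starting point, not a law.
    // effectiveSpindles is roughly the number of independent disks
    // (use a small constant, e.g. 1, for SSD-backed hosts).
    public static int suggestedPoolSize(int cpuCores, int effectiveSpindles) {
        return cpuCores * 2 + effectiveSpindles;
    }

    public static void main(String[] args) {
        System.out.println(suggestedPoolSize(8, 1)); // 17
    }
}
```

Note how far this is from the 50 in the example above: a small pool that keeps connections busy usually outperforms a large pool of mostly idle ones.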

1.3 Read/Write Splitting

For read-heavy workloads, read/write splitting is an effective way to raise concurrency.

MySQL primary/replica replication configuration:

# Primary configuration (my.cnf)
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 7

# Replica configuration (my.cnf)
[mysqld]
server-id = 2
relay_log = mysql-relay-bin
read_only = 1

-- On the primary: create a replication user
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';

-- On the replica: start replication
CHANGE MASTER TO
MASTER_HOST='master_ip',
MASTER_USER='repl',
MASTER_PASSWORD='password',
MASTER_LOG_FILE='mysql-bin.000001',
MASTER_LOG_POS=4;

START SLAVE;

Java read/write splitting implementation:

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.stereotype.Component;

@Component
public class DynamicDataSource extends AbstractRoutingDataSource {
    
    private static final ThreadLocal<DataSourceType> currentDataSource = 
        ThreadLocal.withInitial(() -> DataSourceType.MASTER);
    
    @Override
    protected Object determineCurrentLookupKey() {
        return currentDataSource.get();
    }
    
    public static void setDataSource(DataSourceType type) {
        currentDataSource.set(type);
    }
    
    public static void clearDataSource() {
        currentDataSource.remove();
    }
    
    public enum DataSourceType {
        MASTER, SLAVE
    }
}

// Route read operations with AOP
// (each top-level class below lives in its own file in a real project)
@Aspect
@Component
public class DataSourceAspect {
    
    @Around("@annotation(readOnly)")
    public Object setReadOnlyDataSource(ProceedingJoinPoint pjp, ReadOnly readOnly) throws Throwable {
        try {
            DynamicDataSource.setDataSource(DynamicDataSource.DataSourceType.SLAVE);
            return pjp.proceed();
        } finally {
            DynamicDataSource.clearDataSource();
        }
    }
}

// The marker annotation
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}

@Service
public class UserService {
    
    @Autowired
    private UserMapper userMapper;
    
    // Writes go to the primary
    public void createUser(User user) {
        userMapper.insert(user);
    }
    
    // Reads go to a replica
    @ReadOnly
    public User getUserById(Long id) {
        return userMapper.selectById(id);
    }
}

2. SQL Statement Optimization Strategies

2.1 Index Optimization and Covering Indexes

Indexes are the single most effective tool for query performance, but poorly chosen indexes become performance killers.

Index design principles:

  • Leftmost-prefix rule: a composite index matches predicates starting from its leftmost column
  • Selectivity rule: put the most selective columns first
  • Covering index: when every queried column is in the index, the extra lookup back to the table row is avoided

Example: optimizing queries against a users table

-- Create the users table
CREATE TABLE users (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    age INT,
    status TINYINT DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_username (username),
    INDEX idx_email (email)
);

-- Problem SQL: full table scan
EXPLAIN SELECT * FROM users WHERE age > 25 AND status = 1;

-- Option 1: create a suitable composite index
CREATE INDEX idx_age_status ON users(age, status);

-- Option 2: use a covering index
CREATE INDEX idx_cover ON users(age, status, username, email);

-- Verify the index takes effect
EXPLAIN SELECT username, email FROM users WHERE age > 25 AND status = 1;
-- Expected output: type=range, key=idx_cover, Extra=Using index

-- Problem SQL: the leading wildcard defeats the index
EXPLAIN SELECT * FROM users WHERE username LIKE '%john%';

-- Fix: a prefix index helps only prefix matches
CREATE INDEX idx_username_prefix ON users(username(10));
EXPLAIN SELECT * FROM users WHERE username LIKE 'john%';

-- For true substring search, use a full-text index or an external engine such as Elasticsearch

Common scenarios where an index stops being used:

-- 1. Implicit type conversion
SELECT * FROM users WHERE phone = 13800138000; -- phone is VARCHAR, so the index is skipped
SELECT * FROM users WHERE phone = '13800138000'; -- correct

-- 2. Functions applied to the indexed column
SELECT * FROM users WHERE DATE(created_at) = '2024-01-01'; -- index skipped
SELECT * FROM users WHERE created_at >= '2024-01-01' AND created_at < '2024-01-02'; -- correct

-- 3. OR conditions (in some cases)
SELECT * FROM users WHERE age = 25 OR age = 30; -- may skip the index
SELECT * FROM users WHERE age IN (25, 30); -- better

-- 4. Negative predicates
SELECT * FROM users WHERE status != 0; -- may skip the index
SELECT * FROM users WHERE status = 1; -- better

2.2 Batch Operations and Pagination Optimization

Batch writes and paginated reads are everyday needs under high concurrency, and both call for dedicated optimization.

Batch insert optimization:

-- Problem: row-by-row inserts (slow)
INSERT INTO orders (user_id, product_id, amount) VALUES (1, 100, 99.9);
INSERT INTO orders (user_id, product_id, amount) VALUES (2, 101, 199.9);
INSERT INTO orders (user_id, product_id, amount) VALUES (3, 102, 299.9);

-- Optimization: multi-row insert (often 10x faster or more)
INSERT INTO orders (user_id, product_id, amount) VALUES 
(1, 100, 99.9),
(2, 101, 199.9),
(3, 102, 299.9),
(4, 103, 399.9),
(5, 104, 499.9);

// Java batch insert example
public void batchInsertOrders(List<Order> orders) {
    String sql = "INSERT INTO orders (user_id, product_id, amount) VALUES (?, ?, ?)";
    
    try (Connection conn = dataSource.getConnection();
         PreparedStatement stmt = conn.prepareStatement(sql)) {
        
        conn.setAutoCommit(false); // disable auto-commit
        
        for (int i = 0; i < orders.size(); i++) {
            Order order = orders.get(i);
            stmt.setLong(1, order.getUserId());
            stmt.setLong(2, order.getProductId());
            stmt.setBigDecimal(3, order.getAmount());
            stmt.addBatch();
            
            // Flush and commit every 1000 rows, and once more at the end
            if ((i + 1) % 1000 == 0 || i == orders.size() - 1) {
                stmt.executeBatch();
                conn.commit();
            }
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
}

Deep pagination optimization:

-- Problem: deep pagination is slow (scans and discards a huge number of rows)
SELECT * FROM orders WHERE status = 1 ORDER BY id DESC LIMIT 1000000, 20;

-- Option 1: deferred join (covering index)
SELECT o.* FROM orders o
INNER JOIN (
    SELECT id FROM orders WHERE status = 1 ORDER BY id DESC LIMIT 1000000, 20
) AS tmp ON o.id = tmp.id;

-- Option 2: keyset ("bookmark") pagination, handled at the application layer
-- first page
SELECT * FROM orders WHERE status = 1 AND id < ? ORDER BY id DESC LIMIT 20;
-- subsequent pages pass the smallest id from the previous page as the parameter

-- Option 3: offload deep pagination to Elasticsearch
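Option 2 can be illustrated without a database: the page boundary is carried by the last id seen, not by an offset. A minimal in-memory sketch of the same logic (the class and method names are ours):

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class KeysetPagination {

    // Keyset ("seek") paging: instead of skipping N rows with OFFSET,
    // filter on the last id returned by the previous page.
    // idsDesc must be sorted descending, matching ORDER BY id DESC.
    public static List<Long> nextPage(List<Long> idsDesc, Long lastSeenId, int pageSize) {
        return idsDesc.stream()
                .filter(id -> lastSeenId == null || id < lastSeenId)
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> ids = LongStream.rangeClosed(1, 10).boxed()
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
        List<Long> page1 = nextPage(ids, null, 3);                        // [10, 9, 8]
        List<Long> page2 = nextPage(ids, page1.get(page1.size() - 1), 3); // [7, 6, 5]
        System.out.println(page1 + " " + page2);
    }
}
```

The cost of fetching page N stays constant, at the price of not being able to jump to an arbitrary page number.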

2.3 Transaction and Lock Optimization

Under high concurrency, transactions and locks are where performance bottlenecks concentrate.

Transaction optimization principles:

  • Short transactions: keep transactions brief to minimize lock-holding time
  • Small transactions: a single transaction should not touch too much data
  • No long-running transactions: monitor for them and break them up

Lock optimization examples:

-- Problem: a long transaction causes lock contention
START TRANSACTION;
UPDATE products SET stock = stock - 1 WHERE id = 100;
-- complex business logic taking 10 seconds
UPDATE orders SET status = 'paid' WHERE user_id = 1;
COMMIT; -- locks held the whole time

-- Optimization: split the transaction to shorten lock time
-- Option 1: run the non-critical work first
-- (execute the complex business logic here, holding no locks)
START TRANSACTION;
UPDATE products SET stock = stock - 1 WHERE id = 100;
COMMIT; -- commit immediately, releasing the lock

-- Option 2: optimistic locking
ALTER TABLE products ADD COLUMN version INT DEFAULT 0;

-- Check the version on every update
UPDATE products 
SET stock = stock - 1, version = version + 1 
WHERE id = 100 AND version = ?;

// Java implementation of the optimistic lock
public boolean decreaseStock(Long productId, int quantity, int expectedVersion) {
    String sql = "UPDATE products SET stock = stock - ?, version = version + 1 " +
                 "WHERE id = ? AND version = ?";
    
    try (Connection conn = dataSource.getConnection();
         PreparedStatement stmt = conn.prepareStatement(sql)) {
        
        stmt.setInt(1, quantity);
        stmt.setLong(2, productId);
        stmt.setInt(3, expectedVersion); // the version the caller read earlier
        
        int affected = stmt.executeUpdate();
        return affected > 0; // true only if the version still matched
    } catch (SQLException e) {
        return false;
    }
}

-- Option 3: SELECT ... FOR UPDATE (pessimistic locking)
START TRANSACTION;
-- lock the row first
SELECT stock FROM products WHERE id = 100 FOR UPDATE;
-- then update it
UPDATE products SET stock = stock - 1 WHERE id = 100;
COMMIT;
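An optimistic-lock update like decreaseStock only makes sense inside a retry loop: when the version check fails (zero rows affected), the caller re-reads the row and tries again a bounded number of times. A minimal, database-free sketch of that loop (the Attempt interface and the simulated DAO call are illustrative, not part of any library):

```java
public class OptimisticRetry {

    // One optimistic-lock attempt: returns true on success,
    // false when the version check failed and we should retry.
    public interface Attempt {
        boolean tryOnce();
    }

    // Retry the attempt up to maxRetries times, then give up.
    public static boolean withRetry(Attempt attempt, int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            if (attempt.tryOnce()) {
                return true;
            }
            // In real code: re-read the row (and its version) here before retrying.
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated DAO: stale version twice, success on the third try.
        boolean ok = withRetry(() -> ++calls[0] >= 3, 5);
        System.out.println(ok + " after " + calls[0] + " attempts");
    }
}
```

Bounding the retries matters: under heavy contention, unbounded optimistic retries can burn more CPU than a pessimistic lock would.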

3. Database Architecture Optimization

3.1 Sharding Strategy

When a single table grows past the tens-of-millions-of-rows range, or concurrency becomes extreme, consider splitting data across tables and databases.

Table sharding example:

-- Shard by user ID modulo (horizontal sharding)
-- The orders table split into 16 shards
CREATE TABLE orders_0 (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    amount DECIMAL(10,2),
    created_at TIMESTAMP
);
-- orders_1, orders_2, ... orders_15

// Shard-routing function
public class TableSharding {
    private static final int TABLE_COUNT = 16;
    
    public static String getTableName(Long userId) {
        int index = (int) (userId % TABLE_COUNT);
        return "orders_" + index;
    }
    
    public static void main(String[] args) {
        // Example: which shard holds user 12345's orders
        String tableName = getTableName(12345L);
        System.out.println("Table: " + tableName); // orders_9
    }
}

// Querying a shard
public List<Order> getOrdersByUserId(Long userId) {
    String tableName = TableSharding.getTableName(userId);
    String sql = "SELECT * FROM " + tableName + " WHERE user_id = ?";
    
    // execute the query...
    return orders;
}

Database sharding strategy:

-- Vertical sharding by business domain
-- User database (user_db)
CREATE TABLE users (id BIGINT PRIMARY KEY, username VARCHAR(50));
CREATE TABLE user_profiles (user_id BIGINT, avatar VARCHAR(255));

-- Order database (order_db)
CREATE TABLE orders (id BIGINT PRIMARY KEY, user_id BIGINT, amount DECIMAL(10,2));
CREATE TABLE order_items (order_id BIGINT, product_id BIGINT, quantity INT);

-- Payment database (payment_db)
CREATE TABLE payments (id BIGINT PRIMARY KEY, order_id BIGINT, amount DECIMAL(10,2));

3.2 Caching Strategy

Caching is among the most effective levers for high-concurrency performance.

Redis caching example:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.params.SetParams;

@Component
public class CacheService {
    
    private JedisPool jedisPool;
    
    @PostConstruct
    public void init() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(100);          // maximum connections
        config.setMaxIdle(20);            // maximum idle connections
        config.setMinIdle(5);             // minimum idle connections
        config.setTestOnBorrow(true);     // validate connections on checkout
        
        jedisPool = new JedisPool(config, "localhost", 6379, 2000, "password");
    }
    
    // Cache user records
    public User getUserWithCache(Long userId) {
        String cacheKey = "user:" + userId;
        
        try (Jedis jedis = jedisPool.getResource()) {
            // 1. Try the cache first
            String cached = jedis.get(cacheKey);
            if (cached != null) {
                return JSON.parseObject(cached, User.class);
            }
            
            // 2. Cache miss: query the database
            User user = userMapper.selectById(userId);
            if (user != null) {
                // 3. Populate the cache with a TTL
                jedis.setex(cacheKey, 3600, JSON.toJSONString(user));
            }
            return user;
        }
    }
    
    // Cache-penetration protection (Bloom-filter-style membership check)
    public User getUserWithBloomFilter(Long userId) {
        String cacheKey = "user:" + userId;
        String bloomKey = "bloom:user";
        
        try (Jedis jedis = jedisPool.getResource()) {
            // Check the membership set before touching MySQL
            if (!jedis.sismember(bloomKey, String.valueOf(userId))) {
                // Definitely absent: return null without hitting the database
                return null;
            }
            
            // Normal cache-aside path
            return getUserWithCache(userId);
        }
    }
    
    // Cache-breakdown protection (mutex lock)
    public User getUserWithMutex(Long userId) {
        String cacheKey = "user:" + userId;
        String lockKey = "lock:" + userId;
        
        try (Jedis jedis = jedisPool.getResource()) {
            // 1. Try the cache
            String cached = jedis.get(cacheKey);
            if (cached != null) {
                return JSON.parseObject(cached, User.class);
            }
            
            // 2. Acquire a distributed lock (SET ... NX EX)
            String lockValue = String.valueOf(System.currentTimeMillis());
            String locked = jedis.set(lockKey, lockValue, SetParams.setParams().nx().ex(10));
            
            if ("OK".equals(locked)) {
                try {
                    // 3. Re-check the cache (another thread may have loaded it)
                    cached = jedis.get(cacheKey);
                    if (cached != null) {
                        return JSON.parseObject(cached, User.class);
                    }
                    
                    // 4. Query the database
                    User user = userMapper.selectById(userId);
                    if (user != null) {
                        jedis.setex(cacheKey, 3600, JSON.toJSONString(user));
                    }
                    return user;
                } finally {
                    // 5. Release the lock (a Lua script keeps check-and-delete atomic)
                    String lua = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                    jedis.eval(lua, 1, lockKey, lockValue);
                }
            } else {
                // 6. Lock not acquired: back off briefly and retry
                Thread.sleep(50);
                return getUserWithMutex(userId);
            }
        } catch (Exception e) {
            return userMapper.selectById(userId);
        }
    }
}

3.3 Peak Shaving with Message Queues

For bursty high-concurrency writes, put a message queue in front of the database to flatten the peak.

RabbitMQ peak-shaving example:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

@Component
public class OrderMessageQueue {
    
    private final String EXCHANGE_NAME = "order_exchange";
    private final String QUEUE_NAME = "order_queue";
    private final String ROUTING_KEY = "order.create";
    
    // Producer: absorbs the high-concurrency request burst
    public String createOrderRequest(OrderRequest request) {
        // Return immediately with an "order pending" ID
        String orderId = generateOrderId();
        
        // Publish the request to the queue
        // (one connection per call keeps the example short; reuse connections in production)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            
            // Message properties
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2) // persistent
                .priority(5)
                .build();
            
            // Publish the message
            String message = JSON.toJSONString(request);
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props, message.getBytes());
            
            // The caller polls the order status with this ID
            return orderId;
        } catch (Exception e) {
            throw new RuntimeException("Order creation failed", e);
        }
    }
    
    // Consumer: processes orders asynchronously
    @PostConstruct
    public void startConsumer() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            
            channel.basicQos(10); // prefetch at most 10 messages to bound memory use
            
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    OrderRequest request = JSON.parseObject(message, OrderRequest.class);
                    
                    // Create the order for real, at a controlled concurrency
                    processOrder(request);
                    
                    // Acknowledge the message
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                    // Reject the message and requeue it
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                }
            };
            
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void processOrder(OrderRequest request) {
        // Actual order-processing logic;
        // throttle here to keep database pressure under control
    }
}

4. Server Configuration Optimization

4.1 InnoDB Buffer Pool Tuning

The InnoDB buffer pool is the heart of MySQL performance: it keeps data and index pages cached in memory.

-- Inspect the current settings
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'innodb_buffer_pool_instances';

# Recommended configuration (my.cnf)
[mysqld]
# Buffer pool size: 50%-70% of physical memory
innodb_buffer_pool_size = 8G

# Buffer pool instances: roughly one per GB, capped at 64
innodb_buffer_pool_instances = 8

# Buffer pool warm-up: reload hot pages at startup
innodb_buffer_pool_load_at_startup = ON
innodb_buffer_pool_dump_at_shutdown = ON

# Redo log flush policy -- pick exactly ONE of the following
innodb_flush_log_at_trx_commit = 1    # full ACID durability (safest, the default)
# innodb_flush_log_at_trx_commit = 2  # flush to the OS cache on commit (compromise)
# innodb_flush_log_at_trx_commit = 0  # flush about once per second (fastest, can lose ~1s of commits)

# Redo log sizing
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M

# I/O threads
innodb_read_io_threads = 8
innodb_write_io_threads = 8

# Other tuning
innodb_lru_scan_depth = 1024
innodb_page_cleaners = 4

4.2 Query Cache and Thread Pool

# Query cache (removed in MySQL 8.0; configurable in 5.7 and earlier)
[mysqld]
query_cache_type = 1
query_cache_size = 128M
query_cache_limit = 2M

# Thread pool (MySQL Enterprise or Percona Server)
[mysqld]
thread_pool_size = 16
thread_pool_algorithm = 2
thread_pool_high_prio_mode = transactions
thread_pool_high_prio_burst_limit = 32

# Connection-related settings
max_connections = 2000
thread_cache_size = 100
table_open_cache = 2000
table_definition_cache = 1000

4.3 Slow Query Log Monitoring

# Enable the slow query log (my.cnf)
[mysqld]
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1  # log queries slower than 1 second
log_queries_not_using_indexes = 1
log_slow_admin_statements = 1
log_slow_slave_statements = 1

# Analyze the slow log with mysqldumpslow (shell)
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

# Or with pt-query-digest (Percona Toolkit)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

-- Or monitor via Performance Schema
SELECT * FROM performance_schema.events_statements_summary_by_digest 
WHERE avg_timer_wait > 1000000000 
ORDER BY avg_timer_wait DESC LIMIT 10;

5. Monitoring and Alerting

5.1 Key Metrics

MySQL monitoring script:

#!/bin/bash
# MySQL monitoring script

MYSQL_USER="monitor"
MYSQL_PASS="password"
MYSQL_HOST="localhost"

# Read a global status counter
mysql_status() {
    mysql -u$MYSQL_USER -p$MYSQL_PASS -h$MYSQL_HOST -e "SHOW GLOBAL STATUS LIKE '$1';" | tail -1 | awk '{print $2}'
}

# Read a server variable
mysql_var() {
    mysql -u$MYSQL_USER -p$MYSQL_PASS -h$MYSQL_HOST -e "SHOW VARIABLES LIKE '$1';" | tail -1 | awk '{print $2}'
}

# Key metrics
QUERIES=$(mysql_status "Queries")
TPS=$(mysql_status "Com_commit")
CONNECTIONS=$(mysql_status "Threads_connected")
SLOW_QUERIES=$(mysql_status "Slow_queries")

# The buffer pool hit rate must be derived from two counters;
# there is no single Innodb_buffer_pool_read_hit_rate status variable
BP_READS=$(mysql_status "Innodb_buffer_pool_reads")
BP_READ_REQUESTS=$(mysql_status "Innodb_buffer_pool_read_requests")
if [ "$BP_READ_REQUESTS" -gt 0 ]; then
    HIT_RATE=$(( 100 - BP_READS * 100 / BP_READ_REQUESTS ))
else
    HIT_RATE=100
fi

# QPS requires sampling the Queries counter over an interval
sleep 1
QUERIES2=$(mysql_status "Queries")
QPS_RATE=$(( QUERIES2 - QUERIES ))

echo "=== MySQL metrics $(date) ==="
echo "QPS: $QPS_RATE"
echo "TPS: $TPS"
echo "Connections: $CONNECTIONS / $(mysql_var "max_connections")"
echo "Slow queries: $SLOW_QUERIES"
echo "Buffer pool hit rate: $HIT_RATE%"

# Alert thresholds
MAX_CONNECTIONS=$(mysql_var "max_connections")
if [ "$CONNECTIONS" -gt $(( MAX_CONNECTIONS * 80 / 100 )) ]; then
    echo "WARNING: connection count above 80% of max_connections!"
fi

if [ "$HIT_RATE" -lt 95 ]; then
    echo "WARNING: buffer pool hit rate below 95%!"
fi

5.2 Performance Schema Monitoring

-- Statements with the highest average latency
SELECT 
    DIGEST_TEXT,
    COUNT_STAR,
    AVG_TIMER_WAIT/1000000000000 as avg_time_sec,
    SUM_ROWS_EXAMINED,
    SUM_ROWS_SENT
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;

-- Per-table I/O profile
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    COUNT_READ,
    COUNT_WRITE,
    SUM_NUMBER_OF_BYTES_READ,
    SUM_NUMBER_OF_BYTES_WRITE
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_NUMBER_OF_BYTES_READ + SUM_NUMBER_OF_BYTES_WRITE DESC
LIMIT 10;

-- Index usage statistics
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_FETCH,
    COUNT_INSERT,
    COUNT_UPDATE,
    COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
ORDER BY COUNT_FETCH DESC
LIMIT 10;

6. High-Concurrency Case Studies

6.1 Designing a Flash-Sale (Seckill) System

The core problems in a flash sale:

  1. A burst of concurrent reads
  2. A burst of concurrent writes
  3. Overselling the stock

A complete solution:

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Autowired
    private SeckillGoodsMapper seckillGoodsMapper;
    
    private static final String STOCK_KEY = "seckill:stock:";
    private static final String ORDER_KEY = "seckill:order:";
    private static final String USER_KEY = "seckill:user:";
    
    /**
     * Flash-sale flow
     */
    public SeckillResult seckill(Long seckillId, Long userId) {
        try {
            // 1. Reject duplicate requests BEFORE touching stock,
            //    so a repeat buyer never decrements the counter
            Boolean hasOrdered = redisTemplate.hasKey(ORDER_KEY + seckillId + ":" + userId);
            if (Boolean.TRUE.equals(hasOrdered)) {
                return SeckillResult.fail("You have already purchased this item; duplicate orders are not allowed");
            }
            
            // 2. Pre-decrement stock (an atomic Redis operation)
            Long stock = redisTemplate.opsForValue().decrement(STOCK_KEY + seckillId);
            if (stock == null || stock < 0) {
                // Sold out: restore the counter
                redisTemplate.opsForValue().increment(STOCK_KEY + seckillId);
                return SeckillResult.fail("Out of stock");
            }
            
            // 3. Publish a message for asynchronous order creation
            sendSeckillMessage(seckillId, userId);
            
            // 4. Mark the user as having ordered
            redisTemplate.opsForValue().set(ORDER_KEY + seckillId + ":" + userId, "1", 1, TimeUnit.HOURS);
            
            return SeckillResult.success("Purchase successful; your order is being processed");
            
        } catch (Exception e) {
            // Restore stock on failure
            redisTemplate.opsForValue().increment(STOCK_KEY + seckillId);
            return SeckillResult.error("System error");
        }
    }
    
    /**
     * Asynchronous order processing (consumer)
     */
    @RabbitListener(queues = "seckill_queue")
    public void processSeckillMessage(SeckillMessage message) {
        Long seckillId = message.getSeckillId();
        Long userId = message.getUserId();
        
        try {
            // 1. Decrement stock in the database
            //    (the stock > 0 predicate makes this a conditional update that prevents overselling)
            String sql = "UPDATE seckill_goods SET stock = stock - 1 " +
                        "WHERE id = ? AND stock > 0";
            int affected = jdbcTemplate.update(sql, seckillId);
            
            if (affected == 0) {
                // Database stock exhausted: roll the Redis counter back
                redisTemplate.opsForValue().increment(STOCK_KEY + seckillId);
                return;
            }
            
            // 2. Create the order
            createOrder(seckillId, userId);
            
        } catch (Exception e) {
            // Log the failure for manual compensation
            log.error("Seckill order processing failed: seckillId={}, userId={}", seckillId, userId, e);
        }
    }
    
    /**
     * Load initial stock into Redis
     */
    @PostConstruct
    public void initStock() {
        // Copy stock counts from the database into Redis
        List<SeckillGoods> goodsList = seckillGoodsMapper.selectAll();
        for (SeckillGoods goods : goodsList) {
            redisTemplate.opsForValue().set(STOCK_KEY + goods.getId(), 
                                          String.valueOf(goods.getStock()), 
                                          2, TimeUnit.HOURS);
        }
    }
}

Schema design:

-- Flash-sale goods table
CREATE TABLE seckill_goods (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    original_price DECIMAL(10,2),
    seckill_price DECIMAL(10,2),
    stock INT,
    start_time DATETIME,
    end_time DATETIME,
    version INT DEFAULT 0,  -- optimistic-lock version
    INDEX idx_time (start_time, end_time)
);

-- Flash-sale orders table
CREATE TABLE seckill_orders (
    id BIGINT PRIMARY KEY,
    seckill_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10,2),
    status TINYINT,
    created_at TIMESTAMP,
    INDEX idx_user (user_id),
    INDEX idx_seckill (seckill_id)
);

-- Unique index to prevent duplicate orders
CREATE UNIQUE INDEX uk_user_seckill ON seckill_orders(user_id, seckill_id);

6.2 Social-Media Hot Events

Database optimizations for traffic spikes on hot content:

-- 1. Separate hot data
CREATE TABLE posts_hot (
    id BIGINT PRIMARY KEY,
    content TEXT,
    like_count INT,
    comment_count INT,
    created_at TIMESTAMP,
    INDEX idx_hot (like_count DESC, created_at DESC)
) ENGINE=InnoDB;

CREATE TABLE posts_normal (
    id BIGINT PRIMARY KEY,
    content TEXT,
    like_count INT,
    comment_count INT,
    created_at TIMESTAMP
) ENGINE=InnoDB;

-- 2. Read/write splitting plus caching
-- Hot posts are written to Redis at the same time as MySQL
-- Reads check Redis first, then posts_hot, then posts_normal

-- 3. Range partitioning by time
CREATE TABLE posts (
    id BIGINT,
    content TEXT,
    created_at TIMESTAMP
) PARTITION BY RANGE (YEAR(created_at)) (
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);
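The three-tier read path in point 2 — Redis first, then posts_hot, then posts_normal — can be sketched with plain maps standing in for the three stores (all names here are illustrative, not a real DAO):

```java
import java.util.HashMap;
import java.util.Map;

public class TieredRead {

    // Cache-aside over three tiers: check the cache, fall through to the
    // hot table, then the normal table, and backfill the cache on a hit.
    public static String read(Map<Long, String> cache, Map<Long, String> hot,
                              Map<Long, String> normal, long id) {
        if (cache.containsKey(id)) {
            return cache.get(id);
        }
        String v = hot.getOrDefault(id, normal.get(id));
        if (v != null) {
            cache.put(id, v); // backfill so the next read stops at tier 1
        }
        return v;
    }

    public static void main(String[] args) {
        Map<Long, String> cache = new HashMap<>();
        Map<Long, String> hot = new HashMap<>();
        Map<Long, String> normal = new HashMap<>();
        hot.put(1L, "hot post");
        normal.put(2L, "normal post");
        System.out.println(read(cache, hot, normal, 1L)); // hot post
        System.out.println(read(cache, hot, normal, 2L)); // normal post
        System.out.println(cache.size());                 // 2 (both backfilled)
    }
}
```

In production the first map would be Redis with a TTL and the other two would be SQL queries, but the fallback-and-backfill shape is the same.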

7. Summary and Best Practices

7.1 High-Concurrency Optimization Checklist

Connection layer:

  • [ ] max_connections sized within the memory budget
  • [ ] Connection pooling in place (HikariCP/Druid)
  • [ ] Read/write splitting implemented

SQL layer:

  • [ ] Every critical query backed by a suitable index
  • [ ] No SELECT *; use covering indexes
  • [ ] Batch operations instead of row-by-row statements
  • [ ] Deep pagination queries optimized

Architecture layer:

  • [ ] Caching strategy in place (Redis)
  • [ ] Sharding evaluated where data volume demands it
  • [ ] Message queue in front of bursty writes

Configuration layer:

  • [ ] InnoDB buffer pool tuned
  • [ ] Slow-query monitoring configured
  • [ ] Transaction isolation level set deliberately

7.2 Performance Optimization Principles

  1. Measure before optimizing: optimization without data is guesswork
  2. The 80/20 rule: 80% of performance problems come from 20% of the SQL
  3. Trade space for time: caches and indexes both spend memory or disk to save time
  4. Go asynchronous: never block synchronously on work that can be deferred
  5. Defend in layers: multiple cache tiers and multiple degradation levels

7.3 Ongoing Optimization

  • Regular reviews: audit the slow-query log monthly
  • Load testing: run full-chain stress tests periodically
  • Capacity planning: forecast resource needs from business growth
  • Failure drills: practice fault injection regularly
  • Knowledge base: record what each optimization found and fixed

Applied together, these strategies substantially improve MySQL's performance and stability under high concurrency, prevent database outages, and keep the system running smoothly. Remember that database optimization is a continuous process: it must be revisited as the business grows and the technology evolves.