引言

在当今互联网应用中,高并发场景已成为常态。无论是电商平台的秒杀活动、社交网络的热点事件,还是金融系统的交易高峰,MySQL作为最流行的关系型数据库之一,都面临着巨大的性能挑战。当并发请求量激增时,数据库响应时间变长、CPU和I/O资源紧张、甚至出现连接数耗尽等问题,严重影响用户体验和系统稳定性。

本文将从实战角度出发,系统性地探讨MySQL高并发处理的完整策略。我们将从最基础的索引优化开始,逐步深入到查询优化、架构升级等高级话题,并结合具体案例和代码示例,帮助您构建一个能够应对高并发挑战的MySQL系统。

一、索引优化:高并发的基石

1.1 索引的基本原理与类型

索引是MySQL中提高查询性能最有效的手段之一。它类似于书籍的目录,能够帮助数据库快速定位到所需的数据行,避免全表扫描。

MySQL支持多种索引类型:

  • B+树索引:最常用的索引类型,适用于等值查询和范围查询
  • 哈希索引:仅适用于精确匹配查询,不支持范围查询
  • 全文索引:用于文本内容的搜索
  • 空间索引:用于地理空间数据

1.2 索引优化实战

1.2.1 选择合适的列创建索引

原则:选择WHERE子句中频繁使用的列、JOIN条件中的列、ORDER BY和GROUP BY中的列。

案例:电商系统的订单查询

-- 原始查询(无索引)
SELECT * FROM orders 
WHERE user_id = 12345 AND status = 'paid' 
ORDER BY create_time DESC 
LIMIT 20;

-- 优化方案1:创建复合索引
CREATE INDEX idx_user_status_time ON orders(user_id, status, create_time);

-- 优化方案2:如果查询经常变化,可以创建覆盖索引
CREATE INDEX idx_cover_user_status_time ON orders(user_id, status, create_time, order_id, amount);

1.2.2 避免索引失效的常见场景

  1. 索引列参与运算 “`sql – 错误示例:索引失效 SELECT * FROM users WHERE YEAR(create_time) = 2023;

– 正确示例:使用范围查询 SELECT * FROM users WHERE create_time >= ‘2023-01-01’ AND create_time < ‘2024-01-01’;


2. **前缀模糊查询**
   ```sql
   -- 错误示例:索引失效
   SELECT * FROM products WHERE name LIKE '%手机';
   
   -- 正确示例:使用前缀索引
   SELECT * FROM products WHERE name LIKE '手机%';
  1. OR条件导致索引失效 “`sql – 错误示例:索引可能失效 SELECT * FROM orders WHERE user_id = 123 OR amount > 1000;

– 正确示例:使用UNION ALL SELECT * FROM orders WHERE user_id = 123 UNION ALL SELECT * FROM orders WHERE amount > 1000 AND user_id != 123;


### 1.3 索引维护与监控

定期检查索引的使用情况,删除冗余索引:

```sql
-- 查看索引使用情况
SELECT 
    table_name,
    index_name,
    stat_value,
    stat_description
FROM mysql.innodb_index_stats 
WHERE database_name = 'your_database'
AND stat_name = 'size';

-- 查看未使用的索引(需要开启performance_schema)
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_READ,
    COUNT_WRITE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE OBJECT_SCHEMA = 'your_database'
AND INDEX_NAME IS NOT NULL
ORDER BY COUNT_READ + COUNT_WRITE DESC;

二、查询优化:让SQL飞起来

2.1 避免全表扫描

全表扫描是高并发场景下的性能杀手。通过EXPLAIN命令可以分析查询计划:

EXPLAIN SELECT * FROM orders WHERE user_id = 12345;

输出示例

+----+-------------+--------+------+---------------+------+---------+------+------+-------------+
| id | select_type | table  | type | possible_keys | key  | key_len | ref  | rows | Extra       |
+----+-------------+--------+------+---------------+------+---------+------+------+-------------+
|  1 | SIMPLE      | orders | ref  | idx_user_id   | idx_user_id | 5       | const| 100  | Using where |
+----+-------------+--------+------+---------------+------+---------+------+------+-------------+

关键指标

  • type:访问类型,ALL表示全表扫描,应避免
  • key:实际使用的索引
  • rows:预估扫描行数
  • Extra:额外信息,如Using filesort(文件排序)需要优化

2.2 分页查询优化

高并发场景下,深度分页(如LIMIT 100000, 20)会导致严重性能问题:

-- 问题SQL:深度分页性能差
SELECT * FROM orders ORDER BY create_time DESC LIMIT 100000, 20;

-- 优化方案1:使用覆盖索引+延迟关联
SELECT o.* FROM orders o
INNER JOIN (
    SELECT order_id FROM orders 
    ORDER BY create_time DESC 
    LIMIT 100000, 20
) t ON o.order_id = t.order_id;

-- 优化方案2:使用游标分页(推荐)
-- 假设上一页最后一条记录的create_time是'2023-10-01 12:00:00'
SELECT * FROM orders 
WHERE create_time < '2023-10-01 12:00:00'
ORDER BY create_time DESC 
LIMIT 20;

2.3 JOIN优化

在高并发场景下,JOIN操作容易成为瓶颈:

-- 问题SQL:多表JOIN性能差
SELECT o.*, u.username, p.product_name
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN products p ON o.product_id = p.id
WHERE o.status = 'paid';

-- 优化方案1:减少JOIN表数量,使用冗余字段
-- 在orders表中冗余username和product_name字段

-- 优化方案2:使用EXISTS替代JOIN
SELECT o.* 
FROM orders o
WHERE o.status = 'paid'
AND EXISTS (SELECT 1 FROM users u WHERE u.id = o.user_id)
AND EXISTS (SELECT 1 FROM products p WHERE p.id = o.product_id);

-- 优化方案3:使用临时表
CREATE TEMPORARY TABLE temp_orders AS
SELECT * FROM orders WHERE status = 'paid';

SELECT t.*, u.username, p.product_name
FROM temp_orders t
JOIN users u ON t.user_id = u.id
JOIN products p ON t.product_id = p.id;

三、事务与锁优化

3.1 事务隔离级别选择

MySQL支持四种事务隔离级别:

  • READ UNCOMMITTED:读未提交,可能出现脏读
  • READ COMMITTED:读已提交,MySQL默认级别
  • REPEATABLE READ:可重复读,InnoDB默认级别
  • SERIALIZABLE:串行化,性能最差

高并发场景建议

  • 一般业务:READ COMMITTED(减少锁竞争)
  • 金融业务:REPEATABLE READ(保证数据一致性)
  • 避免使用SERIALIZABLE
-- 设置事务隔离级别
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- 查看当前隔离级别
SELECT @@transaction_isolation;

3.2 锁优化策略

3.2.1 减少锁持有时间

-- 问题示例:长时间持有锁
START TRANSACTION;
-- 复杂业务逻辑
UPDATE orders SET status = 'processed' WHERE id = 123;
-- 其他耗时操作
COMMIT;

-- 优化方案:拆分事务
-- 步骤1:快速更新
UPDATE orders SET status = 'processed' WHERE id = 123;
-- 步骤2:异步处理其他业务
-- 使用消息队列或定时任务

3.2.2 避免死锁

-- 死锁示例:两个事务交叉更新
-- 事务A
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;

-- 事务B(同时执行)
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 2;
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
COMMIT;

-- 优化方案:固定更新顺序
-- 事务A和B都按id升序更新
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;

3.3 悲观锁与乐观锁

悲观锁:使用SELECT … FOR UPDATE

-- 悲观锁示例:秒杀场景
START TRANSACTION;
-- 锁定记录
SELECT * FROM products WHERE id = 1 FOR UPDATE;
-- 检查库存
IF stock > 0 THEN
    UPDATE products SET stock = stock - 1 WHERE id = 1;
    INSERT INTO orders (...) VALUES (...);
END IF;
COMMIT;

乐观锁:使用版本号

-- 乐观锁示例
-- 表结构增加version字段
CREATE TABLE products (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    stock INT,
    version INT DEFAULT 0
);

-- 更新操作
UPDATE products 
SET stock = stock - 1, version = version + 1
WHERE id = 1 AND version = 0;

-- 检查影响行数
-- 如果影响行数为0,说明数据已被修改,需要重试

四、架构升级:从单机到分布式

4.1 读写分离

读写分离是提升MySQL并发能力的基础架构优化。

架构图

应用层
  ↓
负载均衡器
  ↓
主库(写) ← 同步 → 从库(读)

实现方案

  1. 使用中间件:如ProxySQL、MyCat
  2. 应用层实现:使用Spring的AbstractRoutingDataSource

代码示例(Spring Boot + MyBatis):

// 数据源配置
@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    public DataSource routingDataSource() {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        
        DynamicDataSource routingDataSource = new DynamicDataSource();
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        return routingDataSource;
    }
}

// 动态数据源
public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSourceType();
    }
}

// 数据源上下文
public class DataSourceContextHolder {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
    public static void setDataSourceType(String dataSourceType) {
        contextHolder.set(dataSourceType);
    }
    
    public static String getDataSourceType() {
        return contextHolder.get();
    }
    
    public static void clearDataSourceType() {
        contextHolder.remove();
    }
}

// 使用示例
@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    // 写操作使用主库
    @Transactional
    public void createOrder(Order order) {
        DataSourceContextHolder.setDataSourceType("master");
        orderMapper.insert(order);
        DataSourceContextHolder.clearDataSourceType();
    }
    
    // 读操作使用从库
    public Order getOrder(Long id) {
        DataSourceContextHolder.setDataSourceType("slave");
        Order order = orderMapper.selectById(id);
        DataSourceContextHolder.clearDataSourceType();
        return order;
    }
}

4.2 分库分表

当单表数据量超过千万级时,需要考虑分库分表。

4.2.1 垂直分表

将大表拆分为多个小表,按业务维度拆分:

-- 原始大表
CREATE TABLE user_info (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    password VARCHAR(100),
    email VARCHAR(100),
    phone VARCHAR(20),
    address VARCHAR(200),
    -- 其他字段...
    create_time DATETIME
);

-- 垂直拆分后
CREATE TABLE user_base (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    password VARCHAR(100),
    email VARCHAR(100),
    phone VARCHAR(20)
);

CREATE TABLE user_profile (
    id BIGINT PRIMARY KEY,
    address VARCHAR(200),
    -- 其他扩展字段...
    create_time DATETIME
);

4.2.2 水平分表(分片)

按某个维度(如用户ID、时间)将数据分布到多个表中。

分片策略

  • 范围分片:按ID范围分片
  • 哈希分片:按用户ID哈希取模
  • 一致性哈希:解决扩容问题

代码示例(分片路由):

public class ShardingRouter {
    
    // 分片数量
    private static final int SHARD_COUNT = 4;
    
    /**
     * 根据用户ID计算分片表名
     * @param userId 用户ID
     * @return 分片表名,如 orders_0, orders_1, ...
     */
    public static String getShardTableName(String tableName, Long userId) {
        int shardIndex = (int) (userId % SHARD_COUNT);
        return tableName + "_" + shardIndex;
    }
    
    /**
     * 批量查询时,需要查询所有分片
     */
    public static List<String> getAllShardTableNames(String tableName) {
        List<String> tableNames = new ArrayList<>();
        for (int i = 0; i < SHARD_COUNT; i++) {
            tableNames.add(tableName + "_" + i);
        }
        return tableNames;
    }
}

// 使用示例
@Service
public class OrderService {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    public void createOrder(Long userId, Order order) {
        String tableName = ShardingRouter.getShardTableName("orders", userId);
        String sql = String.format(
            "INSERT INTO %s (user_id, amount, status) VALUES (?, ?, ?)",
            tableName
        );
        jdbcTemplate.update(sql, userId, order.getAmount(), order.getStatus());
    }
    
    public List<Order> getUserOrders(Long userId) {
        String tableName = ShardingRouter.getShardTableName("orders", userId);
        String sql = String.format("SELECT * FROM %s WHERE user_id = ?", tableName);
        return jdbcTemplate.query(sql, new Object[]{userId}, new OrderRowMapper());
    }
}

4.3 分布式事务

在分库分表场景下,需要处理分布式事务问题。

4.3.1 两阶段提交(2PC)

// 伪代码示例
public class TwoPhaseCommit {
    
    public boolean executeTransaction(List<DatabaseOperation> operations) {
        // 阶段1:准备
        List<Boolean> prepareResults = new ArrayList<>();
        for (DatabaseOperation op : operations) {
            try {
                boolean prepared = op.prepare();
                prepareResults.add(prepared);
            } catch (Exception e) {
                // 回滚所有已准备的操作
                rollbackAll(operations);
                return false;
            }
        }
        
        // 阶段2:提交或回滚
        if (prepareResults.contains(false)) {
            rollbackAll(operations);
            return false;
        } else {
            commitAll(operations);
            return true;
        }
    }
}

4.3.2 最终一致性方案

使用消息队列实现最终一致性:

// 伪代码示例
@Service
public class OrderService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Transactional
    public void createOrder(Order order) {
        // 1. 本地事务:创建订单
        orderMapper.insert(order);
        
        // 2. 发送消息到消息队列
        OrderCreatedEvent event = new OrderCreatedEvent(order.getId());
        rabbitTemplate.convertAndSend("order.exchange", "order.created", event);
        
        // 3. 消费者处理其他业务(库存扣减、积分增加等)
    }
}

// 消费者
@Component
public class OrderEventListener {
    
    @RabbitListener(queues = "order.queue")
    public void handleOrderCreated(OrderCreatedEvent event) {
        // 扣减库存
        inventoryService.decreaseStock(event.getOrderId());
        
        // 增加积分
        pointService.addPoints(event.getOrderId());
        
        // 发送通知
        notificationService.sendOrderNotification(event.getOrderId());
    }
}

五、缓存策略:减轻数据库压力

5.1 多级缓存架构

应用层缓存(本地缓存)
  ↓
分布式缓存(Redis)
  ↓
数据库

5.2 缓存穿透、击穿、雪崩解决方案

5.2.1 缓存穿透

问题:查询不存在的数据,导致请求直接打到数据库。

解决方案

public class CacheService {
    
    private static final String NULL_VALUE = "NULL";
    
    public String getProductFromCache(Long productId) {
        String cacheKey = "product:" + productId;
        
        // 1. 先查缓存
        String cachedValue = redisTemplate.opsForValue().get(cacheKey);
        if (cachedValue != null) {
            return cachedValue.equals(NULL_VALUE) ? null : cachedValue;
        }
        
        // 2. 缓存未命中,查数据库
        Product product = productMapper.selectById(productId);
        
        // 3. 缓存结果(包括空值)
        if (product != null) {
            redisTemplate.opsForValue().set(cacheKey, product.toJson(), 3600, TimeUnit.SECONDS);
            return product.toJson();
        } else {
            // 缓存空值,防止穿透
            redisTemplate.opsForValue().set(cacheKey, NULL_VALUE, 60, TimeUnit.SECONDS);
            return null;
        }
    }
}

5.2.2 缓存击穿

问题:热点key过期瞬间,大量请求同时打到数据库。

解决方案

public class CacheService {
    
    public String getHotProduct(Long productId) {
        String cacheKey = "product:" + productId;
        
        // 1. 先查缓存
        String cachedValue = redisTemplate.opsForValue().get(cacheKey);
        if (cachedValue != null) {
            return cachedValue;
        }
        
        // 2. 使用分布式锁防止缓存击穿
        String lockKey = "lock:" + productId;
        String lockValue = UUID.randomUUID().toString();
        
        try {
            // 尝试获取锁(设置过期时间,防止死锁)
            Boolean locked = redisTemplate.opsForValue()
                .setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS);
            
            if (Boolean.TRUE.equals(locked)) {
                // 获取锁成功,查询数据库并更新缓存
                Product product = productMapper.selectById(productId);
                if (product != null) {
                    redisTemplate.opsForValue().set(cacheKey, product.toJson(), 3600, TimeUnit.SECONDS);
                    return product.toJson();
                }
            } else {
                // 未获取锁,等待并重试
                Thread.sleep(100);
                return getHotProduct(productId); // 递归重试
            }
        } catch (Exception e) {
            // 异常处理
        } finally {
            // 释放锁(使用Lua脚本保证原子性)
            String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class), 
                Collections.singletonList(lockKey), lockValue);
        }
        
        return null;
    }
}

5.2.3 缓存雪崩

问题:大量缓存同时过期,导致数据库压力激增。

解决方案

public class CacheService {
    
    public void setProductCache(Long productId, Product product) {
        String cacheKey = "product:" + productId;
        
        // 1. 设置基础过期时间(如3600秒)
        int baseExpire = 3600;
        
        // 2. 添加随机值(如±300秒),避免同时过期
        int randomExpire = baseExpire + (int)(Math.random() * 600) - 300;
        
        // 3. 设置缓存
        redisTemplate.opsForValue().set(cacheKey, product.toJson(), randomExpire, TimeUnit.SECONDS);
    }
    
    // 使用布隆过滤器防止缓存雪崩
    public void initBloomFilter() {
        // 初始化布隆过滤器
        BloomFilter<Long> bloomFilter = BloomFilter.create(
            Funnels.longFunnel(), 
            1000000, // 预计插入数量
            0.01     // 误判率
        );
        
        // 加载所有有效ID到布隆过滤器
        List<Long> allProductIds = productMapper.getAllIds();
        for (Long id : allProductIds) {
            bloomFilter.put(id);
        }
        
        // 存储到Redis
        redisTemplate.opsForValue().set("bloom:products", bloomFilter);
    }
    
    public boolean mightContain(Long productId) {
        BloomFilter<Long> bloomFilter = (BloomFilter<Long>) redisTemplate.opsForValue().get("bloom:products");
        return bloomFilter != null && bloomFilter.mightContain(productId);
    }
}

5.3 缓存与数据库一致性

5.3.1 Cache Aside模式

@Service
public class CacheAsideService {
    
    public Product getProduct(Long id) {
        // 1. 先查缓存
        String cacheKey = "product:" + id;
        String cached = redisTemplate.opsForValue().get(cacheKey);
        if (cached != null) {
            return JSON.parseObject(cached, Product.class);
        }
        
        // 2. 缓存未命中,查数据库
        Product product = productMapper.selectById(id);
        
        // 3. 写入缓存
        if (product != null) {
            redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(product), 3600, TimeUnit.SECONDS);
        }
        
        return product;
    }
    
    @Transactional
    public void updateProduct(Product product) {
        // 1. 更新数据库
        productMapper.updateById(product);
        
        // 2. 删除缓存(而不是更新缓存)
        String cacheKey = "product:" + product.getId();
        redisTemplate.delete(cacheKey);
    }
}

5.3.2 延迟双删策略

@Service
public class DelayDoubleDeleteService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    
    @Transactional
    public void updateProductWithDelay(Product product) {
        // 1. 删除缓存
        String cacheKey = "product:" + product.getId();
        redisTemplate.delete(cacheKey);
        
        // 2. 更新数据库
        productMapper.updateById(product);
        
        // 3. 延迟再次删除缓存(防止主从延迟导致脏数据)
        taskExecutor.submit(() -> {
            try {
                Thread.sleep(500); // 延迟500ms
                redisTemplate.delete(cacheKey);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}

六、连接池优化

6.1 连接池参数调优

HikariCP配置示例(application.yml):

spring:
  datasource:
    hikari:
      # 连接池名称
      pool-name: HikariCP-Connection-Pool
      # 最小空闲连接数
      minimum-idle: 10
      # 最大连接数(根据业务调整,一般为CPU核心数*2+1)
      maximum-pool-size: 50
      # 连接超时时间(毫秒)
      connection-timeout: 30000
      # 空闲连接最大存活时间(毫秒)
      idle-timeout: 600000
      # 连接最大生命周期(毫秒)
      max-lifetime: 1800000
      # 连接测试查询
      connection-test-query: SELECT 1
      # 连接泄漏检测阈值(毫秒)
      leak-detection-threshold: 60000
      # 连接初始化SQL
      initialization-fail-timeout: 1
      # 是否自动提交
      auto-commit: true
      # 数据库元数据
      metadata:
        # 数据库名称
        database-name: test
        # 数据库版本
        database-version: 8.0.28
        # 驱动版本
        driver-version: 8.0.28

6.2 连接池监控

@Component
public class ConnectionPoolMonitor {
    
    @Autowired
    private DataSource dataSource;
    
    @Scheduled(fixedRate = 60000) // 每分钟执行一次
    public void monitorConnectionPool() {
        if (dataSource instanceof HikariDataSource) {
            HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
            HikariPoolMXBean poolMXBean = hikariDataSource.getHikariPoolMXBean();
            
            System.out.println("=== 连接池监控 ===");
            System.out.println("活跃连接数: " + poolMXBean.getActiveConnections());
            System.out.println("空闲连接数: " + poolMXBean.getIdleConnections());
            System.out.println("总连接数: " + poolMXBean.getTotalConnections());
            System.out.println("等待连接的线程数: " + poolMXBean.getThreadsAwaitingConnection());
            System.out.println("连接获取超时次数: " + poolMXBean.getConnectionTimeoutRate());
            System.out.println("=================");
            
            // 告警逻辑
            if (poolMXBean.getThreadsAwaitingConnection() > 10) {
                // 发送告警
                sendAlert("连接池等待线程数过高: " + poolMXBean.getThreadsAwaitingConnection());
            }
        }
    }
    
    private void sendAlert(String message) {
        // 实现告警逻辑,如发送邮件、短信等
        System.out.println("ALERT: " + message);
    }
}

七、监控与告警

7.1 关键指标监控

7.1.1 性能指标

-- 查看MySQL性能指标
SHOW GLOBAL STATUS LIKE 'Threads_%';
SHOW GLOBAL STATUS LIKE 'Connections';
SHOW GLOBAL STATUS LIKE 'Slow_queries';
SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_%';
SHOW GLOBAL STATUS LIKE 'Qps%';
SHOW GLOBAL STATUS LIKE 'Tps%';

7.1.2 慢查询监控

-- 开启慢查询日志
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 2; -- 超过2秒的查询记录
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';

-- 分析慢查询日志
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

7.2 监控工具集成

7.2.1 Prometheus + Grafana

Prometheus配置(prometheus.yml):

scrape_configs:
  - job_name: 'mysql'
    static_configs:
      - targets: ['mysql-exporter:9104']

MySQL Exporter配置

# 启动MySQL Exporter
docker run -d \
  --name mysql-exporter \
  -p 9104:9104 \
  -e DATA_SOURCE_NAME="user:password@(mysql:3306)/" \
  prom/mysqld-exporter

7.2.2 自定义监控脚本

#!/usr/bin/env python3
# mysql_monitor.py

import pymysql
import time
import json
from datetime import datetime

class MySQLMonitor:
    def __init__(self, host, user, password, database):
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            charset='utf8mb4'
        )
    
    def get_connection_stats(self):
        """获取连接统计信息"""
        cursor = self.connection.cursor()
        cursor.execute("SHOW STATUS LIKE 'Threads_%'")
        result = cursor.fetchall()
        stats = {row[0]: row[1] for row in result}
        cursor.close()
        return stats
    
    def get_slow_queries(self):
        """获取慢查询信息"""
        cursor = self.connection.cursor()
        cursor.execute("""
            SELECT 
                COUNT(*) as slow_query_count,
                AVG(query_time) as avg_query_time,
                MAX(query_time) as max_query_time
            FROM mysql.slow_log
            WHERE start_time > DATE_SUB(NOW(), INTERVAL 1 HOUR)
        """)
        result = cursor.fetchone()
        cursor.close()
        return {
            'slow_query_count': result[0],
            'avg_query_time': result[1],
            'max_query_time': result[2]
        }
    
    def get_buffer_pool_stats(self):
        """获取InnoDB缓冲池统计"""
        cursor = self.connection.cursor()
        cursor.execute("""
            SELECT 
                variable_name,
                variable_value
            FROM performance_schema.global_status
            WHERE variable_name LIKE 'Innodb_buffer_pool%'
        """)
        result = cursor.fetchall()
        cursor.close()
        return {row[0]: row[1] for row in result}
    
    def monitor(self):
        """执行监控并输出JSON格式结果"""
        timestamp = datetime.now().isoformat()
        
        metrics = {
            'timestamp': timestamp,
            'connection_stats': self.get_connection_stats(),
            'slow_queries': self.get_slow_queries(),
            'buffer_pool_stats': self.get_buffer_pool_stats()
        }
        
        # 输出JSON格式
        print(json.dumps(metrics, indent=2))
        
        # 可以发送到监控系统
        # self.send_to_prometheus(metrics)
    
    def send_to_prometheus(self, metrics):
        """发送指标到Prometheus Pushgateway"""
        import requests
        
        pushgateway_url = "http://pushgateway:9091/metrics/job/mysql"
        
        # 转换为Prometheus格式
        prometheus_metrics = []
        for key, value in metrics['connection_stats'].items():
            prometheus_metrics.append(f'mysql_{key} {value}')
        
        for key, value in metrics['slow_queries'].items():
            prometheus_metrics.append(f'mysql_{key} {value}')
        
        for key, value in metrics['buffer_pool_stats'].items():
            prometheus_metrics.append(f'mysql_{key} {value}')
        
        # 发送数据
        requests.post(pushgateway_url, data='\n'.join(prometheus_metrics))

if __name__ == '__main__':
    monitor = MySQLMonitor(
        host='localhost',
        user='monitor',
        password='password',
        database='mysql'
    )
    
    # 每分钟执行一次监控
    while True:
        monitor.monitor()
        time.sleep(60)

八、实战案例:秒杀系统设计

8.1 系统架构设计

用户请求 → Nginx → 应用服务器集群 → 消息队列 → 库存服务 → MySQL
                ↓
            Redis缓存

8.2 核心代码实现

8.2.1 库存预热

@Service
public class StockPreheatService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private ProductMapper productMapper;
    
    /**
     * 秒杀开始前,将库存预热到Redis
     */
    public void preheatStock(Long productId, int stock) {
        String stockKey = "seckill:stock:" + productId;
        String stockLockKey = "seckill:lock:" + productId;
        
        // 1. 设置库存到Redis
        redisTemplate.opsForValue().set(stockKey, String.valueOf(stock), 2, TimeUnit.HOURS);
        
        // 2. 设置分布式锁(防止并发预热)
        redisTemplate.opsForValue().setIfAbsent(stockLockKey, "1", 30, TimeUnit.SECONDS);
        
        // 3. 同步到数据库(异步)
        CompletableFuture.runAsync(() -> {
            Product product = new Product();
            product.setId(productId);
            product.setStock(stock);
            productMapper.updateById(product);
        });
    }
}

8.2.2 秒杀下单流程

@Service
public class SeckillOrderService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    /**
     * 秒杀下单(Redis预扣库存)
     */
    @Transactional
    public SeckillResult seckill(Long productId, Long userId) {
        String stockKey = "seckill:stock:" + productId;
        String orderKey = "seckill:order:" + productId + ":" + userId;
        
        // 1. 检查是否已下单
        if (Boolean.TRUE.equals(redisTemplate.hasKey(orderKey))) {
            return SeckillResult.fail("您已参与过秒杀");
        }
        
        // 2. 预扣库存(使用Lua脚本保证原子性)
        String luaScript = 
            "local stock = redis.call('get', KEYS[1]) " +
            "if not stock or tonumber(stock) <= 0 then " +
            "   return 0 " +
            "end " +
            "redis.call('decr', KEYS[1]) " +
            "return 1";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(luaScript, Long.class),
            Collections.singletonList(stockKey)
        );
        
        if (result == 0) {
            return SeckillResult.fail("库存不足");
        }
        
        // 3. 记录已下单
        redisTemplate.opsForValue().set(orderKey, "1", 2, TimeUnit.HOURS);
        
        // 4. 发送消息到消息队列,异步创建订单
        SeckillMessage message = new SeckillMessage(productId, userId);
        rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);
        
        return SeckillResult.success("秒杀成功,订单处理中");
    }
    
    /**
     * 消费消息队列,创建正式订单
     */
    @RabbitListener(queues = "seckill.queue")
    public void createOrder(SeckillMessage message) {
        try {
            // 1. 检查库存(数据库)
            Product product = productMapper.selectById(message.getProductId());
            if (product == null || product.getStock() <= 0) {
                // 库存不足,回滚Redis库存
                String stockKey = "seckill:stock:" + message.getProductId();
                redisTemplate.opsForValue().increment(stockKey);
                return;
            }
            
            // 2. 扣减数据库库存
            int updated = productMapper.decreaseStock(message.getProductId());
            if (updated == 0) {
                // 库存不足,回滚Redis库存
                String stockKey = "seckill:stock:" + message.getProductId();
                redisTemplate.opsForValue().increment(stockKey);
                return;
            }
            
            // 3. 创建订单
            Order order = new Order();
            order.setProductId(message.getProductId());
            order.setUserId(message.getUserId());
            order.setStatus("PAID");
            order.setCreateTime(new Date());
            orderMapper.insert(order);
            
            // 4. 发送订单创建成功事件
            OrderCreatedEvent event = new OrderCreatedEvent(order.getId());
            rabbitTemplate.convertAndSend("order.exchange", "order.created", event);
            
        } catch (Exception e) {
            // 异常处理:回滚Redis库存
            String stockKey = "seckill:stock:" + message.getProductId();
            redisTemplate.opsForValue().increment(stockKey);
            throw e;
        }
    }
}

8.2.3 Lua脚本优化

-- seckill.lua
-- 秒杀Lua脚本:检查库存并扣减
local stockKey = KEYS[1]
local orderKey = KEYS[2]
local userId = ARGV[1]
local productId = ARGV[2]

-- 检查是否已下单
if redis.call('exists', orderKey) == 1 then
    return {0, "您已参与过秒杀"}
end

-- 检查库存
local stock = redis.call('get', stockKey)
if not stock or tonumber(stock) <= 0 then
    return {0, "库存不足"}
end

-- 扣减库存
redis.call('decr', stockKey)

-- 记录已下单
redis.call('set', orderKey, "1", "EX", 7200)

return {1, "秒杀成功"}

Java调用Lua脚本

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    public SeckillResult seckillWithLua(Long productId, Long userId) {
        String stockKey = "seckill:stock:" + productId;
        String orderKey = "seckill:order:" + productId + ":" + userId;
        
        // 加载Lua脚本
        DefaultRedisScript<List> redisScript = new DefaultRedisScript<>();
        redisScript.setLocation(new ClassPathResource("seckill.lua"));
        redisScript.setResultType(List.class);
        
        // 执行脚本
        List result = redisTemplate.execute(
            redisScript,
            Arrays.asList(stockKey, orderKey),
            String.valueOf(userId),
            String.valueOf(productId)
        );
        
        Long code = (Long) result.get(0);
        String message = (String) result.get(1);
        
        if (code == 1) {
            // 发送消息到队列
            sendToQueue(productId, userId);
            return SeckillResult.success(message);
        } else {
            return SeckillResult.fail(message);
        }
    }
}

九、总结与最佳实践

9.1 性能优化检查清单

  1. 索引优化

    • [ ] 检查慢查询日志,优化全表扫描
    • [ ] 确保WHERE、JOIN、ORDER BY字段有索引
    • [ ] 避免索引失效的写法
    • [ ] 定期清理冗余索引
  2. 查询优化

    • [ ] 使用EXPLAIN分析查询计划
    • [ ] 避免SELECT *
    • [ ] 优化分页查询
    • [ ] 减少JOIN操作
  3. 事务与锁

    • [ ] 选择合适的事务隔离级别
    • [ ] 缩短事务持有时间
    • [ ] 避免长事务
    • [ ] 使用乐观锁替代悲观锁
  4. 架构优化

    • [ ] 实现读写分离
    • [ ] 考虑分库分表
    • [ ] 使用缓存减轻数据库压力
    • [ ] 引入消息队列解耦
  5. 监控与告警

    • [ ] 监控关键性能指标
    • [ ] 设置慢查询告警
    • [ ] 监控连接池状态
    • [ ] 定期性能测试

9.2 常见误区与注意事项

  1. 过度索引:索引不是越多越好,每个索引都会增加写操作的开销
  2. 忽视事务:高并发下事务管理不当会导致死锁和性能问题
  3. 缓存滥用:缓存不是银弹,需要考虑一致性、穿透、击穿、雪崩等问题
  4. 盲目分库分表:分库分表会增加系统复杂度,应在单机优化达到极限后再考虑
  5. 忽视监控:没有监控的系统就像盲人摸象,无法及时发现问题

9.3 未来演进方向

  1. 云原生数据库:考虑使用云数据库服务(如RDS、Aurora),自动处理高可用和扩展性
  2. NewSQL数据库:对于极致性能要求,可考虑TiDB、CockroachDB等分布式数据库
  3. HTAP混合事务分析处理:使用TiDB、OceanBase等支持实时分析的数据库
  4. AI驱动的优化:利用机器学习自动优化索引和查询

结语

MySQL高并发处理是一个系统工程,需要从多个层面进行优化。本文从索引优化、查询优化、事务锁优化、架构升级、缓存策略、连接池优化、监控告警等多个维度进行了详细阐述,并提供了丰富的实战案例和代码示例。

记住,没有银弹。每个系统的优化策略都需要根据具体的业务场景、数据规模、并发量等因素来定制。建议从最简单的索引优化开始,逐步深入到架构优化,同时建立完善的监控体系,持续迭代优化。

在实际应用中,建议采用以下步骤:

  1. 基准测试:了解当前系统的性能瓶颈
  2. 逐步优化:从索引和查询优化开始
  3. 架构演进:当单机优化达到极限时,考虑架构升级
  4. 持续监控:建立完善的监控告警体系
  5. 定期复盘:定期回顾优化效果,调整策略

通过系统性的优化,MySQL完全能够应对高并发场景的挑战,为业务提供稳定、高效的数据库服务。