引言

在当今互联网应用中,高并发场景已成为常态。无论是电商秒杀、社交平台还是金融交易系统,都需要处理海量并发请求。MySQL作为最流行的关系型数据库之一,如何在高并发环境下保持稳定和高性能,是每个开发者和DBA必须面对的挑战。

本文将从架构设计、数据库配置、SQL优化、代码调优等多个维度,系统性地介绍MySQL高并发处理的实战策略。我们将通过具体案例和代码示例,展示如何从底层到上层全方位提升MySQL的并发处理能力。

一、架构层面的优化策略

1.1 读写分离架构

核心思想:将读操作和写操作分离到不同的数据库实例,减轻主库压力。

实现方案

  • 主库(Master):处理所有写操作(INSERT、UPDATE、DELETE)
  • 从库(Slave):处理所有读操作(SELECT)
  • 中间件:使用ProxySQL、MyCat或ShardingSphere进行路由

代码示例(使用Spring Boot + MyBatis实现读写分离)

// 1. 配置多数据源
@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    public DataSource routingDataSource() {
        DynamicDataSource routingDataSource = new DynamicDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        return routingDataSource;
    }
}

// 2. 自定义路由策略
public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSourceType();
    }
}

// 3. 数据源上下文管理器
public class DataSourceContextHolder {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
    public static void setDataSourceType(String dataSourceType) {
        contextHolder.set(dataSourceType);
    }
    
    public static String getDataSourceType() {
        return contextHolder.get();
    }
    
    public static void clearDataSourceType() {
        contextHolder.remove();
    }
}

// 4. AOP切面实现读写分离
@Aspect
@Component
public class DataSourceAspect {
    
    @Around("@annotation(readOnly)")
    public Object switchDataSource(ProceedingJoinPoint joinPoint, ReadOnly readOnly) throws Throwable {
        try {
            DataSourceContextHolder.setDataSourceType("slave");
            return joinPoint.proceed();
        } finally {
            DataSourceContextHolder.clearDataSourceType();
        }
    }
    
    @Around("@annotation(writeOnly)")
    public Object switchToMaster(ProceedingJoinPoint joinPoint, WriteOnly writeOnly) throws Throwable {
        try {
            DataSourceContextHolder.setDataSourceType("master");
            return joinPoint.proceed();
        } finally {
            DataSourceContextHolder.clearDataSourceType();
        }
    }
}

// 5. 自定义注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface WriteOnly {
}

// 6. 业务层使用示例
@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @ReadOnly
    public List<Order> getOrdersByUserId(Long userId) {
        return orderMapper.selectByUserId(userId);
    }
    
    @WriteOnly
    public void createOrder(Order order) {
        orderMapper.insert(order);
    }
}

性能对比

  • 单库架构:QPS约2000-3000
  • 读写分离架构:QPS可提升至8000-10000(读操作占70%时)

1.2 分库分表策略

适用场景:单表数据量超过1000万行,或单库连接数达到上限。

分片策略

  • 水平分表:按用户ID、时间等维度拆分
  • 垂直分表:按业务字段拆分(如订单表拆分为订单基本信息表和订单详情表)

代码示例(使用ShardingSphere实现分库分表)

# sharding.yaml配置文件
dataSources:
  ds_0: !!com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/db_0
    username: root
    password: 123456
  ds_1: !!com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/db_1
    username: root
    password: 123456

shardingRule:
  tables:
    order:
      actualDataNodes: ds_${0..1}.order_${0..3}
      tableStrategy:
        standard:
          shardingColumn: order_id
          preciseAlgorithmClassName: com.example.OrderTableShardingAlgorithm
      databaseStrategy:
        standard:
          shardingColumn: user_id
          preciseAlgorithmClassName: com.example.OrderDatabaseShardingAlgorithm
  bindingTables:
    - order
  defaultDatabaseStrategy:
    none:
  defaultTableStrategy:
    none:

props:
  sql.show: true
// 分片算法实现
public class OrderDatabaseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
    
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        Long userId = shardingValue.getValue();
        int hash = userId.hashCode() % 2;
        return "ds_" + hash;
    }
}

public class OrderTableShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
    
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        Long orderId = shardingValue.getValue();
        int hash = Math.abs(orderId.hashCode()) % 4;
        return "order_" + hash;
    }
}

1.3 缓存层设计

Redis缓存策略

  • 热点数据缓存
  • 缓存穿透防护
  • 缓存雪崩预防

代码示例(Spring Cache + Redis)

// 1. 配置Redis缓存
@Configuration
@EnableCaching
public class RedisCacheConfig {
    
    @Bean
    public RedisCacheManager cacheManager(RedisConnectionFactory factory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
            .entryTtl(Duration.ofMinutes(10))
            .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
            .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
        
        return RedisCacheManager.builder(factory)
            .cacheDefaults(config)
            .build();
    }
}

// 2. 缓存服务层
@Service
public class ProductService {
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 缓存穿透防护:使用空值缓存
    @Cacheable(value = "products", key = "#productId", unless = "#result == null")
    public Product getProductById(Long productId) {
        Product product = productMapper.selectById(productId);
        if (product == null) {
            // 缓存空值,防止缓存穿透
            redisTemplate.opsForValue().set("product:" + productId, "", Duration.ofMinutes(5));
            return null;
        }
        return product;
    }
    
    // 缓存更新策略
    @CachePut(value = "products", key = "#product.id")
    public Product updateProduct(Product product) {
        productMapper.updateById(product);
        return product;
    }
    
    // 批量查询优化
    public List<Product> getProductsByIds(List<Long> ids) {
        // 1. 先查缓存
        List<Product> result = new ArrayList<>();
        List<Long> missIds = new ArrayList<>();
        
        for (Long id : ids) {
            String key = "product:" + id;
            Product product = (Product) redisTemplate.opsForValue().get(key);
            if (product != null) {
                result.add(product);
            } else {
                missIds.add(id);
            }
        }
        
        // 2. 批量查询数据库
        if (!missIds.isEmpty()) {
            List<Product> dbProducts = productMapper.selectBatchIds(missIds);
            // 3. 写入缓存
            for (Product product : dbProducts) {
                redisTemplate.opsForValue().set("product:" + product.getId(), product, Duration.ofMinutes(10));
                result.add(product);
            }
        }
        
        return result;
    }
}

二、MySQL数据库配置优化

2.1 连接池配置

HikariCP配置示例

# application.yml
spring:
  datasource:
    hikari:
      # 连接池大小
      maximum-pool-size: 20
      minimum-idle: 10
      # 连接超时
      connection-timeout: 30000
      # 空闲连接存活时间
      idle-timeout: 600000
      # 连接最大生命周期
      max-lifetime: 1800000
      # 连接测试查询
      connection-test-query: SELECT 1
      # 连接预热
      initialization-fail-timeout: 1
      # 连接泄漏检测
      leak-detection-threshold: 60000

连接池大小计算公式

连接数 = (核心数 × 2) + 有效磁盘数

例如:4核CPU + 2块SSD = 4×2 + 2 = 10个连接

2.2 MySQL服务器参数调优

my.cnf关键配置

[mysqld]
# 内存配置
innodb_buffer_pool_size = 12G  # 通常设置为物理内存的70-80%
innodb_buffer_pool_instances = 8  # 缓冲池实例数,建议4-8

# 日志配置
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M
innodb_flush_log_at_trx_commit = 2  # 1-性能,2-安全

# 连接配置
max_connections = 500
thread_cache_size = 50
table_open_cache = 2000

# 查询缓存(MySQL 8.0已移除)
# query_cache_type = 0
# query_cache_size = 0

# InnoDB配置
innodb_flush_method = O_DIRECT
innodb_file_per_table = ON
innodb_flush_neighbors = 0  # SSD设置为0
innodb_io_capacity = 2000  # SSD设置为2000-4000

# 临时表配置
tmp_table_size = 256M
max_heap_table_size = 256M

# 排序缓冲区
sort_buffer_size = 4M
join_buffer_size = 4M

# 索引缓冲区
key_buffer_size = 256M

# 慢查询日志
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
log_queries_not_using_indexes = 1

2.3 监控与诊断

使用Performance Schema监控

-- 查看活跃连接
SELECT * FROM performance_schema.threads WHERE TYPE = 'FOREGROUND';

-- 查看锁等待
SELECT * FROM performance_schema.data_lock_waits;

-- 查看慢查询
SELECT * FROM mysql.slow_log ORDER BY query_time DESC LIMIT 10;

-- 查看InnoDB状态
SHOW ENGINE INNODB STATUS\G

使用sys schema诊断

-- 查看最耗时的查询
SELECT * FROM sys.statements_with_temp_tables ORDER BY exec_count DESC LIMIT 10;

-- 查看未使用索引的表
SELECT * FROM sys.schema_unused_indexes;

-- 查看重复索引
SELECT * FROM sys.schema_redundant_indexes;

-- 查看锁等待详情
SELECT * FROM sys.innodb_lock_waits;

三、SQL语句优化

3.1 索引优化策略

索引设计原则

  1. 最左前缀原则
  2. 覆盖索引
  3. 索引下推
  4. 避免索引失效

代码示例(索引优化前后对比)

-- 优化前:全表扫描
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';

-- 优化后:使用索引
-- 1. 创建索引
CREATE INDEX idx_create_time ON orders(create_time);

-- 2. 改写查询
SELECT * FROM orders 
WHERE create_time >= '2024-01-01 00:00:00' 
  AND create_time < '2024-01-02 00:00:00';

-- 复合索引示例
-- 创建复合索引
CREATE INDEX idx_user_status_time ON orders(user_id, status, create_time);

-- 有效查询(使用最左前缀)
SELECT * FROM orders WHERE user_id = 123;
SELECT * FROM orders WHERE user_id = 123 AND status = 1;
SELECT * FROM orders WHERE user_id = 123 AND status = 1 AND create_time > '2024-01-01';

-- 无效查询(跳过user_id)
SELECT * FROM orders WHERE status = 1;
SELECT * FROM orders WHERE create_time > '2024-01-01';

-- 覆盖索引示例
-- 创建覆盖索引
CREATE INDEX idx_cover ON orders(user_id, status, create_time, amount);

-- 查询只需扫描索引,无需回表
SELECT user_id, status, create_time, amount 
FROM orders 
WHERE user_id = 123 AND status = 1;

3.2 查询语句优化

*避免SELECT **

-- 优化前:全表扫描,返回所有字段
SELECT * FROM users WHERE age > 18;

-- 优化后:只查询需要的字段,使用索引
SELECT id, name, email FROM users WHERE age > 18;

-- 使用EXPLAIN分析
EXPLAIN SELECT id, name, email FROM users WHERE age > 18;

JOIN优化

-- 优化前:嵌套循环连接
SELECT o.*, u.name 
FROM orders o 
JOIN users u ON o.user_id = u.id 
WHERE o.create_time > '2024-01-01';

-- 优化后:使用索引和合适的连接顺序
-- 1. 确保连接字段有索引
CREATE INDEX idx_user_id ON orders(user_id);
CREATE INDEX idx_id ON users(id);

-- 2. 调整连接顺序(小表驱动大表)
SELECT o.*, u.name 
FROM users u 
JOIN orders o ON u.id = o.user_id 
WHERE u.create_time > '2024-01-01';

子查询优化

-- 优化前:相关子查询,性能差
SELECT * FROM orders o 
WHERE o.user_id IN (
    SELECT id FROM users WHERE status = 1
);

-- 优化后:使用JOIN或EXISTS
-- 方法1:使用JOIN
SELECT o.* 
FROM orders o 
JOIN users u ON o.user_id = u.id 
WHERE u.status = 1;

-- 方法2:使用EXISTS(当users表很大时)
SELECT * FROM orders o 
WHERE EXISTS (
    SELECT 1 FROM users u 
    WHERE u.id = o.user_id AND u.status = 1
);

3.3 批量操作优化

批量插入优化

// 优化前:逐条插入,性能差
public void insertOrders(List<Order> orders) {
    for (Order order : orders) {
        orderMapper.insert(order); // 每次都提交事务
    }
}

// 优化后:批量插入,减少网络开销
public void insertOrdersBatch(List<Order> orders) {
    // 1. 使用MyBatis批量插入
    orderMapper.insertBatch(orders);
    
    // 2. 或者使用JDBC批量
    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement(
             "INSERT INTO orders (user_id, amount, status) VALUES (?, ?, ?)")) {
        
        conn.setAutoCommit(false);
        int batchSize = 1000;
        
        for (int i = 0; i < orders.size(); i++) {
            Order order = orders.get(i);
            ps.setLong(1, order.getUserId());
            ps.setBigDecimal(2, order.getAmount());
            ps.setInt(3, order.getStatus());
            ps.addBatch();
            
            if ((i + 1) % batchSize == 0) {
                ps.executeBatch();
                conn.commit();
            }
        }
        
        // 提交剩余批次
        ps.executeBatch();
        conn.commit();
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}

MyBatis批量插入XML配置

<!-- 批量插入 -->
<insert id="insertBatch" parameterType="list">
    INSERT INTO orders (user_id, amount, status, create_time) VALUES
    <foreach collection="list" item="order" separator=",">
        (#{order.userId}, #{order.amount}, #{order.status}, #{order.createTime})
    </foreach>
</insert>

四、事务与锁优化

4.1 事务隔离级别选择

隔离级别对比

隔离级别 脏读 不可重复读 幻读 性能
READ UNCOMMITTED 可能 可能 可能 最高
READ COMMITTED 不可能 可能 可能
REPEATABLE READ 不可能 不可能 可能
SERIALIZABLE 不可能 不可能 不可能

代码示例(事务隔离级别设置)

// Spring Boot事务配置
@Configuration
public class TransactionConfig {
    
    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
        transactionManager.setDataSource(dataSource);
        return transactionManager;
    }
}

// 业务层使用
@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private ProductMapper productMapper;
    
    // 使用READ COMMITTED级别(适合高并发场景)
    @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRED)
    public void createOrderWithStockCheck(Long productId, Integer quantity) {
        // 1. 检查库存
        Product product = productMapper.selectById(productId);
        if (product.getStock() < quantity) {
            throw new RuntimeException("库存不足");
        }
        
        // 2. 扣减库存
        int updated = productMapper.decreaseStock(productId, quantity);
        if (updated == 0) {
            throw new RuntimeException("库存扣减失败");
        }
        
        // 3. 创建订单
        Order order = new Order();
        order.setProductId(productId);
        order.setQuantity(quantity);
        order.setAmount(product.getPrice().multiply(BigDecimal.valueOf(quantity)));
        orderMapper.insert(order);
    }
}

4.2 锁优化策略

行锁优化

-- 优化前:全表锁定
UPDATE products SET stock = stock - 1 WHERE id = 123;

-- 优化后:使用索引,只锁定行
-- 确保WHERE条件使用索引
CREATE INDEX idx_id ON products(id);

-- 使用SELECT ... FOR UPDATE(悲观锁)
BEGIN;
SELECT stock FROM products WHERE id = 123 FOR UPDATE;
-- 业务逻辑处理
UPDATE products SET stock = stock - 1 WHERE id = 123;
COMMIT;

-- 使用乐观锁(版本号机制)
-- 1. 表结构添加版本号字段
ALTER TABLE products ADD COLUMN version INT DEFAULT 0;

-- 2. 更新时检查版本号
UPDATE products 
SET stock = stock - 1, version = version + 1 
WHERE id = 123 AND version = #{oldVersion};

-- 3. Java代码实现
public boolean decreaseStockWithOptimisticLock(Long productId, Integer quantity, Integer oldVersion) {
    int updated = productMapper.decreaseStockWithVersion(productId, quantity, oldVersion);
    return updated > 0;
}

死锁预防

-- 死锁场景:两个事务交叉锁定
-- 事务A:锁定行1,尝试锁定行2
-- 事务B:锁定行2,尝试锁定行1

-- 预防策略:
-- 1. 固定加锁顺序
BEGIN;
-- 先锁定id小的行
SELECT * FROM orders WHERE id < 100 FOR UPDATE;
SELECT * FROM orders WHERE id > 100 FOR UPDATE;
-- 业务处理
COMMIT;

-- 2. 使用SELECT ... FOR UPDATE SKIP LOCKED(MySQL 8.0+)
BEGIN;
-- 跳过已锁定的行
SELECT * FROM orders WHERE status = 1 FOR UPDATE SKIP LOCKED;
-- 处理未锁定的行
COMMIT;

4.3 长事务优化

长事务问题

  • 锁持有时间长
  • 回滚段压力大
  • 影响并发性能

优化方案

// 优化前:长事务
@Transactional
public void processLargeBatch() {
    // 1. 查询大量数据
    List<Order> orders = orderMapper.selectAll();
    
    // 2. 业务处理(耗时操作)
    for (Order order : orders) {
        // 调用外部API
        callExternalAPI(order);
        // 复杂计算
        calculateComplexData(order);
    }
    
    // 3. 更新数据库
    orderMapper.updateBatch(orders);
}

// 优化后:拆分事务
public void processLargeBatchOptimized() {
    // 1. 分批查询
    int batchSize = 100;
    int page = 0;
    
    while (true) {
        List<Order> orders = orderMapper.selectByPage(page, batchSize);
        if (orders.isEmpty()) {
            break;
        }
        
        // 2. 每批独立事务
        processBatch(orders);
        
        page++;
    }
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void processBatch(List<Order> orders) {
    // 业务处理
    for (Order order : orders) {
        callExternalAPI(order);
        calculateComplexData(order);
    }
    
    // 更新数据库
    orderMapper.updateBatch(orders);
}

五、应用层优化策略

5.1 连接池优化

HikariCP高级配置

@Configuration
public class HikariConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.hikari")
    public HikariDataSource dataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        
        // 连接池大小动态调整
        dataSource.setMaximumPoolSize(calculateOptimalPoolSize());
        
        // 连接预热
        dataSource.setInitializationFailTimeout(1);
        
        // 连接泄漏检测
        dataSource.setLeakDetectionThreshold(60000);
        
        // 连接测试
        dataSource.setConnectionTestQuery("SELECT 1");
        
        return dataSource;
    }
    
    private int calculateOptimalPoolSize() {
        // 根据CPU核心数和磁盘数计算
        int cores = Runtime.getRuntime().availableProcessors();
        int disks = 2; // 假设有2块SSD
        return cores * 2 + disks;
    }
}

5.2 异步处理

使用CompletableFuture异步化

@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private ExecutorService executorService;
    
    // 异步创建订单
    public CompletableFuture<Order> createOrderAsync(Long userId, Long productId, Integer quantity) {
        return CompletableFuture.supplyAsync(() -> {
            // 1. 检查库存(同步)
            Product product = productMapper.selectById(productId);
            if (product.getStock() < quantity) {
                throw new RuntimeException("库存不足");
            }
            
            // 2. 扣减库存(同步)
            int updated = productMapper.decreaseStock(productId, quantity);
            if (updated == 0) {
                throw new RuntimeException("库存扣减失败");
            }
            
            // 3. 创建订单(同步)
            Order order = new Order();
            order.setUserId(userId);
            order.setProductId(productId);
            order.setQuantity(quantity);
            order.setAmount(product.getPrice().multiply(BigDecimal.valueOf(quantity)));
            orderMapper.insert(order);
            
            return order;
        }, executorService);
    }
    
    // 批量异步处理
    public List<CompletableFuture<Order>> createOrdersBatch(List<OrderRequest> requests) {
        return requests.stream()
            .map(request -> createOrderAsync(request.getUserId(), request.getProductId(), request.getQuantity()))
            .collect(Collectors.toList());
    }
    
    // 等待所有异步任务完成
    public List<Order> waitForAllOrders(List<CompletableFuture<Order>> futures) {
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );
        
        return allFutures.thenApply(v -> 
            futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        ).join();
    }
}

5.3 批量处理优化

使用Spring Batch处理大数据量

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    
    @Bean
    public Job importOrdersJob(JobBuilderFactory jobBuilderFactory, 
                               StepBuilderFactory stepBuilderFactory) {
        return jobBuilderFactory.get("importOrdersJob")
            .incrementer(new RunIdIncrementer())
            .flow(importOrdersStep(stepBuilderFactory))
            .end()
            .build();
    }
    
    @Bean
    public Step importOrdersStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("importOrdersStep")
            .<Order, Order>chunk(1000)
            .reader(orderReader())
            .processor(orderProcessor())
            .writer(orderWriter())
            .build();
    }
    
    @Bean
    public ItemReader<Order> orderReader() {
        // 从文件或数据库读取
        return new JdbcCursorItemReader<Order>() {{
            setSql("SELECT * FROM orders WHERE status = 0");
            setRowMapper(new BeanPropertyRowMapper<>(Order.class));
        }};
    }
    
    @Bean
    public ItemProcessor<Order, Order> orderProcessor() {
        return order -> {
            // 业务处理
            order.setStatus(1);
            order.setProcessedTime(new Date());
            return order;
        };
    }
    
    @Bean
    public ItemWriter<Order> orderWriter() {
        return orders -> {
            // 批量更新
            orderMapper.updateBatch(orders);
        };
    }
}

六、监控与调优实战

6.1 性能监控体系

Prometheus + Grafana监控方案

# prometheus.yml配置
scrape_configs:
  - job_name: 'mysql'
    static_configs:
      - targets: ['localhost:9104']
    metrics_path: '/metrics'
    
  - job_name: 'application'
    static_configs:
      - targets: ['localhost:8080']

自定义监控指标(Spring Boot Actuator)

@Component
public class DatabaseMetrics {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    @Autowired
    private DataSource dataSource;
    
    @PostConstruct
    public void init() {
        // 监控连接池使用情况
        Gauge.builder("hikari.connections.active", dataSource, 
            ds -> ((HikariDataSource) ds).getHikariPoolMXBean().getActiveConnections())
            .description("Active database connections")
            .register(meterRegistry);
        
        // 监控查询性能
        Counter.builder("database.queries.total")
            .description("Total database queries")
            .register(meterRegistry);
    }
    
    public void recordQueryTime(long timeMs) {
        Timer.builder("database.query.time")
            .description("Query execution time")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry)
            .record(timeMs, TimeUnit.MILLISECONDS);
    }
}

6.2 慢查询分析

慢查询日志分析工具

# 1. 开启慢查询日志
mysql> SET GLOBAL slow_query_log = 1;
mysql> SET GLOBAL long_query_time = 1;
mysql> SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';

# 2. 使用pt-query-digest分析
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# 3. 使用mysqldumpslow
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

慢查询优化案例

-- 慢查询示例
SELECT * FROM orders o 
JOIN users u ON o.user_id = u.id 
WHERE u.status = 1 
  AND o.create_time > '2024-01-01'
ORDER BY o.create_time DESC 
LIMIT 1000;

-- 优化步骤:
-- 1. 使用EXPLAIN分析
EXPLAIN SELECT * FROM orders o 
JOIN users u ON o.user_id = u.id 
WHERE u.status = 1 
  AND o.create_time > '2024-01-01'
ORDER BY o.create_time DESC 
LIMIT 1000;

-- 2. 添加索引
CREATE INDEX idx_user_status ON users(status);
CREATE INDEX idx_create_time ON orders(create_time);
CREATE INDEX idx_user_id ON orders(user_id);

-- 3. 优化查询(只查询需要的字段)
SELECT o.id, o.amount, o.create_time, u.name 
FROM orders o 
JOIN users u ON o.user_id = u.id 
WHERE u.status = 1 
  AND o.create_time > '2024-01-01'
ORDER BY o.create_time DESC 
LIMIT 1000;

6.3 压力测试

使用JMeter进行压力测试

<!-- JMeter测试计划示例 -->
<TestPlan>
  <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="高并发测试">
    <stringProp name="ThreadGroup.num_threads">1000</stringProp>
    <stringProp name="ThreadGroup.ramp_time">60</stringProp>
    <stringProp name="ThreadGroup.duration">300</stringProp>
  </ThreadGroup>
  
  <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="查询订单">
    <stringProp name="HTTPSampler.domain">localhost</stringProp>
    <stringProp name="HTTPSampler.port">8080</stringProp>
    <stringProp name="HTTPSampler.path">/api/orders/123</stringProp>
    <stringProp name="HTTPSampler.method">GET</stringProp>
  </HTTPSamplerProxy>
</TestPlan>

测试结果分析

  • 响应时间:P95 < 200ms
  • 吞吐量:QPS > 5000
  • 错误率:< 0.1%

七、实战案例:电商秒杀系统

7.1 架构设计

用户请求 → Nginx → 应用服务 → Redis缓存 → MySQL数据库

7.2 核心代码实现

@Service
public class SeckillService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private ProductMapper productMapper;
    
    // 秒杀商品库存预热
    @PostConstruct
    public void preloadStock() {
        List<Product> products = productMapper.selectAll();
        for (Product product : products) {
            redisTemplate.opsForValue().set("seckill:stock:" + product.getId(), 
                                           product.getStock(), 
                                           Duration.ofHours(2));
        }
    }
    
    // 秒杀下单
    @Transactional
    public Long seckill(Long productId, Long userId) {
        // 1. 预减库存(Redis原子操作)
        Long stock = redisTemplate.opsForValue().decrement("seckill:stock:" + productId);
        if (stock < 0) {
            // 库存不足,回滚
            redisTemplate.opsForValue().increment("seckill:stock:" + productId);
            throw new RuntimeException("秒杀失败,库存不足");
        }
        
        // 2. 检查是否已秒杀
        String key = "seckill:user:" + productId + ":" + userId;
        if (redisTemplate.hasKey(key)) {
            throw new RuntimeException("您已参与过秒杀");
        }
        
        // 3. 创建订单(异步)
        createOrderAsync(productId, userId);
        
        // 4. 标记已秒杀
        redisTemplate.opsForValue().set(key, 1, Duration.ofHours(2));
        
        return System.currentTimeMillis();
    }
    
    // 异步创建订单
    @Async
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void createOrderAsync(Long productId, Long userId) {
        // 从Redis获取库存
        Long stock = (Long) redisTemplate.opsForValue().get("seckill:stock:" + productId);
        
        // 扣减数据库库存
        int updated = productMapper.decreaseStock(productId, 1);
        if (updated == 0) {
            // 库存不足,回滚Redis
            redisTemplate.opsForValue().increment("seckill:stock:" + productId);
            return;
        }
        
        // 创建订单
        Order order = new Order();
        order.setProductId(productId);
        order.setUserId(userId);
        order.setQuantity(1);
        order.setStatus(1);
        order.setCreateTime(new Date());
        orderMapper.insert(order);
        
        // 发送消息到MQ,后续处理(如支付、物流等)
        sendOrderMessage(order);
    }
    
    // 库存同步任务
    @Scheduled(fixedRate = 5000)
    public void syncStock() {
        // 定期将Redis库存同步到数据库
        Set<String> keys = redisTemplate.keys("seckill:stock:*");
        for (String key : keys) {
            Long productId = Long.parseLong(key.split(":")[2]);
            Long redisStock = (Long) redisTemplate.opsForValue().get(key);
            Product product = productMapper.selectById(productId);
            if (product != null && !product.getStock().equals(redisStock)) {
                productMapper.updateStock(productId, redisStock.intValue());
            }
        }
    }
}

7.3 性能优化点

  1. Redis原子操作:使用DECR/INCR保证库存扣减原子性
  2. 异步下单:将订单创建异步化,提升响应速度
  3. 库存预热:提前将库存加载到Redis
  4. 限流措施:使用Redis+Lua脚本实现限流
-- 限流Lua脚本
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local expire = tonumber(ARGV[2])

local current = redis.call('GET', key)
if current and tonumber(current) >= limit then
    return 0
else
    redis.call('INCR', key)
    if tonumber(current) == 0 then
        redis.call('EXPIRE', key, expire)
    end
    return 1
end

八、总结与最佳实践

8.1 高并发处理黄金法则

  1. 缓存为王:80%的请求应该被缓存拦截
  2. 读写分离:减轻主库压力
  3. 异步处理:提升响应速度
  4. 限流降级:保护系统稳定性
  5. 监控告警:及时发现问题

8.2 调优检查清单

  • [ ] 数据库连接池大小是否合理?
  • [ ] 索引是否覆盖查询?
  • [ ] 是否存在慢查询?
  • [ ] 事务是否过长?
  • [ ] 是否有缓存穿透/雪崩风险?
  • [ ] 是否有合适的限流策略?
  • [ ] 监控体系是否完善?

8.3 持续优化建议

  1. 定期分析慢查询日志
  2. 监控数据库性能指标
  3. 压力测试验证优化效果
  4. 保持数据库结构简洁
  5. 定期清理历史数据

通过以上策略的综合应用,可以显著提升MySQL在高并发场景下的性能表现。记住,优化是一个持续的过程,需要根据业务特点和系统负载不断调整和改进。