引言
在当今互联网应用中,高并发场景已成为常态。无论是电商秒杀、社交平台还是金融交易系统,都需要处理海量并发请求。MySQL作为最流行的关系型数据库之一,如何在高并发环境下保持稳定和高性能,是每个开发者和DBA必须面对的挑战。
本文将从架构设计、数据库配置、SQL优化、代码调优等多个维度,系统性地介绍MySQL高并发处理的实战策略。我们将通过具体案例和代码示例,展示如何从底层到上层全方位提升MySQL的并发处理能力。
一、架构层面的优化策略
1.1 读写分离架构
核心思想:将读操作和写操作分离到不同的数据库实例,减轻主库压力。
实现方案:
- 主库(Master):处理所有写操作(INSERT、UPDATE、DELETE)
- 从库(Slave):处理所有读操作(SELECT)
- 中间件:使用ProxySQL、MyCat或ShardingSphere进行路由
代码示例(使用Spring Boot + MyBatis实现读写分离):
// 1. 配置多数据源
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public DataSource routingDataSource() {
DynamicDataSource routingDataSource = new DynamicDataSource();
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", masterDataSource());
targetDataSources.put("slave", slaveDataSource());
routingDataSource.setTargetDataSources(targetDataSources);
routingDataSource.setDefaultTargetDataSource(masterDataSource());
return routingDataSource;
}
}
// 2. 自定义路由策略
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceContextHolder.getDataSourceType();
}
}
// 3. 数据源上下文管理器
public class DataSourceContextHolder {
private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static void setDataSourceType(String dataSourceType) {
contextHolder.set(dataSourceType);
}
public static String getDataSourceType() {
return contextHolder.get();
}
public static void clearDataSourceType() {
contextHolder.remove();
}
}
// 4. AOP切面实现读写分离
@Aspect
@Component
public class DataSourceAspect {
@Around("@annotation(readOnly)")
public Object switchDataSource(ProceedingJoinPoint joinPoint, ReadOnly readOnly) throws Throwable {
try {
DataSourceContextHolder.setDataSourceType("slave");
return joinPoint.proceed();
} finally {
DataSourceContextHolder.clearDataSourceType();
}
}
@Around("@annotation(writeOnly)")
public Object switchToMaster(ProceedingJoinPoint joinPoint, WriteOnly writeOnly) throws Throwable {
try {
DataSourceContextHolder.setDataSourceType("master");
return joinPoint.proceed();
} finally {
DataSourceContextHolder.clearDataSourceType();
}
}
}
// 5. 自定义注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface WriteOnly {
}
// 6. 业务层使用示例
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@ReadOnly
public List<Order> getOrdersByUserId(Long userId) {
return orderMapper.selectByUserId(userId);
}
@WriteOnly
public void createOrder(Order order) {
orderMapper.insert(order);
}
}
性能对比:
- 单库架构:QPS约2000-3000
- 读写分离架构:QPS可提升至8000-10000(读操作占70%时)
1.2 分库分表策略
适用场景:单表数据量超过1000万行,或单库连接数达到上限。
分片策略:
- 水平分表:按用户ID、时间等维度拆分
- 垂直分表:按业务字段拆分(如订单表拆分为订单基本信息表和订单详情表)
代码示例(使用ShardingSphere实现分库分表):
# sharding.yaml配置文件
dataSources:
ds_0: !!com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/db_0
username: root
password: 123456
ds_1: !!com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/db_1
username: root
password: 123456
shardingRule:
tables:
order:
actualDataNodes: ds_${0..1}.order_${0..3}
tableStrategy:
standard:
shardingColumn: order_id
preciseAlgorithmClassName: com.example.OrderTableShardingAlgorithm
databaseStrategy:
standard:
shardingColumn: user_id
preciseAlgorithmClassName: com.example.OrderDatabaseShardingAlgorithm
bindingTables:
- order
defaultDatabaseStrategy:
none:
defaultTableStrategy:
none:
props:
sql.show: true
// 分片算法实现
public class OrderDatabaseShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long userId = shardingValue.getValue();
int hash = userId.hashCode() % 2;
return "ds_" + hash;
}
}
public class OrderTableShardingAlgorithm implements PreciseShardingAlgorithm<Long> {
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long orderId = shardingValue.getValue();
int hash = Math.abs(orderId.hashCode()) % 4;
return "order_" + hash;
}
}
1.3 缓存层设计
Redis缓存策略:
- 热点数据缓存
- 缓存穿透防护
- 缓存雪崩预防
代码示例(Spring Cache + Redis):
// 1. 配置Redis缓存
@Configuration
@EnableCaching
public class RedisCacheConfig {
@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory factory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(10))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
return RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
}
}
// 2. 缓存服务层
@Service
public class ProductService {
@Autowired
private ProductMapper productMapper;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 缓存穿透防护:使用空值缓存
@Cacheable(value = "products", key = "#productId", unless = "#result == null")
public Product getProductById(Long productId) {
Product product = productMapper.selectById(productId);
if (product == null) {
// 缓存空值,防止缓存穿透
redisTemplate.opsForValue().set("product:" + productId, "", Duration.ofMinutes(5));
return null;
}
return product;
}
// 缓存更新策略
@CachePut(value = "products", key = "#product.id")
public Product updateProduct(Product product) {
productMapper.updateById(product);
return product;
}
// 批量查询优化
public List<Product> getProductsByIds(List<Long> ids) {
// 1. 先查缓存
List<Product> result = new ArrayList<>();
List<Long> missIds = new ArrayList<>();
for (Long id : ids) {
String key = "product:" + id;
Product product = (Product) redisTemplate.opsForValue().get(key);
if (product != null) {
result.add(product);
} else {
missIds.add(id);
}
}
// 2. 批量查询数据库
if (!missIds.isEmpty()) {
List<Product> dbProducts = productMapper.selectBatchIds(missIds);
// 3. 写入缓存
for (Product product : dbProducts) {
redisTemplate.opsForValue().set("product:" + product.getId(), product, Duration.ofMinutes(10));
result.add(product);
}
}
return result;
}
}
二、MySQL数据库配置优化
2.1 连接池配置
HikariCP配置示例:
# application.yml
spring:
datasource:
hikari:
# 连接池大小
maximum-pool-size: 20
minimum-idle: 10
# 连接超时
connection-timeout: 30000
# 空闲连接存活时间
idle-timeout: 600000
# 连接最大生命周期
max-lifetime: 1800000
# 连接测试查询
connection-test-query: SELECT 1
# 连接预热
initialization-fail-timeout: 1
# 连接泄漏检测
leak-detection-threshold: 60000
连接池大小计算公式:
连接数 = (核心数 × 2) + 有效磁盘数
例如:4核CPU + 2块SSD = 4×2 + 2 = 10个连接
2.2 MySQL服务器参数调优
my.cnf关键配置:
[mysqld]
# 内存配置
innodb_buffer_pool_size = 12G # 通常设置为物理内存的70-80%
innodb_buffer_pool_instances = 8 # 缓冲池实例数,建议4-8
# 日志配置
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M
innodb_flush_log_at_trx_commit = 2 # 1-性能,2-安全
# 连接配置
max_connections = 500
thread_cache_size = 50
table_open_cache = 2000
# 查询缓存(MySQL 8.0已移除)
# query_cache_type = 0
# query_cache_size = 0
# InnoDB配置
innodb_flush_method = O_DIRECT
innodb_file_per_table = ON
innodb_flush_neighbors = 0 # SSD设置为0
innodb_io_capacity = 2000 # SSD设置为2000-4000
# 临时表配置
tmp_table_size = 256M
max_heap_table_size = 256M
# 排序缓冲区
sort_buffer_size = 4M
join_buffer_size = 4M
# 索引缓冲区
key_buffer_size = 256M
# 慢查询日志
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
log_queries_not_using_indexes = 1
2.3 监控与诊断
使用Performance Schema监控:
-- 查看活跃连接
SELECT * FROM performance_schema.threads WHERE TYPE = 'FOREGROUND';
-- 查看锁等待
SELECT * FROM performance_schema.data_lock_waits;
-- 查看慢查询
SELECT * FROM mysql.slow_log ORDER BY query_time DESC LIMIT 10;
-- 查看InnoDB状态
SHOW ENGINE INNODB STATUS\G
使用sys schema诊断:
-- 查看最耗时的查询
SELECT * FROM sys.statements_with_temp_tables ORDER BY exec_count DESC LIMIT 10;
-- 查看未使用索引的表
SELECT * FROM sys.schema_unused_indexes;
-- 查看重复索引
SELECT * FROM sys.schema_redundant_indexes;
-- 查看锁等待详情
SELECT * FROM sys.innodb_lock_waits;
三、SQL语句优化
3.1 索引优化策略
索引设计原则:
- 最左前缀原则
- 覆盖索引
- 索引下推
- 避免索引失效
代码示例(索引优化前后对比):
-- 优化前:全表扫描
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';
-- 优化后:使用索引
-- 1. 创建索引
CREATE INDEX idx_create_time ON orders(create_time);
-- 2. 改写查询
SELECT * FROM orders
WHERE create_time >= '2024-01-01 00:00:00'
AND create_time < '2024-01-02 00:00:00';
-- 复合索引示例
-- 创建复合索引
CREATE INDEX idx_user_status_time ON orders(user_id, status, create_time);
-- 有效查询(使用最左前缀)
SELECT * FROM orders WHERE user_id = 123;
SELECT * FROM orders WHERE user_id = 123 AND status = 1;
SELECT * FROM orders WHERE user_id = 123 AND status = 1 AND create_time > '2024-01-01';
-- 无效查询(跳过user_id)
SELECT * FROM orders WHERE status = 1;
SELECT * FROM orders WHERE create_time > '2024-01-01';
-- 覆盖索引示例
-- 创建覆盖索引
CREATE INDEX idx_cover ON orders(user_id, status, create_time, amount);
-- 查询只需扫描索引,无需回表
SELECT user_id, status, create_time, amount
FROM orders
WHERE user_id = 123 AND status = 1;
3.2 查询语句优化
*避免SELECT **:
-- 优化前:全表扫描,返回所有字段
SELECT * FROM users WHERE age > 18;
-- 优化后:只查询需要的字段,使用索引
SELECT id, name, email FROM users WHERE age > 18;
-- 使用EXPLAIN分析
EXPLAIN SELECT id, name, email FROM users WHERE age > 18;
JOIN优化:
-- 优化前:嵌套循环连接
SELECT o.*, u.name
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.create_time > '2024-01-01';
-- 优化后:使用索引和合适的连接顺序
-- 1. 确保连接字段有索引
CREATE INDEX idx_user_id ON orders(user_id);
CREATE INDEX idx_id ON users(id);
-- 2. 调整连接顺序(小表驱动大表)
SELECT o.*, u.name
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE u.create_time > '2024-01-01';
子查询优化:
-- 优化前:相关子查询,性能差
SELECT * FROM orders o
WHERE o.user_id IN (
SELECT id FROM users WHERE status = 1
);
-- 优化后:使用JOIN或EXISTS
-- 方法1:使用JOIN
SELECT o.*
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE u.status = 1;
-- 方法2:使用EXISTS(当users表很大时)
SELECT * FROM orders o
WHERE EXISTS (
SELECT 1 FROM users u
WHERE u.id = o.user_id AND u.status = 1
);
3.3 批量操作优化
批量插入优化:
// 优化前:逐条插入,性能差
public void insertOrders(List<Order> orders) {
for (Order order : orders) {
orderMapper.insert(order); // 每次都提交事务
}
}
// 优化后:批量插入,减少网络开销
public void insertOrdersBatch(List<Order> orders) {
// 1. 使用MyBatis批量插入
orderMapper.insertBatch(orders);
// 2. 或者使用JDBC批量
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(
"INSERT INTO orders (user_id, amount, status) VALUES (?, ?, ?)")) {
conn.setAutoCommit(false);
int batchSize = 1000;
for (int i = 0; i < orders.size(); i++) {
Order order = orders.get(i);
ps.setLong(1, order.getUserId());
ps.setBigDecimal(2, order.getAmount());
ps.setInt(3, order.getStatus());
ps.addBatch();
if ((i + 1) % batchSize == 0) {
ps.executeBatch();
conn.commit();
}
}
// 提交剩余批次
ps.executeBatch();
conn.commit();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
MyBatis批量插入XML配置:
<!-- 批量插入 -->
<insert id="insertBatch" parameterType="list">
INSERT INTO orders (user_id, amount, status, create_time) VALUES
<foreach collection="list" item="order" separator=",">
(#{order.userId}, #{order.amount}, #{order.status}, #{order.createTime})
</foreach>
</insert>
四、事务与锁优化
4.1 事务隔离级别选择
隔离级别对比:
| 隔离级别 | 脏读 | 不可重复读 | 幻读 | 性能 |
|---|---|---|---|---|
| READ UNCOMMITTED | 可能 | 可能 | 可能 | 最高 |
| READ COMMITTED | 不可能 | 可能 | 可能 | 高 |
| REPEATABLE READ | 不可能 | 不可能 | 可能 | 中 |
| SERIALIZABLE | 不可能 | 不可能 | 不可能 | 低 |
代码示例(事务隔离级别设置):
// Spring Boot事务配置
@Configuration
public class TransactionConfig {
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
transactionManager.setDataSource(dataSource);
return transactionManager;
}
}
// 业务层使用
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private ProductMapper productMapper;
// 使用READ COMMITTED级别(适合高并发场景)
@Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRED)
public void createOrderWithStockCheck(Long productId, Integer quantity) {
// 1. 检查库存
Product product = productMapper.selectById(productId);
if (product.getStock() < quantity) {
throw new RuntimeException("库存不足");
}
// 2. 扣减库存
int updated = productMapper.decreaseStock(productId, quantity);
if (updated == 0) {
throw new RuntimeException("库存扣减失败");
}
// 3. 创建订单
Order order = new Order();
order.setProductId(productId);
order.setQuantity(quantity);
order.setAmount(product.getPrice().multiply(BigDecimal.valueOf(quantity)));
orderMapper.insert(order);
}
}
4.2 锁优化策略
行锁优化:
-- 优化前:全表锁定
UPDATE products SET stock = stock - 1 WHERE id = 123;
-- 优化后:使用索引,只锁定行
-- 确保WHERE条件使用索引
CREATE INDEX idx_id ON products(id);
-- 使用SELECT ... FOR UPDATE(悲观锁)
BEGIN;
SELECT stock FROM products WHERE id = 123 FOR UPDATE;
-- 业务逻辑处理
UPDATE products SET stock = stock - 1 WHERE id = 123;
COMMIT;
-- 使用乐观锁(版本号机制)
-- 1. 表结构添加版本号字段
ALTER TABLE products ADD COLUMN version INT DEFAULT 0;
-- 2. 更新时检查版本号
UPDATE products
SET stock = stock - 1, version = version + 1
WHERE id = 123 AND version = #{oldVersion};
-- 3. Java代码实现
public boolean decreaseStockWithOptimisticLock(Long productId, Integer quantity, Integer oldVersion) {
int updated = productMapper.decreaseStockWithVersion(productId, quantity, oldVersion);
return updated > 0;
}
死锁预防:
-- 死锁场景:两个事务交叉锁定
-- 事务A:锁定行1,尝试锁定行2
-- 事务B:锁定行2,尝试锁定行1
-- 预防策略:
-- 1. 固定加锁顺序
BEGIN;
-- 先锁定id小的行
SELECT * FROM orders WHERE id < 100 FOR UPDATE;
SELECT * FROM orders WHERE id > 100 FOR UPDATE;
-- 业务处理
COMMIT;
-- 2. 使用SELECT ... FOR UPDATE SKIP LOCKED(MySQL 8.0+)
BEGIN;
-- 跳过已锁定的行
SELECT * FROM orders WHERE status = 1 FOR UPDATE SKIP LOCKED;
-- 处理未锁定的行
COMMIT;
4.3 长事务优化
长事务问题:
- 锁持有时间长
- 回滚段压力大
- 影响并发性能
优化方案:
// 优化前:长事务
@Transactional
public void processLargeBatch() {
// 1. 查询大量数据
List<Order> orders = orderMapper.selectAll();
// 2. 业务处理(耗时操作)
for (Order order : orders) {
// 调用外部API
callExternalAPI(order);
// 复杂计算
calculateComplexData(order);
}
// 3. 更新数据库
orderMapper.updateBatch(orders);
}
// 优化后:拆分事务
public void processLargeBatchOptimized() {
// 1. 分批查询
int batchSize = 100;
int page = 0;
while (true) {
List<Order> orders = orderMapper.selectByPage(page, batchSize);
if (orders.isEmpty()) {
break;
}
// 2. 每批独立事务
processBatch(orders);
page++;
}
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void processBatch(List<Order> orders) {
// 业务处理
for (Order order : orders) {
callExternalAPI(order);
calculateComplexData(order);
}
// 更新数据库
orderMapper.updateBatch(orders);
}
五、应用层优化策略
5.1 连接池优化
HikariCP高级配置:
@Configuration
public class HikariConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hikari")
public HikariDataSource dataSource() {
HikariDataSource dataSource = new HikariDataSource();
// 连接池大小动态调整
dataSource.setMaximumPoolSize(calculateOptimalPoolSize());
// 连接预热
dataSource.setInitializationFailTimeout(1);
// 连接泄漏检测
dataSource.setLeakDetectionThreshold(60000);
// 连接测试
dataSource.setConnectionTestQuery("SELECT 1");
return dataSource;
}
private int calculateOptimalPoolSize() {
// 根据CPU核心数和磁盘数计算
int cores = Runtime.getRuntime().availableProcessors();
int disks = 2; // 假设有2块SSD
return cores * 2 + disks;
}
}
5.2 异步处理
使用CompletableFuture异步化:
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private ProductMapper productMapper;
@Autowired
private ExecutorService executorService;
// 异步创建订单
public CompletableFuture<Order> createOrderAsync(Long userId, Long productId, Integer quantity) {
return CompletableFuture.supplyAsync(() -> {
// 1. 检查库存(同步)
Product product = productMapper.selectById(productId);
if (product.getStock() < quantity) {
throw new RuntimeException("库存不足");
}
// 2. 扣减库存(同步)
int updated = productMapper.decreaseStock(productId, quantity);
if (updated == 0) {
throw new RuntimeException("库存扣减失败");
}
// 3. 创建订单(同步)
Order order = new Order();
order.setUserId(userId);
order.setProductId(productId);
order.setQuantity(quantity);
order.setAmount(product.getPrice().multiply(BigDecimal.valueOf(quantity)));
orderMapper.insert(order);
return order;
}, executorService);
}
// 批量异步处理
public List<CompletableFuture<Order>> createOrdersBatch(List<OrderRequest> requests) {
return requests.stream()
.map(request -> createOrderAsync(request.getUserId(), request.getProductId(), request.getQuantity()))
.collect(Collectors.toList());
}
// 等待所有异步任务完成
public List<Order> waitForAllOrders(List<CompletableFuture<Order>> futures) {
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
return allFutures.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
).join();
}
}
5.3 批量处理优化
使用Spring Batch处理大数据量:
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Bean
public Job importOrdersJob(JobBuilderFactory jobBuilderFactory,
StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("importOrdersJob")
.incrementer(new RunIdIncrementer())
.flow(importOrdersStep(stepBuilderFactory))
.end()
.build();
}
@Bean
public Step importOrdersStep(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("importOrdersStep")
.<Order, Order>chunk(1000)
.reader(orderReader())
.processor(orderProcessor())
.writer(orderWriter())
.build();
}
@Bean
public ItemReader<Order> orderReader() {
// 从文件或数据库读取
return new JdbcCursorItemReader<Order>() {{
setSql("SELECT * FROM orders WHERE status = 0");
setRowMapper(new BeanPropertyRowMapper<>(Order.class));
}};
}
@Bean
public ItemProcessor<Order, Order> orderProcessor() {
return order -> {
// 业务处理
order.setStatus(1);
order.setProcessedTime(new Date());
return order;
};
}
@Bean
public ItemWriter<Order> orderWriter() {
return orders -> {
// 批量更新
orderMapper.updateBatch(orders);
};
}
}
六、监控与调优实战
6.1 性能监控体系
Prometheus + Grafana监控方案:
# prometheus.yml配置
scrape_configs:
- job_name: 'mysql'
static_configs:
- targets: ['localhost:9104']
metrics_path: '/metrics'
- job_name: 'application'
static_configs:
- targets: ['localhost:8080']
自定义监控指标(Spring Boot Actuator):
@Component
public class DatabaseMetrics {
@Autowired
private MeterRegistry meterRegistry;
@Autowired
private DataSource dataSource;
@PostConstruct
public void init() {
// 监控连接池使用情况
Gauge.builder("hikari.connections.active", dataSource,
ds -> ((HikariDataSource) ds).getHikariPoolMXBean().getActiveConnections())
.description("Active database connections")
.register(meterRegistry);
// 监控查询性能
Counter.builder("database.queries.total")
.description("Total database queries")
.register(meterRegistry);
}
public void recordQueryTime(long timeMs) {
Timer.builder("database.query.time")
.description("Query execution time")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry)
.record(timeMs, TimeUnit.MILLISECONDS);
}
}
6.2 慢查询分析
慢查询日志分析工具:
# 1. 开启慢查询日志
mysql> SET GLOBAL slow_query_log = 1;
mysql> SET GLOBAL long_query_time = 1;
mysql> SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
# 2. 使用pt-query-digest分析
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
# 3. 使用mysqldumpslow
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log
慢查询优化案例:
-- 慢查询示例
SELECT * FROM orders o
JOIN users u ON o.user_id = u.id
WHERE u.status = 1
AND o.create_time > '2024-01-01'
ORDER BY o.create_time DESC
LIMIT 1000;
-- 优化步骤:
-- 1. 使用EXPLAIN分析
EXPLAIN SELECT * FROM orders o
JOIN users u ON o.user_id = u.id
WHERE u.status = 1
AND o.create_time > '2024-01-01'
ORDER BY o.create_time DESC
LIMIT 1000;
-- 2. 添加索引
CREATE INDEX idx_user_status ON users(status);
CREATE INDEX idx_create_time ON orders(create_time);
CREATE INDEX idx_user_id ON orders(user_id);
-- 3. 优化查询(只查询需要的字段)
SELECT o.id, o.amount, o.create_time, u.name
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE u.status = 1
AND o.create_time > '2024-01-01'
ORDER BY o.create_time DESC
LIMIT 1000;
6.3 压力测试
使用JMeter进行压力测试:
<!-- JMeter测试计划示例 -->
<TestPlan>
<ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="高并发测试">
<stringProp name="ThreadGroup.num_threads">1000</stringProp>
<stringProp name="ThreadGroup.ramp_time">60</stringProp>
<stringProp name="ThreadGroup.duration">300</stringProp>
</ThreadGroup>
<HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="查询订单">
<stringProp name="HTTPSampler.domain">localhost</stringProp>
<stringProp name="HTTPSampler.port">8080</stringProp>
<stringProp name="HTTPSampler.path">/api/orders/123</stringProp>
<stringProp name="HTTPSampler.method">GET</stringProp>
</HTTPSamplerProxy>
</TestPlan>
测试结果分析:
- 响应时间:P95 < 200ms
- 吞吐量:QPS > 5000
- 错误率:< 0.1%
七、实战案例:电商秒杀系统
7.1 架构设计
用户请求 → Nginx → 应用服务 → Redis缓存 → MySQL数据库
7.2 核心代码实现
@Service
public class SeckillService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private OrderMapper orderMapper;
@Autowired
private ProductMapper productMapper;
// 秒杀商品库存预热
@PostConstruct
public void preloadStock() {
List<Product> products = productMapper.selectAll();
for (Product product : products) {
redisTemplate.opsForValue().set("seckill:stock:" + product.getId(),
product.getStock(),
Duration.ofHours(2));
}
}
// 秒杀下单
@Transactional
public Long seckill(Long productId, Long userId) {
// 1. 预减库存(Redis原子操作)
Long stock = redisTemplate.opsForValue().decrement("seckill:stock:" + productId);
if (stock < 0) {
// 库存不足,回滚
redisTemplate.opsForValue().increment("seckill:stock:" + productId);
throw new RuntimeException("秒杀失败,库存不足");
}
// 2. 检查是否已秒杀
String key = "seckill:user:" + productId + ":" + userId;
if (redisTemplate.hasKey(key)) {
throw new RuntimeException("您已参与过秒杀");
}
// 3. 创建订单(异步)
createOrderAsync(productId, userId);
// 4. 标记已秒杀
redisTemplate.opsForValue().set(key, 1, Duration.ofHours(2));
return System.currentTimeMillis();
}
// 异步创建订单
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void createOrderAsync(Long productId, Long userId) {
// 从Redis获取库存
Long stock = (Long) redisTemplate.opsForValue().get("seckill:stock:" + productId);
// 扣减数据库库存
int updated = productMapper.decreaseStock(productId, 1);
if (updated == 0) {
// 库存不足,回滚Redis
redisTemplate.opsForValue().increment("seckill:stock:" + productId);
return;
}
// 创建订单
Order order = new Order();
order.setProductId(productId);
order.setUserId(userId);
order.setQuantity(1);
order.setStatus(1);
order.setCreateTime(new Date());
orderMapper.insert(order);
// 发送消息到MQ,后续处理(如支付、物流等)
sendOrderMessage(order);
}
// 库存同步任务
@Scheduled(fixedRate = 5000)
public void syncStock() {
// 定期将Redis库存同步到数据库
Set<String> keys = redisTemplate.keys("seckill:stock:*");
for (String key : keys) {
Long productId = Long.parseLong(key.split(":")[2]);
Long redisStock = (Long) redisTemplate.opsForValue().get(key);
Product product = productMapper.selectById(productId);
if (product != null && !product.getStock().equals(redisStock)) {
productMapper.updateStock(productId, redisStock.intValue());
}
}
}
}
7.3 性能优化点
- Redis原子操作:使用DECR/INCR保证库存扣减原子性
- 异步下单:将订单创建异步化,提升响应速度
- 库存预热:提前将库存加载到Redis
- 限流措施:使用Redis+Lua脚本实现限流
-- 限流Lua脚本
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local expire = tonumber(ARGV[2])
local current = redis.call('GET', key)
if current and tonumber(current) >= limit then
return 0
else
redis.call('INCR', key)
if tonumber(current) == 0 then
redis.call('EXPIRE', key, expire)
end
return 1
end
八、总结与最佳实践
8.1 高并发处理黄金法则
- 缓存为王:80%的请求应该被缓存拦截
- 读写分离:减轻主库压力
- 异步处理:提升响应速度
- 限流降级:保护系统稳定性
- 监控告警:及时发现问题
8.2 调优检查清单
- [ ] 数据库连接池大小是否合理?
- [ ] 索引是否覆盖查询?
- [ ] 是否存在慢查询?
- [ ] 事务是否过长?
- [ ] 是否有缓存穿透/雪崩风险?
- [ ] 是否有合适的限流策略?
- [ ] 监控体系是否完善?
8.3 持续优化建议
- 定期分析慢查询日志
- 监控数据库性能指标
- 压力测试验证优化效果
- 保持数据库结构简洁
- 定期清理历史数据
通过以上策略的综合应用,可以显著提升MySQL在高并发场景下的性能表现。记住,优化是一个持续的过程,需要根据业务特点和系统负载不断调整和改进。
