引言:理解高并发环境下的MySQL挑战
在当今互联网应用中,高并发访问已成为常态。无论是电商平台的秒杀活动、社交媒体的热点事件,还是金融系统的交易高峰期,MySQL数据库都面临着海量用户同时访问的巨大压力。高并发场景下,数据库性能瓶颈往往成为系统整体吞吐量的制约因素。本文将深入探讨MySQL在高并发环境下的处理策略,从架构设计到具体优化技巧,帮助您构建能够应对海量用户挑战的数据库系统。
高并发对MySQL的主要挑战包括:
- 连接数激增:大量并发连接导致资源耗尽
- 锁竞争:行锁、表锁导致的等待和死锁
- I/O瓶颈:磁盘读写速度跟不上请求速度
- CPU过载:复杂查询和排序消耗大量CPU资源
- 内存不足:缓冲池无法容纳热点数据
一、MySQL高并发架构优化策略
1.1 读写分离架构设计
读写分离是应对高并发读操作的最有效策略之一。通过将读请求分发到从库,写请求发送到主库,可以显著分担主库压力。
架构示例:
主库 (Master) ←→ 写操作
↓ (复制)
从库1 (Slave1) ←→ 读操作
从库2 (Slave2) ←→ 读操作
从库3 (Slave3) ←→ 读操作
实现方式:
- 应用层路由:在应用程序中根据SQL类型选择数据源
- 中间件路由:使用ShardingSphere、MyCat等中间件自动路由
代码示例(Spring Boot + ShardingSphere):
# application.yml
spring:
shardingsphere:
datasource:
names: master,slave0,slave1
master:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://master-host:3306/db
username: root
password: password
slave0:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://slave0-host:3306/db
username: root
password: password
slave1:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://slave1-host:3306/db
username: root
password: password
rules:
replica-query:
data-sources:
ds0:
primary-data-source-name: master
replica-data-source-names: slave0,slave1
load-balance-algorithm-type: ROUND_ROBIN
1.2 分库分表策略
当单表数据量超过千万级或并发量极高时,分库分表成为必要选择。
水平分表示例:
-- 原始订单表
CREATE TABLE order_0 (
id BIGINT PRIMARY KEY,
user_id BIGINT,
amount DECIMAL(10,2),
create_time DATETIME,
INDEX idx_user_id (user_id)
);
-- 按user_id取模分表
CREATE TABLE order_1 LIKE order_0;
CREATE TABLE order_2 LIKE order_0;
CREATE TABLE order_3 LIKE order_0;
分库分表中间件配置(ShardingSphere):
spring:
shardingsphere:
rules:
sharding:
tables:
order:
actual-data-nodes: ds${0..1}.order_${0..3}
table-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: order-table-inline
database-strategy:
standard:
sharding-column: user_id
sharding-algorithm-name: order-db-inline
sharding-algorithms:
order-table-inline:
type: MOD
props:
sharding-count: 4
order-db-inline:
type: MOD
props:
sharding-count: 2
1.3 连接池优化
连接池是应用与数据库之间的缓冲层,合理配置连接池对高并发至关重要。
HikariCP配置示例:
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/db");
config.setUsername("root");
config.setPassword("password");
// 核心配置参数
config.setMaximumPoolSize(50); // 最大连接数
config.setMinimumIdle(10); // 最小空闲连接
config.setConnectionTimeout(30000); // 连接超时30秒
config.setIdleTimeout(600000); // 空闲超时10分钟
config.setMaxLifetime(1800000); // 连接最大存活时间30分钟
config.setLeakDetectionThreshold(60000); // 泄漏检测阈值60秒
// 高并发优化参数
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
config.addDataSourceProperty("useLocalSessionState", "true");
config.addDataSourceProperty("rewriteBatchedStatements", "true");
config.addDataSourceProperty("cacheResultSetMetadata", "true");
config.addDataSourceProperty("cacheServerConfiguration", "true");
config.addDataSourceProperty("elideSetAutoCommits", "true");
config.addDataSourceProperty("maintainTimeStats", "false");
HikariDataSource dataSource = new HikariDataSource(config);
二、MySQL配置参数优化
2.1 InnoDB引擎核心参数优化
InnoDB是MySQL的默认存储引擎,其配置直接影响高并发性能。
关键参数配置:
# my.cnf 或 my.ini
[mysqld]
# 连接相关
max_connections = 2000
max_user_connections = 1800
thread_cache_size = 100
# InnoDB缓冲池(最重要的参数)
innodb_buffer_pool_size = 16G # 通常设置为物理内存的50-70%
innodb_buffer_pool_instances = 16 # 缓冲池实例数,减少竞争
# 日志文件
innodb_log_file_size = 2G
innodb_log_buffer_size = 64M
innodb_flush_log_at_trx_commit = 2 # 高并发下可设为2,性能优先
# I/O相关
innodb_flush_method = O_DIRECT
innodb_io_capacity = 2000
innodb_io_capacity_max = 4000
# 锁和事务
innodb_lock_wait_timeout = 50
innodb_rollback_on_timeout = OFF
# 并发控制
innodb_thread_concurrency = 0 # 0表示不限制
innodb_read_io_threads = 8
innodb_write_io_threads = 8
# 查询缓存(MySQL 8.0已移除,5.7及之前版本)
query_cache_type = 0
query_cache_size = 0
# 临时表
tmp_table_size = 256M
max_heap_table_size = 256M
# 排序缓冲区
sort_buffer_size = 4M
join_buffer_size = 4M
# 批量插入
bulk_insert_buffer_size = 256M
# 文件打开数
open_files_limit = 65535
table_open_cache = 2000
table_definition_cache = 1400
2.2 MySQL 8.0新特性优化
MySQL 8.0引入了多项高并发优化特性:
1. 降序索引:
-- 创建降序索引,优化ORDER BY DESC
CREATE TABLE user_scores (
user_id BIGINT,
score INT,
create_time DATETIME,
INDEX idx_score_desc (score DESC)
);
-- 查询优化
SELECT * FROM user_scores ORDER BY score DESC LIMIT 100;
2. 直接DDL支持:
-- MySQL 8.0支持在线DDL,无需锁表
ALTER TABLE user_scores ADD COLUMN level INT DEFAULT 1, ALGORITHM=INPLACE, LOCK=NONE;
3. 隐藏索引:
-- 测试删除索引前可先隐藏
ALTER TABLE user_scores ALTER INDEX idx_score_desc INVISIBLE;
-- 确认无影响后再删除
ALTER TABLE user_scores DROP INDEX idx_score_desc;
三、SQL语句优化策略
3.1 索引优化技巧
1. 覆盖索引(Covering Index):
-- 原始查询(回表操作)
SELECT user_id, username, email FROM users WHERE username = 'john';
-- 优化:创建覆盖索引
CREATE INDEX idx_username_email ON users(username, email);
-- 查询改为只查索引列
SELECT user_id, username, email FROM users WHERE username = 'john';
2. 最左前缀原则:
-- 复合索引
CREATE INDEX idx_a_b_c ON table_name(a, b, c);
-- 有效查询
SELECT * FROM table_name WHERE a = 1;
SELECT * FROM table_name WHERE a = 1 AND b = 2;
SELECT * FROM table_name WHERE a = 1 AND b = 2 AND c = 3;
-- 无效查询(无法使用索引)
SELECT * FROM table_name WHERE b = 2;
SELECT * FROM table_name WHERE c = 3;
SELECT * FROM table_name WHERE b = 2 AND c = 3;
3. 索引下推(ICP):
-- MySQL 5.6+ 自动启用索引下推
-- 查询条件:a=1 AND b LIKE 'test%'
CREATE INDEX idx_a_b ON table_name(a, b);
-- 索引下推会先在索引层过滤b,减少回表次数
SELECT * FROM table_name WHERE a = 1 AND b LIKE 'test%';
3.2 避免索引失效的常见场景
1. 隐式类型转换:
-- 错误:phone字段是varchar,传入数字导致索引失效
SELECT * FROM users WHERE phone = 13800138000;
-- 正确
SELECT * FROM users WHERE phone = '13800138000';
2. 函数操作索引列:
-- 错误:索引失效
SELECT * FROM orders WHERE DATE(create_time) = '2024-01-01';
-- 正确:使用范围查询
SELECT * FROM orders WHERE create_time >= '2024-01-01 00:00:00'
AND create_time < '2024-01-02 00:00:00';
3. LIKE查询以通配符开头:
-- 错误:索引失效
SELECT * FROM users WHERE email LIKE '%@gmail.com';
-- 正确:如果必须模糊查询,考虑全文索引
ALTER TABLE users ADD FULLTEXT INDEX ft_email (email);
SELECT * FROM users WHERE MATCH(email) AGAINST('@gmail.com');
3.3 批量操作优化
批量插入优化:
// 原始方式:逐条插入(性能差)
for (Order order : orders) {
jdbcTemplate.update("INSERT INTO order_0 (id, user_id, amount) VALUES (?, ?, ?)",
order.getId(), order.getUserId(), order.getAmount());
}
// 优化方式:批量插入
String sql = "INSERT INTO order_0 (id, user_id, amount) VALUES (?, ?, ?)";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Order order = orders.get(i);
ps.setLong(1, order.getId());
ps.setLong(2, order.getUserId());
ps.setBigDecimal(3, order.getAmount());
}
@Override
public int getBatchSize() {
return orders.size();
}
});
// 进一步优化:使用rewriteBatchedStatements参数
// 在JDBC URL中添加:?rewriteBatchedStatements=true
// 然后使用以下方式:
String sql = "INSERT INTO order_0 (id, user_id, amount) VALUES (?, ?, ?), (?, ?, ?), (?, ?, ?)";
// 手动拼接values,每批1000条
批量更新优化:
-- 原始方式:多条UPDATE
UPDATE order_0 SET status = 'PAID' WHERE id = 1;
UPDATE order_0 SET status = 'PAID' WHERE id = 2;
UPDATE order_0 SET status = 'PAID' WHERE id = 3;
-- 优化方式:CASE WHEN批量更新
UPDATE order_0
SET status = CASE id
WHEN 1 THEN 'PAID'
WHEN 2 THEN 'PAID'
WHEN 3 THEN 'PAID'
END
WHERE id IN (1, 2, 3);
3.4 避免大事务和长事务
大事务问题:
-- 错误:大事务导致锁持有时间过长
BEGIN;
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
UPDATE account SET balance = balance + 100 WHERE user_id = 2;
-- ... 可能还有1000条更新
COMMIT;
-- 优化:拆分为小事务
BEGIN;
UPDATE account SET balance = balance - 100 WHERE user_id = 1;
COMMIT;
BEGIN;
UPDATE account SET balance = balance + 100 WHERE user_id = 2;
COMMIT;
-- ... 循环处理
长事务监控:
-- 查找执行时间超过60秒的事务
SELECT * FROM information_schema.processlist
WHERE time > 60 AND command != 'Sleep';
-- 查找未提交的事务
SELECT * FROM information_schema.innodb_trx
WHERE trx_started < NOW() - INTERVAL 60 SECOND;
四、缓存策略与数据预热
4.1 多级缓存架构
架构设计:
用户请求 → CDN → Nginx缓存 → 应用缓存(Redis) → 数据库
Redis缓存示例:
@Service
public class UserService {
private static final String USER_CACHE_PREFIX = "user:";
private static final long CACHE_TTL = 3600; // 1小时
@Autowired
private UserRepository userRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public User getUserById(Long id) {
String key = USER_CACHE_PREFIX + id;
// 1. 先从缓存获取
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
return user;
}
// 2. 缓存未命中,查询数据库
user = userRepository.findById(id).orElse(null);
if (user != null) {
// 3. 写入缓存
redisTemplate.opsForValue().set(key, user, CACHE_TTL, TimeUnit.SECONDS);
}
return user;
}
public void updateUser(User user) {
// 更新数据库
userRepository.save(user);
// 删除缓存(避免脏数据)
String key = USER_CACHE_PREFIX + user.getId();
redisTemplate.delete(key);
}
}
缓存穿透防护:
public User getUserById(Long id) {
String key = USER_CACHE_PREFIX + id;
// 1. 查询缓存
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
return user;
}
// 2. 缓存空值防止穿透
String nullKey = key + ":null";
if (Boolean.TRUE.equals(redisTemplate.hasKey(nullKey))) {
return null;
}
// 3. 查询数据库
user = userRepository.findById(id).orElse(null);
if (user != null) {
redisTemplate.opsForValue().set(key, user, CACHE_TTL, TimeUnit.SECONDS);
} else {
// 缓存空值,TTL设为较短时间
redisTemplate.opsForValue().set(nullKey, "", 60, TimeUnit.SECONDS);
}
return user;
}
4.2 缓存预热
预热脚本示例:
#!/usr/bin/env python3
import redis
import mysql.connector
import json
def preload_hot_data():
# 连接Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# 连接MySQL
db = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="business"
)
cursor = db.cursor(dictionary=True)
# 预热热点数据(如热门商品)
cursor.execute("""
SELECT product_id, name, price, stock
FROM products
WHERE is_hot = 1 AND stock > 0
""")
for product in cursor.fetchall():
key = f"product:{product['product_id']}"
redis_client.setex(
key,
3600, # 1小时过期
json.dumps(product)
)
cursor.close()
db.close()
if __name__ == "__main__":
preload_hot_data()
五、高并发下的锁优化
5.1 InnoDB行锁优化
1. 索引与行锁:
-- 错误:没有索引会导致表锁
UPDATE orders SET status = 'PAID' WHERE user_id = 123; -- user_id无索引
-- 正确:确保WHERE条件列有索引
CREATE INDEX idx_user_id ON orders(user_id);
UPDATE orders SET status = 'PAID' WHERE user_id = 123;
2. 间隙锁(Gap Lock)优化:
-- 范围查询可能产生间隙锁
SELECT * FROM orders WHERE id BETWEEN 1000 AND 2000 FOR UPDATE;
-- 优化:使用精确查询或调整隔离级别
-- 方案1:使用IN查询
SELECT * FROM orders WHERE id IN (1000, 1001, ..., 2000) FOR UPDATE;
-- 方案2:降低隔离级别(需评估业务风险)
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
5.2 乐观锁与悲观锁选择
乐观锁实现:
-- 表结构增加版本号字段
CREATE TABLE product (
id BIGINT PRIMARY KEY,
name VARCHAR(100),
stock INT,
version INT DEFAULT 0
);
-- 更新时检查版本号
UPDATE product
SET stock = stock - 1, version = version + 1
WHERE id = 100 AND version = 2; -- 假设当前版本是2
-- 检查影响行数
-- 如果影响行数为0,说明版本号已变化,需要重试
Java实现乐观锁:
@Transactional
public boolean deductStock(Long productId, int quantity) {
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
Product product = productRepository.findById(productId);
int currentVersion = product.getVersion();
int updated = productRepository.updateStock(productId, quantity, currentVersion);
if (updated > 0) {
return true;
}
retryCount++;
try {
Thread.sleep(50); // 短暂延迟后重试
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
return false;
}
悲观锁实现:
-- 显式加锁
BEGIN;
SELECT stock FROM product WHERE id = 100 FOR UPDATE;
-- 业务逻辑处理
UPDATE product SET stock = stock - 1 WHERE id = 100;
COMMIT;
5.3 死锁检测与处理
死锁日志分析:
-- 开启死锁日志
SET GLOBAL innodb_print_all_deadlocks = ON;
-- 查看最近死锁信息
SHOW ENGINE INNODB STATUS\G
死锁预防策略:
// 1. 固定加锁顺序
public void transfer(Long fromId, Long toId, BigDecimal amount) {
// 始终按ID顺序加锁
Long firstId = Math.min(fromId, toId);
Long secondId = Math.max(fromId, toId);
jdbcTemplate.execute(connection -> {
// 先锁第一个账户
PreparedStatement ps1 = connection.prepareStatement(
"SELECT balance FROM account WHERE id = ? FOR UPDATE");
ps1.setLong(1, firstId);
ps1.executeQuery();
// 再锁第二个账户
PreparedStatement ps2 = connection.prepareStatement(
"SELECT balance FROM account WHERE id = ? FOR UPDATE");
ps2.setLong(1, secondId);
ps2.executeQuery();
// 执行转账逻辑
// ...
return null;
});
}
六、监控与诊断工具
6.1 MySQL性能监控指标
关键指标:
- QPS/TPS:每秒查询/事务数
- 连接数:Threads_connected vs max_connections
- 缓存命中率:InnoDB Buffer Pool命中率
- 锁等待:Innodb_row_lock_waits, Innodb_row_lock_time_avg
- 慢查询:Slow_queries
监控SQL示例:
-- 查看实时性能指标
SHOW GLOBAL STATUS LIKE 'Threads_connected';
SHOW GLOBAL STATUS LIKE 'Queries';
SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_read%';
-- 计算缓存命中率
-- 命中率 = (1 - Innodb_buffer_pool_reads / Innodb_buffer_pool_read_requests) * 100
SELECT
(1 - (SELECT Variable_value FROM information_schema.GLOBAL_STATUS
WHERE Variable_name = 'Innodb_buffer_pool_reads') /
(SELECT Variable_value FROM information_schema.GLOBAL_STATUS
WHERE Variable_name = 'Innodb_buffer_pool_read_requests')) * 100
AS buffer_pool_hit_rate;
6.2 慢查询日志分析
配置慢查询日志:
# my.cnf
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1 # 记录超过1秒的查询
log_queries_not_using_indexes = 1
min_examined_row_limit = 1000
使用pt-query-digest分析:
# 安装Percona Toolkit
sudo apt-get install percona-toolkit
# 分析慢查询日志
pt-query-digest /var/log/mysql/slow.log > slow_report.txt
# 分析最近1小时的慢查询
pt-query-digest --since='1h' /var/log/mysql/slow.log
6.3 Performance Schema使用
启用Performance Schema:
-- 检查是否启用
SELECT * FROM performance_schema.setup_instruments
WHERE NAME LIKE '%wait/io/table/sql/handler%';
-- 启用所有监控
UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES';
诊断SQL性能:
-- 查看最耗时的SQL
SELECT
DIGEST_TEXT,
COUNT_STAR,
AVG_TIMER_WAIT/1000000000000 AS avg_seconds,
SUM_ROWS_EXAMINED
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;
-- 查看表I/O情况
SELECT
OBJECT_SCHEMA,
OBJECT_NAME,
COUNT_READ,
COUNT_WRITE,
SUM_TIMER_WAIT/1000000000000 AS total_seconds
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;
七、高并发场景实战案例
7.1 秒杀系统设计
数据库设计:
-- 秒杀商品表
CREATE TABLE seckill_product (
id BIGINT PRIMARY KEY,
name VARCHAR(200),
total_stock INT,
start_time DATETIME,
end_time DATETIME,
version INT DEFAULT 0
);
-- 秒杀订单表
CREATE TABLE seckill_order (
id BIGINT PRIMARY KEY,
user_id BIGINT,
product_id BIGINT,
quantity INT,
create_time DATETIME,
UNIQUE KEY uk_user_product (user_id, product_id)
);
-- 库存扣减表(独立出来减少锁竞争)
CREATE TABLE seckill_stock (
product_id BIGINT PRIMARY KEY,
stock INT,
version INT
);
秒杀服务实现:
@Service
public class SeckillService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String STOCK_KEY = "seckill:stock:";
private static final String ORDER_KEY = "seckill:order:";
/**
* 秒杀下单
*/
@Transactional
public boolean seckill(Long userId, Long productId, int quantity) {
// 1. 预扣库存(Redis预减)
String stockKey = STOCK_KEY + productId;
Long stock = redisTemplate.opsForValue().decrement(stockKey, quantity);
if (stock < 0) {
// 库存不足,回滚
redisTemplate.opsForValue().increment(stockKey, quantity);
return false;
}
// 2. 生成订单(异步写入数据库)
String orderKey = ORDER_KEY + userId + ":" + productId;
if (Boolean.TRUE.equals(redisTemplate.hasKey(orderKey))) {
// 重复下单
redisTemplate.opsForValue().increment(stockKey, quantity);
return false;
}
// 3. 发送MQ异步创建订单
OrderMessage message = new OrderMessage(userId, productId, quantity);
mqProducer.send("seckill-order", message);
// 4. 标记已下单
redisTemplate.opsForValue().set(orderKey, 1, 30, TimeUnit.MINUTES);
return true;
}
/**
* 异步创建数据库订单
*/
@Transactional
public void createOrder(OrderMessage message) {
// 检查数据库库存(最终一致性)
Integer stock = jdbcTemplate.queryForObject(
"SELECT stock FROM seckill_stock WHERE product_id = ?",
Integer.class, message.getProductId());
if (stock == null || stock < message.getQuantity()) {
// 库存不足,回滚Redis
String stockKey = STOCK_KEY + message.getProductId();
redisTemplate.opsForValue().increment(stockKey, message.getQuantity());
return;
}
// 扣减数据库库存
int updated = jdbcTemplate.update(
"UPDATE seckill_stock SET stock = stock - ?, version = version + 1 " +
"WHERE product_id = ? AND stock >= ?",
message.getQuantity(), message.getProductId(), message.getQuantity());
if (updated == 0) {
// 库存不足,回滚
String stockKey = STOCK_KEY + message.getProductId();
redisTemplate.opsForValue().increment(stockKey, message.getQuantity());
return;
}
// 创建订单
jdbcTemplate.update(
"INSERT INTO seckill_order (id, user_id, product_id, quantity, create_time) " +
"VALUES (?, ?, ?, ?, NOW())",
generateOrderId(), message.getUserId(), message.getProductId(), message.getQuantity());
}
}
库存预热脚本:
#!/usr/bin/env python3
import redis
def preload_stock():
r = redis.Redis(host='localhost', port=6379, db=0)
# 从MySQL加载秒杀商品库存到Redis
# 这里模拟1000个商品,每个100库存
for product_id in range(1, 1001):
stock_key = f"seckill:stock:{product_id}"
r.set(stock_key, 100)
r.expire(stock_key, 3600) # 1小时过期
if __name__ == "__main__":
preload_stock()
7.2 社交媒体Feed流优化
数据库设计:
-- 用户表
CREATE TABLE users (
id BIGINT PRIMARY KEY,
username VARCHAR(50),
avatar VARCHAR(200)
);
-- 帖子表(分表)
CREATE TABLE posts_0 (
id BIGINT PRIMARY KEY,
user_id BIGINT,
content TEXT,
create_time DATETIME,
INDEX idx_user_time (user_id, create_time DESC)
);
-- posts_1, posts_2, posts_3 ... 共16张表
-- 关注关系表
CREATE TABLE follows (
user_id BIGINT,
follow_id BIGINT,
create_time DATETIME,
PRIMARY KEY (user_id, follow_id),
INDEX idx_follow_id (follow_id)
);
Feed流查询优化:
@Service
public class FeedService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 获取用户Feed流(关注的人的帖子)
*/
public List<Post> getFeed(Long userId, int offset, int limit) {
String cacheKey = "feed:" + userId + ":" + offset + ":" + limit;
// 1. 尝试从缓存获取
List<Post> cached = (List<Post>) redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return cached;
}
// 2. 获取关注列表
List<Long> followIds = getFollowIds(userId);
if (followIds.isEmpty()) {
return Collections.emptyList();
}
// 3. 分表查询(根据user_id取模)
Map<Long, List<Long>> tableMap = new HashMap<>();
for (Long followId : followIds) {
int tableIndex = (int) (followId % 16);
tableMap.computeIfAbsent((long) tableIndex, k -> new ArrayList<>()).add(followId);
}
// 4. 并行查询各分表
List<Post> allPosts = tableMap.entrySet().parallelStream()
.flatMap(entry -> {
String tableName = "posts_" + entry.getKey();
List<Long> userIds = entry.getValue();
return getPostsFromTable(tableName, userIds, offset, limit).stream();
})
.sorted(Comparator.comparing(Post::getCreateTime).reversed())
.limit(limit)
.collect(Collectors.toList());
// 5. 写入缓存
redisTemplate.opsForValue().set(cacheKey, allPosts, 60, TimeUnit.SECONDS);
return allPosts;
}
private List<Long> getFollowIds(Long userId) {
return jdbcTemplate.queryForList(
"SELECT follow_id FROM follows WHERE user_id = ?",
Long.class, userId);
}
private List<Post> getPostsFromTable(String tableName, List<Long> userIds, int offset, int limit) {
String sql = String.format(
"SELECT p.*, u.username, u.avatar " +
"FROM %s p JOIN users u ON p.user_id = u.id " +
"WHERE p.user_id IN (%s) " +
"ORDER BY p.create_time DESC " +
"LIMIT ? OFFSET ?",
tableName,
userIds.stream().map(String::valueOf).collect(Collectors.joining(",")));
return jdbcTemplate.query(sql, new Object[]{limit, offset}, (rs, rowNum) -> {
Post post = new Post();
post.setId(rs.getLong("id"));
post.setUserId(rs.getLong("user_id"));
post.setContent(rs.getString("content"));
post.setCreateTime(rs.getTimestamp("create_time"));
post.setUsername(rs.getString("username"));
post.setAvatar(rs.getString("avatar"));
return post;
});
}
}
八、总结与最佳实践
8.1 高并发优化检查清单
架构层面:
- [ ] 实现读写分离
- [ ] 考虑分库分表
- [ ] 使用连接池并合理配置
- [ ] 引入多级缓存
配置层面:
- [ ] innodb_buffer_pool_size设置为内存的50-70%
- [ ] max_connections足够但不过大
- [ ] 慢查询日志已开启
- [ ] 合理设置innodb_flush_log_at_trx_commit
SQL层面:
- [ ] 关键查询都有合适索引
- [ ] 避免SELECT *
- [ ] 避免大事务
- [ ] 批量操作替代单条操作
- [ ] 避免索引失效的写法
监控层面:
- [ ] 部署监控系统(Prometheus + Grafana)
- [ ] 配置告警规则
- [ ] 定期分析慢查询日志
- [ ] 监控锁等待和死锁
8.2 持续优化建议
- 定期压测:使用sysbench或JMeter进行压力测试,发现性能瓶颈
- 灰度发布:SQL优化先在从库测试,确认无误后再应用到主库
- 数据归档:定期归档历史数据,保持单表数据量在合理范围(建议<1000万行)
- 参数调优:根据业务特点持续调整MySQL参数,记录每次变更的影响
- 知识沉淀:建立团队的SQL规范,定期分享优化案例
通过以上策略的综合应用,可以有效应对海量用户同时访问的挑战,构建高性能、高可用的MySQL数据库系统。记住,优化是一个持续的过程,需要根据业务发展和技术演进不断调整和完善。
