引言:理解高并发场景下的MySQL挑战
在现代互联网应用中,高并发访问已经成为常态。无论是电商平台的秒杀活动,还是社交媒体的热点事件,MySQL数据库都面临着前所未有的压力。当并发连接数达到数千甚至上万时,简单的查询可能变得缓慢,写入操作可能产生锁等待,严重时甚至导致数据库服务不可用。
高并发对MySQL的主要挑战包括:
- CPU资源竞争:大量查询同时执行,CPU调度压力剧增
- 内存资源耗尽:连接数过多导致内存不足
- 磁盘I/O瓶颈:频繁的读写操作使磁盘成为瓶颈
- 锁竞争:行锁、表锁导致的等待和死锁
- 网络延迟:大量数据传输导致网络拥堵
本文将从索引优化、查询优化、架构优化和读写分离四个层面,系统性地讲解MySQL高并发处理策略,并提供完整的实战代码示例。
一、索引优化:高并发的基石
1.1 索引的基本原理与类型选择
索引是MySQL性能优化的核心。在高并发场景下,合理的索引设计可以将查询性能提升100倍甚至更多。
B-Tree索引(默认索引类型)
B-Tree索引是最常用的索引类型,适用于全值匹配、范围查询和排序操作。
-- Users table: composite indexes serve lookups by username (+ email)
-- and filtering by status (+ created_at).
CREATE TABLE users (
    id         BIGINT       NOT NULL AUTO_INCREMENT,
    username   VARCHAR(50)  NOT NULL,
    email      VARCHAR(100),
    status     TINYINT      DEFAULT 1,
    created_at TIMESTAMP    DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    KEY idx_username_email (username, email),
    KEY idx_status_created (status, created_at)
);
-- Check that the equality lookup uses idx_username_email
-- (expected: type=ref, key=idx_username_email).
EXPLAIN
SELECT *
FROM users
WHERE username = 'john_doe'
  AND email = 'john@example.com';
索引选择建议:
- 高选择性列优先:选择性高的列(唯一值多)索引效果更好
- 最左前缀原则:查询条件需从复合索引的最左列开始连续匹配才能用上索引(如 (username, email) 可服务只含 username 的条件,但不能服务只含 email 的条件)
- 覆盖索引:查询列全部在索引中,避免回表
Hash索引(精确匹配场景)
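Hash索引只适用于等值查询(=、IN),不支持范围查询和排序。注意:InnoDB 不支持显式的 Hash 索引(指定 USING HASH 会被忽略,仍使用 B-Tree;其“自适应哈希索引”由引擎自动维护),显式 Hash 索引只在 MEMORY 等引擎中可用。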
-- MEMORY-engine cache table with a HASH index.
-- MEMORY tables cannot store TEXT/BLOB columns (error 1163), so the value is a
-- bounded VARCHAR. MEMORY stores VARCHAR at its full declared length, so keep it small.
-- The primary key is itself declared as the HASH index; a second index on the
-- same column would only duplicate it.
-- Caveats: MEMORY uses table-level locking and loses all rows on restart.
CREATE TABLE cache_data (
    cache_key   VARCHAR(100)  NOT NULL,
    cache_value VARCHAR(1024),
    PRIMARY KEY USING HASH (cache_key)
) ENGINE=MEMORY;
全文索引(文本搜索场景)
-- Articles with one FULLTEXT index over title + content (InnoDB).
CREATE TABLE articles (
    id      INT AUTO_INCREMENT PRIMARY KEY,
    title   VARCHAR(200),
    content TEXT,
    FULLTEXT KEY idx_fulltext (title, content)
) ENGINE=InnoDB;
-- Natural-language full-text search. For InnoDB the MATCH column list must be
-- exactly the column list of a FULLTEXT index.
SELECT *
FROM articles
WHERE MATCH (title, content)
      AGAINST ('database optimization' IN NATURAL LANGUAGE MODE);
1.2 高并发场景下的索引设计实战
场景1:电商订单查询优化
-- Baseline table: only the primary key, so any filter on other columns scans the table.
CREATE TABLE orders (
    order_id    BIGINT NOT NULL,
    user_id     BIGINT,
    status      TINYINT,
    amount      DECIMAL(10,2),
    create_time DATETIME,
    update_time DATETIME,
    PRIMARY KEY (order_id)
);
-- Hot query patterns to support:
--   1. a user's unfinished orders (most frequent)
--   2. orders within a time range
--   3. per-status order statistics
-- One index per pattern, added in a single ALTER.
ALTER TABLE orders
    ADD INDEX idx_user_status (user_id, status),
    ADD INDEX idx_create_time (create_time),
    ADD INDEX idx_status_amount (status, amount);
-- Before vs. after (illustrative figures):
--   before: full table scan, ~1,000,000 rows examined, ~2.3 s
--   after:  type=ref, key=idx_user_status, rows=10, ~0.002 s
EXPLAIN SELECT * FROM orders WHERE user_id = 12345 AND status = 0;
场景2:社交平台消息流优化
-- Per-user message inbox.
CREATE TABLE user_messages (
    id           BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id      BIGINT NOT NULL,
    sender_id    BIGINT NOT NULL,
    message_type TINYINT NOT NULL,
    content      TEXT,
    is_read      TINYINT DEFAULT 0,
    created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Equality columns first, then the sort column (DESC key parts are honoured from MySQL 8.0).
    INDEX idx_user_type_read (user_id, message_type, is_read, created_at DESC)
);
-- Hot query: a user's unread messages, newest first.
SELECT * FROM user_messages
WHERE user_id = 10001
  AND message_type = 1
  AND is_read = 0
ORDER BY created_at DESC
LIMIT 20;
-- Covering the query. MySQL has no INCLUDE clause (that is SQL Server /
-- PostgreSQL syntax), and a TEXT column such as `content` cannot be a full index
-- key. InnoDB secondary indexes already carry the primary key, so this query is
-- answered from idx_user_type_read alone (EXPLAIN: "Using index"):
SELECT id, created_at
FROM user_messages
WHERE user_id = 10001 AND message_type = 1 AND is_read = 0
ORDER BY created_at DESC
LIMIT 20;
-- Deferred join: find the 20 ids through the covering index, then fetch only
-- those rows (including `content`) by primary key.
SELECT m.*
FROM user_messages AS m
INNER JOIN (
    SELECT id
    FROM user_messages
    WHERE user_id = 10001 AND message_type = 1 AND is_read = 0
    ORDER BY created_at DESC
    LIMIT 20
) AS page ON page.id = m.id
ORDER BY m.created_at DESC;
1.3 索引维护与监控
监控索引使用情况
-- Per-index I/O statistics since server start (performance_schema, MySQL 5.6+).
SELECT
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_FETCH,
    COUNT_INSERT,
    COUNT_UPDATE,
    COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE OBJECT_SCHEMA = 'your_database'
ORDER BY COUNT_FETCH DESC;
-- Indexes with no recorded I/O since the server started. These are candidates for
-- removal, because every index costs write performance; the result is only meaningful
-- after a representative uptime. This table names its columns OBJECT_SCHEMA /
-- OBJECT_NAME / COUNT_STAR (there is no TABLE_SCHEMA, TABLE_NAME or ROWS_READ).
-- Rows with INDEX_NAME IS NULL are table scans, and the primary key is never "unused".
SELECT
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE OBJECT_SCHEMA = 'your_database'
  AND INDEX_NAME IS NOT NULL
  AND INDEX_NAME <> 'PRIMARY'
  AND COUNT_STAR = 0
ORDER BY OBJECT_NAME, INDEX_NAME;
-- The sys schema (MySQL 5.7+) ships the same check as a view:
-- SELECT * FROM sys.schema_unused_indexes WHERE object_schema = 'your_database';
索引碎片整理
-- Reclaimable space (DATA_FREE) per table, relative to data + index size.
-- NULLIF makes the empty-table case (size 0) yield NULL instead of dividing by zero.
SELECT
    TABLE_NAME,
    ROUND((DATA_LENGTH + INDEX_LENGTH) / 1024 / 1024, 2) AS total_mb,
    ROUND(DATA_FREE / 1024 / 1024, 2) AS free_mb,
    ROUND(DATA_FREE / NULLIF(DATA_LENGTH + INDEX_LENGTH, 0) * 100, 2) AS fragment_ratio
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'your_database'
  AND DATA_FREE > 1024 * 1024  -- more than 1 MB free
ORDER BY DATA_FREE DESC;
-- Rebuild the table together with all of its indexes (online DDL since MySQL 5.6).
-- For InnoDB, OPTIMIZE TABLE maps to the same rebuild.
ALTER TABLE orders ENGINE=InnoDB;
-- Replacing an index definition (drop + add in one statement). This changes the
-- index rather than rebuilding it; use it when the column list must change.
ALTER TABLE orders DROP INDEX idx_old, ADD INDEX idx_new (column1, column2);
二、查询优化:让SQL飞起来
2.1 高效SQL编写原则
避免SELECT *
-- Anti-pattern: fetches every column, including ones the caller never uses.
SELECT * FROM users WHERE id = 10001;
-- Better: name only the columns that are needed.
SELECT id, username, email FROM users WHERE id = 10001;
-- Covering index. A lookup by primary key reads the InnoDB clustered index, which
-- already stores the whole row. An extra (id, username, email) index would only
-- add write cost. Covering pays off for secondary-key lookups: the existing
-- idx_username_email (username, email) implicitly carries the primary key id, so
-- this query is answered from the index alone (EXPLAIN: "Using index"):
SELECT id, username, email FROM users WHERE username = 'john_doe';
合理使用LIMIT
-- Deep pagination: OFFSET reads and discards every skipped row, so cost grows
-- with the page depth. (The primary key of orders is order_id.)
SELECT * FROM orders WHERE user_id = 12345 ORDER BY order_id LIMIT 1000000, 20;
-- Option 1: deferred join. Walk the narrow index to find the 20 keys, then fetch
-- only those rows. An index such as (user_id) or (user_id, order_id) lets the inner
-- query return rows already ordered by order_id. The outer ORDER BY is required,
-- because a join does not preserve the derived table's order.
SELECT o.*
FROM orders AS o
INNER JOIN (
    SELECT order_id
    FROM orders
    WHERE user_id = 12345
    ORDER BY order_id
    LIMIT 1000000, 20
) AS page ON o.order_id = page.order_id
ORDER BY o.order_id;
-- Option 2: keyset (seek) pagination. The client remembers the last order_id of
-- the previous page (1000020 is a placeholder) and asks for the next 20 rows.
-- Cost no longer depends on page depth.
SELECT * FROM orders
WHERE user_id = 12345 AND order_id > 1000020
ORDER BY order_id
LIMIT 20;
避免不必要的排序
-- Sorting: dropping ORDER BY is not an optimization. Without it MySQL guarantees
-- no row order, so "the latest 10 users" becomes "any 10 users".
-- Keep the ORDER BY and let an index on the sort column supply the order, so the
-- server reads the first 10 index entries instead of sorting (no "Using filesort").
CREATE INDEX idx_created ON users (created_at DESC);
SELECT * FROM users ORDER BY created_at DESC LIMIT 10;
2.2 高级查询优化技巧
使用EXPLAIN分析执行计划
-- Example: analyse a reporting query.
-- A WHERE filter on o.create_time rejects the NULL rows a LEFT JOIN would add, so
-- "LEFT JOIN ... WHERE o.col ..." silently behaves as an INNER JOIN. Here
-- HAVING order_count > 5 drops users without orders anyway, so INNER JOIN states
-- the real semantics and leaves the optimizer free to choose the join order.
EXPLAIN
SELECT
    u.username,
    COUNT(o.order_id) AS order_count,
    SUM(o.amount)     AS total_amount
FROM users AS u
INNER JOIN orders AS o
    ON o.user_id = u.id
WHERE u.status = 1
  AND o.create_time >= '2024-01-01'
GROUP BY u.id, u.username
HAVING order_count > 5
ORDER BY total_amount DESC;
-- Reading the key columns:
-- type  (worst -> best): ALL (full scan) -> index (full index scan) -> range -> ref -> eq_ref -> const
-- rows: estimated rows examined (lower is better)
-- Extra: "Using index" = covering index; "Using where" = rows filtered after reading;
--        "Using filesort" / "Using temporary" = extra sort / temp table, worth optimizing
优化子查询
-- Slow form: correlated subquery, logically evaluated once per outer row.
SELECT *
FROM orders AS o
WHERE o.amount > (
    SELECT AVG(i.amount)
    FROM orders AS i
    WHERE i.user_id = o.user_id
);
-- Join against a per-user aggregate that is computed once.
SELECT o.*
FROM orders AS o
INNER JOIN (
    SELECT user_id, AVG(amount) AS avg_amount
    FROM orders
    GROUP BY user_id
) AS stats
    ON stats.user_id = o.user_id
WHERE o.amount > stats.avg_amount;
-- Window function (MySQL 8.0+): a single pass, no self-join.
SELECT *
FROM (
    SELECT
        *,
        AVG(amount) OVER (PARTITION BY user_id) AS avg_amount
    FROM orders
) AS tmp
WHERE amount > avg_amount;
优化OR条件
-- OR across two different columns: one composite index cannot serve both
-- branches. The optimizer may use index_merge when each column has its own
-- index; otherwise it scans the table.
SELECT * FROM users WHERE username = 'alice' OR email = 'alice@example.com';
-- Rewrite as two index lookups. The second branch excludes rows the first branch
-- already returned. A bare UNION ALL would return a row twice when it matches
-- both conditions; UNION would deduplicate, but at the cost of a temp table.
-- Assumes an index on email: idx_username_email (username, email) cannot serve an
-- email-only condition, because email is its second column.
SELECT * FROM users WHERE username = 'alice'
UNION ALL
SELECT * FROM users
WHERE email = 'alice@example.com'
  AND NOT (username <=> 'alice');
-- IN only replaces OR on the SAME column (username = 'alice' OR username = 'bob');
-- it is not a rewrite of the two-column query above.
SELECT * FROM users WHERE username IN ('alice', 'bob');
2.3 高并发场景下的查询模式优化
批量操作替代循环单条操作
# Python example: batch insert vs. row-by-row insert
import mysql.connector


def insert_slow(data_list):
    """Anti-pattern: one INSERT statement, i.e. one network round trip, per row."""
    conn = mysql.connector.connect(...)
    try:
        cursor = conn.cursor()
        for item in data_list:
            cursor.execute(
                "INSERT INTO logs (user_id, action, timestamp) VALUES (%s, %s, %s)",
                (item['user_id'], item['action'], item['timestamp'])
            )
        conn.commit()
    finally:
        # Release the connection even when an INSERT fails.
        conn.close()


def insert_fast(data_list):
    """Preferred: insert all rows with a single executemany() call.

    For INSERT ... VALUES statements mysql.connector rewrites executemany()
    into one multi-row INSERT, so the batch needs roughly one round trip.
    Very large lists can exceed max_allowed_packet; split them into chunks.
    """
    if not data_list:
        return  # nothing to insert; avoid opening a connection
    conn = mysql.connector.connect(...)
    try:
        cursor = conn.cursor()
        sql = "INSERT INTO logs (user_id, action, timestamp) VALUES (%s, %s, %s)"
        values = [(item['user_id'], item['action'], item['timestamp']) for item in data_list]
        cursor.executemany(sql, values)
        conn.commit()
    finally:
        conn.close()

# Illustrative: for 1000 rows the batch version is typically 50-100x faster.
使用存储过程减少网络往返
-- Stored procedure: decrement stock for every item of an order in one round trip.
-- Input: JSON array such as [{"product_id": 101, "quantity": 2, "version": 5}, ...]
-- All-or-nothing: a shortage, a version conflict or any SQL error rolls back every
-- item, and the error is raised to the caller (SQLSTATE 45000 for stock/version).
-- Note: START TRANSACTION implicitly commits any transaction the caller had open.
DELIMITER $$
CREATE PROCEDURE batch_update_stock(
    IN order_items JSON  -- array of order items
)
BEGIN
    DECLARE i INT DEFAULT 0;
    DECLARE item_count INT;
    -- Locals are prefixed with v_. Inside a routine a local variable takes precedence
    -- over a column of the same name, so "WHERE product_id = product_id" would compare
    -- the variable with itself and update every row of the table.
    DECLARE v_product_id BIGINT;
    DECLARE v_quantity INT;
    DECLARE v_version INT;
    -- On any error (including the SIGNAL below): roll back, then re-raise to the caller.
    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        ROLLBACK;
        RESIGNAL;
    END;

    SET item_count = JSON_LENGTH(order_items);

    START TRANSACTION;
    WHILE i < item_count DO
        SET v_product_id = JSON_EXTRACT(order_items, CONCAT('$[', i, '].product_id'));
        SET v_quantity   = JSON_EXTRACT(order_items, CONCAT('$[', i, '].quantity'));
        SET v_version    = JSON_EXTRACT(order_items, CONCAT('$[', i, '].version'));

        -- Optimistic lock: succeeds only if stock suffices and the version is unchanged.
        UPDATE product_stock
        SET stock = stock - v_quantity,
            version = version + 1
        WHERE product_id = v_product_id
          AND stock >= v_quantity
          AND version = v_version;

        IF ROW_COUNT() = 0 THEN
            -- Insufficient stock or version conflict. The EXIT handler rolls back.
            -- RETURN is not allowed in a procedure, and SIGNAL already leaves the body.
            SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Stock update failed';
        END IF;

        SET i = i + 1;
    END WHILE;
    COMMIT;
END$$
DELIMITER ;
-- Example call
CALL batch_update_stock('[{"product_id": 101, "quantity": 2, "version": 5}, {"product_id": 102, "quantity": 1, "version": 3}]');
三、架构优化:突破单机瓶颈
3.1 连接池优化
连接池参数调优
# SQLAlchemy connection-pool configuration
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
    "mysql+pymysql://user:pass@host/db",
    poolclass=QueuePool,
    pool_size=20,        # connections kept open in the pool; tune to the workload and DB capacity
    max_overflow=10,     # extra temporary connections above pool_size (at most 30 in total)
    pool_timeout=30,     # seconds to wait for a free connection before raising
    # Recycle connections before the server's idle timeout (wait_timeout = 600 s in
    # the server config shown below) closes them. A value above wait_timeout never
    # fires in time.
    pool_recycle=500,
    pool_pre_ping=True,  # test each connection on checkout and reconnect if it is stale
    echo=False           # no SQL echo in production
)
# Pool event hooks (demo: print on every checkout / checkin)
from sqlalchemy import event
from sqlalchemy.pool import Pool
@event.listens_for(Pool, 'checkout')
def checkout(dbapi_con, con_record, con_proxy):
    print("获取连接")
@event.listens_for(Pool, 'checkin')
def checkin(dbapi_con, con_record):
    print("归还连接")
MySQL服务器连接参数
-- Connection counters: currently open vs. peak since startup
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Max_used_connections';
-- Server-side connection limits (my.cnf)
[mysqld]
# Hard cap on simultaneous client connections. Size it to RAM, since every
# connection allocates its own buffers (roughly 10 MB each as a planning estimate).
max_connections = 1000
# Threads of closed connections kept for reuse, avoiding thread-creation cost.
thread_cache_size = 100
# Idle timeouts in seconds: non-interactive clients (apps, pools), then interactive clients.
wait_timeout = 600
interactive_timeout = 3600
-- Inspecting and terminating connections
SHOW PROCESSLIST;  -- one row per current connection
KILL 12345;        -- terminate the connection whose Id is 12345
3.2 缓存策略
应用层缓存(Redis)
import redis
import json
from functools import wraps
# Redis client shared by the caching helpers.
# decode_responses=True makes get() return str (not bytes), ready for json.loads.
# Both timeouts are in seconds, so an unreachable Redis fails fast.
redis_client = redis.Redis(
    host='localhost', port=6379, db=0,
    socket_connect_timeout=5,
    socket_timeout=5,
    decode_responses=True,
)
def cache_query(ttl=300):
    """Decorator: cache a function's result in Redis as JSON for `ttl` seconds.

    The key is "cache:<function name>:<str(args)>:<str(kwargs)>", e.g.
    "cache:get_user_profile:(42,):{}". Code that invalidates entries must build
    the same string.

    Values that JSON cannot represent natively (datetime, Decimal, ...) are stored
    via str(). A cache hit therefore returns those fields as strings, while a miss
    returns the original objects.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = f"cache:{func.__name__}:{str(args)}:{str(kwargs)}"
            cached = redis_client.get(key)
            # Any stored JSON text (even "null") is a hit; only a missing key is None.
            if cached is not None:
                return json.loads(cached)
            result = func(*args, **kwargs)
            # default=str: database rows contain datetime/Decimal values, which
            # plain json.dumps rejects with TypeError.
            redis_client.setex(key, ttl, json.dumps(result, default=str))
            return result
        return wrapper
    return decorator
# Usage example
@cache_query(ttl=60)
def get_user_profile(user_id):
    """Load one users row as a dict (None if absent); results are cached for 60 s."""
    conn = mysql.connector.connect(...)
    try:
        cursor = conn.cursor(dictionary=True)
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        return cursor.fetchone()
    finally:
        # Close the connection on every path, including query errors.
        conn.close()
# Cache invalidation on writes (cache-aside: update the database, then delete the cache entry)
def update_user_profile(user_id, data):
    """Write the profile to the database, then drop the cached get_user_profile result."""
    conn = mysql.connector.connect(...)
    try:
        cursor = conn.cursor()
        cursor.execute("UPDATE users SET ... WHERE id = %s", (user_id,))
        conn.commit()
    finally:
        conn.close()
    # Must equal the key cache_query builds for get_user_profile(user_id):
    # "cache:<name>:<str(args)>:<str(kwargs)>". Without the trailing ":{}" (empty
    # kwargs) the delete would never hit. Only the positional call form
    # get_user_profile(user_id) is covered.
    redis_client.delete(f"cache:get_user_profile:{str((user_id,))}:{str({})}")
MySQL查询缓存(5.7 中已废弃、8.0 中已移除,推荐使用外部缓存)
-- Query cache: MySQL 5.7 and earlier only; it was removed entirely in 8.0.
-- Inspect its settings (returns no rows on 8.0):
SHOW VARIABLES LIKE 'query_cache%';
-- my.cnf on 5.7: keep it off under high concurrency, because every write to a table
-- invalidates that table's cached results. Do not copy these two lines into an
-- 8.0 my.cnf: unknown options stop the server from starting.
[mysqld]
query_cache_size = 0
query_cache_type = 0  # 0 = OFF
3.3 分库分表(垂直拆分与水平拆分)
垂直拆分:按业务模块拆分
-- Before: one wide table mixing hot, cold and write-heavy columns
CREATE TABLE user_all (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100),
    phone VARCHAR(20),
    password_hash VARCHAR(255),
    profile_text TEXT,
    avatar_url VARCHAR(255),
    last_login DATETIME,
    login_count INT,
    -- ... more columns
    INDEX idx_username (username)
);
-- Vertical split by access pattern: core / profile / counters.
-- Every column lives in exactly one table.
-- Table 1: core identity (read on every request)
CREATE TABLE user_core (
    id BIGINT PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    status TINYINT DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_username (username),
    INDEX idx_email (email)
) ENGINE=InnoDB;
-- Table 2: profile data (read and written rarely)
CREATE TABLE user_profile (
    user_id BIGINT PRIMARY KEY,
    phone VARCHAR(20),
    profile_text TEXT,
    avatar_url VARCHAR(255),
    FOREIGN KEY (user_id) REFERENCES user_core(id) ON DELETE CASCADE
) ENGINE=InnoDB;
-- Table 3: counters written on every login / order. login_count and last_login
-- live only here (not duplicated in user_profile), so a login updates one narrow row.
-- Note: FK checks take shared locks on the parent row; some high-write systems
-- enforce this relationship in the application instead.
CREATE TABLE user_stats (
    user_id BIGINT PRIMARY KEY,
    login_count INT DEFAULT 0,
    last_login DATETIME,
    order_count INT DEFAULT 0,
    last_order_time DATETIME,
    total_spent DECIMAL(12,2) DEFAULT 0,
    FOREIGN KEY (user_id) REFERENCES user_core(id) ON DELETE CASCADE
) ENGINE=InnoDB;
水平拆分:按用户ID哈希分片
-- Horizontal sharding: 4 tables, routed by user_id % 4.
-- order_id is supplied by the application (no AUTO_INCREMENT), so it must be
-- generated uniquely across all shards.
-- Shard 0: user_id % 4 = 0
CREATE TABLE orders_0 (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT NOT NULL,
    amount DECIMAL(10,2),
    status TINYINT,
    create_time DATETIME,
    INDEX idx_user_id (user_id)
) ENGINE=InnoDB;
-- Shards 1-3 are identical. CREATE TABLE ... LIKE copies the columns and indexes,
-- so the definitions cannot drift apart.
CREATE TABLE orders_1 LIKE orders_0;  -- user_id % 4 = 1
CREATE TABLE orders_2 LIKE orders_0;  -- user_id % 4 = 2
CREATE TABLE orders_3 LIKE orders_0;  -- user_id % 4 = 3
# Shard routing (application layer)
def get_shard_table(user_id, shard_count=4):
    """Return the physical orders table for user_id: "orders_<user_id % shard_count>".

    The result is interpolated into SQL as a table name, which cannot be a bound
    parameter. Requiring an int user_id guarantees the suffix is a plain number.

    Raises:
        TypeError: user_id is not an int.
        ValueError: shard_count is not positive.
    """
    if not isinstance(user_id, int):
        raise TypeError(f"user_id must be int, got {type(user_id).__name__}")
    if shard_count <= 0:
        raise ValueError("shard_count must be positive")
    return f"orders_{user_id % shard_count}"


def get_orders_by_user(user_id):
    table_name = get_shard_table(user_id)
    # The table name comes from get_shard_table (numeric suffix); user_id is bound as a parameter.
    sql = f"SELECT * FROM {table_name} WHERE user_id = %s"
    # execute the query ...
四、读写分离:架构级高并发解决方案
4.1 读写分离原理与架构
读写分离的核心思想是将写操作(INSERT/UPDATE/DELETE)路由到主库(Master),将读操作(SELECT)路由到从库(Slave),从而分担主库压力。
架构图
┌─────────────────────────────────────────┐
│ 应用服务层 │
│ ┌───────────────────────────────────┐ │
│ │ 读写分离中间件/应用层路由 │ │
│ │ - 写操作 → Master │ │
│ │ - 读操作 → Slave │ │
│ └───────────────────────────────────┘ │
└─────────────────────────────────────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Master │ │ Slave1 │
│ (写+读) │ │ (只读) │
└─────────────┘ └─────────────┘
│ │
└────────┬─────────┘
│
┌────────▼─────────┐
│ Binlog复制 │
│ (异步/半同步) │
└──────────────────┘
4.2 MySQL主从复制配置
步骤1:主库配置(Master)
-- 1. my.cnf on the master. Enabling the binary log needs a restart, so do this first.
[mysqld]
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW                    # row events replay deterministically on replicas
binlog_expire_logs_seconds = 604800    # purge binlogs after 7 days (5.7: expire_logs_days = 7)
max_binlog_size = 100M                 # rotate to a new binlog file at this size
binlog_cache_size = 4M                 # per-session buffer for a transaction's binlog events
-- 2. Create the replication account. GRANT takes effect immediately; no FLUSH PRIVILEGES needed.
--    In production restrict the host to the replica network, e.g. 'repl'@'192.168.1.%'.
CREATE USER 'repl'@'%' IDENTIFIED BY 'ReplPassword123!';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
-- 3. Block writes BEFORE reading the coordinates. Otherwise the position can move
--    past the data you export. The lock lasts only while this session stays open.
FLUSH TABLES WITH READ LOCK;
-- 4. Record File and Position for the replica.
SHOW MASTER STATUS;
-- Example output:
-- +------------------+----------+--------------+------------------+-------------------+
-- | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
-- +------------------+----------+--------------+------------------+-------------------+
-- | mysql-bin.000012 | 154 | | | |
-- +------------------+----------+--------------+------------------+-------------------+
-- 5. Export the data from another shell while the lock is held (e.g. mysqldump),
--    then release the lock in this session.
--    (Alternative without a global lock for InnoDB-only data:
--     mysqldump --single-transaction --master-data=2, which records the coordinates itself.)
UNLOCK TABLES;
步骤2:从库配置(Slave)
-- 1. my.cnf on the replica
[mysqld]
server-id = 2                # must differ from the master (and from every other server)
read_only = 1                # reject writes from ordinary accounts
relay_log = /var/log/mysql/mysql-relay-bin.log
log_bin = /var/log/mysql/mysql-bin.log
slave_parallel_workers = 4   # parallel applier threads (MySQL 5.7+)
-- 2. Point the replica at the coordinates recorded on the master
--    (8.0.23+ spells this CHANGE REPLICATION SOURCE TO with SOURCE_* options)
CHANGE MASTER TO
    MASTER_HOST = '192.168.1.100',       -- master address
    MASTER_USER = 'repl',                -- replication account
    MASTER_PASSWORD = 'ReplPassword123!',
    MASTER_LOG_FILE = 'mysql-bin.000012', -- File from SHOW MASTER STATUS
    MASTER_LOG_POS = 154,                -- Position from SHOW MASTER STATUS
    MASTER_SSL = 1;                      -- encrypt replication traffic (recommended in production)
-- 3. Start the replication threads
START SLAVE;
-- 4. Verify
SHOW SLAVE STATUS\G
-- Healthy state:
--   Slave_IO_Running: Yes       (receiver thread running)
--   Slave_SQL_Running: Yes      (applier thread running)
--   Seconds_Behind_Master: 0    (lag in seconds; NULL means the applier is not running)
--   Last_Error:                 (empty when there is no error)
4.3 应用层实现读写分离
方案1:基于AOP的读写分离(Java Spring Boot)
// 1. Data-source annotation: place it on a method (or type) to pick the target database.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface DataSource {
    /** Target database; defaults to a read replica. */
    DataSourceType value() default DataSourceType.SLAVE;
}
/** Routing keys: MASTER takes writes, SLAVE serves reads. */
public enum DataSourceType {
    MASTER,
    SLAVE
}
// 2. Per-thread data-source choice
public class DataSourceContextHolder {
    /** Data source chosen for the current thread; null when none has been set. */
    private static final ThreadLocal<DataSourceType> contextHolder =
            new ThreadLocal<>();

    public static void setDataSource(DataSourceType type) {
        contextHolder.set(type);
    }

    /** Returns the data source set for this thread, or SLAVE when none is set. */
    public static DataSourceType getDataSource() {
        // ThreadLocal only offers get(); it returns null when nothing was set.
        DataSourceType type = contextHolder.get();
        return type != null ? type : DataSourceType.SLAVE;
    }

    /** Removes the value. Must run when a request ends, because pooled threads are reused. */
    public static void clear() {
        contextHolder.remove();
    }
}
// 3. AOP aspect: picks MASTER or SLAVE for every service-layer call
@Aspect
@Component
public class DataSourceAspect {

    /** Nesting depth of advised calls on the current thread (0 = outermost call). */
    private static final ThreadLocal<Integer> DEPTH = ThreadLocal.withInitial(() -> 0);

    /**
     * Routes one service call and restores the caller's routing afterwards.
     *
     * An @After advice that always cleared the context would wipe the outer method's
     * choice as soon as a nested service call returned. The outer method would then
     * continue on the default (SLAVE). @Around keeps a per-call scope: nested calls
     * restore the previous value, and only the outermost call clears the thread.
     * The aspect must run before the transaction advice (e.g. via @Order) so the
     * connection is taken from the chosen data source.
     */
    @Around("execution(* com.example.service.*.*(..))")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        int depth = DEPTH.get();
        DataSourceType previous = DataSourceContextHolder.getDataSource();

        DataSourceContextHolder.setDataSource(resolve(method, joinPoint.getTarget().getClass()));
        DEPTH.set(depth + 1);
        try {
            return joinPoint.proceed();
        } finally {
            if (depth == 0) {
                // Outermost call finished: leave nothing behind on this pooled thread.
                DataSourceContextHolder.clear();
                DEPTH.remove();
            } else {
                // Nested call finished: give the caller back its own choice.
                DEPTH.set(depth);
                DataSourceContextHolder.setDataSource(previous);
            }
        }
    }

    /** Method annotation first, then class annotation, then read-style method-name prefixes. */
    private DataSourceType resolve(Method method, Class<?> targetClass) {
        DataSource onMethod = method.getAnnotation(DataSource.class);
        if (onMethod != null) {
            return onMethod.value();
        }
        DataSource onType = targetClass.getAnnotation(DataSource.class);
        if (onType != null) {
            return onType.value();
        }
        // Default: reads go to a replica, everything else to the master.
        String methodName = method.getName();
        boolean isRead = methodName.startsWith("get") || methodName.startsWith("list")
                || methodName.startsWith("query");
        return isRead ? DataSourceType.SLAVE : DataSourceType.MASTER;
    }
}
// 4. Routing DataSource: Spring asks for a lookup key whenever it needs a connection
public class DynamicDataSource extends AbstractRoutingDataSource {
    /** Key (MASTER or SLAVE) of the target DataSource for the current thread. */
    @Override
    protected Object determineCurrentLookupKey() {
        final DataSourceType lookupKey = DataSourceContextHolder.getDataSource();
        return lookupKey;
    }
}
// 5. Using the routing in a service
@Service
public class UserService {
    @Autowired
    private UserRepository userRepository;

    /** Write: explicitly routed to the master. */
    @DataSource(DataSourceType.MASTER)
    public User createUser(User user) {
        User saved = userRepository.save(user);
        return saved;
    }

    /** Read by id: explicitly routed to a replica; null when the user does not exist. */
    @DataSource(DataSourceType.SLAVE)
    public User getUserById(Long id) {
        return userRepository
                .findById(id)
                .orElse(null);
    }

    /** No annotation: the "list" name prefix sends this read to a replica. */
    public List<User> listUsers() {
        List<User> users = userRepository.findAll();
        return users;
    }
}
方案2:使用ShardingSphere中间件
# ShardingSphere-JDBC read/write splitting (application.yml), ShardingSphere 5.1.x key names.
# Key names differ between major versions; check the documentation of the version in use.
# Use a dedicated least-privilege account rather than root in production.
spring:
  shardingsphere:
    datasource:
      names: master,slave0,slave1
      master:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://192.168.1.100:3306/mydb
        username: root
        password: master_password
      slave0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://192.168.1.101:3306/mydb
        username: root
        password: slave_password
      slave1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://192.168.1.102:3306/mydb
        username: root
        password: slave_password
    # Read/write splitting rule
    rules:
      readwrite-splitting:
        data-sources:
          mydb:
            type: Static
            props:
              write-data-source-name: master
              read-data-source-names: slave0,slave1
            load-balancer-name: round_robin   # refers to load-balancers.round_robin
        # Load-balancing algorithms for the read replicas
        load-balancers:
          round_robin:
            type: ROUND_ROBIN
    # Global properties
    props:
      sql-show: true   # log the routed SQL; development only
      check-table-metadata-enabled: true
方案3:使用ProxySQL(MySQL协议代理)
-- ProxySQL is configured through its admin interface (port 6032), from a shell:
--   mysql -u admin -padmin -h 127.0.0.1 -P 6032
-- (change the default admin/admin credentials in production)
-- Backends: hostgroup 10 = writer (master), hostgroup 20 = readers (replicas)
INSERT INTO mysql_servers (hostgroup_id, hostname, port, weight) VALUES
(10, '192.168.1.100', 3306, 100),  -- master
(20, '192.168.1.101', 3306, 100),  -- replica 1
(20, '192.168.1.102', 3306, 100);  -- replica 2
-- Application account. Queries that match no rule (INSERT/UPDATE/DELETE, ...) go
-- to the user's default_hostgroup, so it must be the writer group.
INSERT INTO mysql_users (username, password, default_hostgroup) VALUES
('app', 'app_password', 10);
-- Routing rules, evaluated in rule_id order
INSERT INTO mysql_query_rules (rule_id, active, match_digest, destination_hostgroup, apply) VALUES
(1, 1, '^SELECT.*FOR UPDATE', 10, 1),  -- locking reads stay on the master
(2, 1, '^SELECT', 20, 1);              -- plain SELECTs go to the replicas
-- Health checks by the monitor module. The monitor account must exist on the MySQL
-- servers. Ping intervals are in milliseconds; they cannot be set to a SQL statement.
UPDATE global_variables SET variable_value = 'monitor' WHERE variable_name = 'mysql-monitor_username';
UPDATE global_variables SET variable_value = 'monitor_password' WHERE variable_name = 'mysql-monitor_password';
UPDATE global_variables SET variable_value = '10000' WHERE variable_name = 'mysql-monitor_ping_interval';
-- Activate every changed module (changes are not live until loaded to runtime) ...
LOAD MYSQL USERS TO RUNTIME;
LOAD MYSQL SERVERS TO RUNTIME;
LOAD MYSQL QUERY RULES TO RUNTIME;
LOAD MYSQL VARIABLES TO RUNTIME;
-- ... and persist them across restarts
SAVE MYSQL USERS TO DISK;
SAVE MYSQL SERVERS TO DISK;
SAVE MYSQL QUERY RULES TO DISK;
SAVE MYSQL VARIABLES TO DISK;
4.4 主从延迟监控与处理
监控延迟
-- Quick check, on the replica
SHOW SLAVE STATUS\G
-- Look at Seconds_Behind_Master. NULL means the applier is stopped. The value can
-- under-report when the receiver thread itself is behind.
-- More precise: a heartbeat table (the approach pt-heartbeat uses). The status
-- tables expose no "last update time" or lag column, so lag is measured with a row
-- written on the master and read on the replica. Master and replica clocks must be
-- NTP-synchronised, and event_scheduler must be ON.
-- On the master: write the current time every second; the row replicates.
CREATE TABLE heartbeat (
    id TINYINT PRIMARY KEY,
    ts TIMESTAMP(6) NOT NULL
);
CREATE EVENT heartbeat_tick
ON SCHEDULE EVERY 1 SECOND
DO
    REPLACE INTO heartbeat (id, ts) VALUES (1, NOW(6));
-- On the replica: current lag in seconds
SELECT TIMESTAMPDIFF(MICROSECOND, ts, NOW(6)) / 1000000 AS delay_seconds
FROM heartbeat
WHERE id = 1;
-- History table for lag samples
CREATE TABLE replication_delay (
    id INT PRIMARY KEY AUTO_INCREMENT,
    check_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    delay_seconds INT,
    slave_host VARCHAR(100)
);
-- Sampling job, created on the replica itself. Events replicated from the master
-- are disabled on replicas. Binary logging is switched off for this session so
-- the definition does not propagate. The event definer needs a privilege that
-- bypasses read_only on the replica.
SET sql_log_bin = 0;
CREATE EVENT monitor_replication
ON SCHEDULE EVERY 10 SECOND
DO
    INSERT INTO replication_delay (delay_seconds, slave_host)
    SELECT TIMESTAMPDIFF(SECOND, ts, NOW()), @@hostname
    FROM heartbeat
    WHERE id = 1;
SET sql_log_bin = 1;
处理延迟的策略
# Application-level strategies for replication lag
# Strategy 1: reads that need strong consistency go to the master
def get_user_with_consistency(user_id):
    # Critical paths read the master; everything else tolerates lag and reads a replica.
    reader = get_from_master if need_strong_consistency() else get_from_slave
    return reader(user_id)
# Strategy 2: cache in front of the master, with graceful degradation
def get_data_with_fallback(key):
    """Return the value for key from the Redis cache, loading it from the master on a miss.

    Despite the "slave:" key prefix, no replica is queried: the cache does the
    read offloading, and the master is the source of truth, so replica lag cannot
    serve stale data. Cache failures are logged and bypassed. Errors from the
    master itself propagate instead of being retried against the master again.
    """
    cache_key = f"slave:{key}"
    try:
        cached = redis_client.get(cache_key)
    except Exception as e:
        logger.error(f"Cache error: {e}, fallback to master")
        return get_from_master(key)
    if cached:
        return json.loads(cached)
    # Cache miss: read the master, then populate the cache for 60 s.
    data = get_from_master(key)
    try:
        redis_client.setex(cache_key, 60, json.dumps(data))
    except Exception as e:
        # Best effort: a failed cache write must not fail the read.
        logger.error(f"Cache write error: {e}")
    return data
# Strategy 3: tolerate lag in the business flow (eventual consistency)
def create_order_with_sync(order_data):
    """Insert the order on the master and return its id.

    When the caller will read the order back immediately, a short pause gives the
    replicas a chance to catch up. This is only a heuristic: lag can exceed 500 ms.
    Reads that must see this write should go to the master (strategy 1).
    """
    import time  # local import: this snippet has no module-level import of time

    order_id = insert_to_master(order_data)
    if need_immediate_read():
        time.sleep(0.5)  # blocks the calling thread for 500 ms
    return order_id
4.5 读写分离的高级优化
半同步复制(减少数据丢失风险)
-- Semi-synchronous replication: a commit on the master waits until at least one
-- replica acknowledges receipt, or until the timeout expires.
-- Plugin names below are for 5.7 - 8.0.25. From 8.0.26 the equivalents are
-- rpl_semi_sync_source (semisync_source.so) and rpl_semi_sync_replica (semisync_replica.so).
-- Master: install the plugin
INSTALL PLUGIN rpl_semi_sync_master SONAME 'semisync_master.so';
-- Master: my.cnf
[mysqld]
rpl_semi_sync_master_enabled = 1
rpl_semi_sync_master_timeout = 1000           # ms; after 1 s without an ack, fall back to async
rpl_semi_sync_master_wait_point = AFTER_SYNC  # recommended (default since 5.7)
-- Master: settings are system variables; whether semi-sync is active right now is a status variable
SHOW VARIABLES LIKE 'rpl_semi_sync_master%';
SHOW STATUS LIKE 'Rpl_semi_sync_master_status';
-- Replica: install the plugin and enable it (my.cnf takes effect at restart)
INSTALL PLUGIN rpl_semi_sync_slave SONAME 'semisync_slave.so';
[mysqld]
rpl_semi_sync_slave_enabled = 1
-- Replica, when enabling at runtime instead: the receiver thread must be restarted
-- to register with the master as semi-sync.
SET GLOBAL rpl_semi_sync_slave_enabled = 1;
STOP SLAVE IO_THREAD;
START SLAVE IO_THREAD;
多级复制(缓解主库压力)
主库 (Master)
│
├─► 一级从库 (Slave1) - 承担部分读流量
│ │
│ └─► 二级从库 (Slave2) - 承担报表查询
│
└─► 一级从库 (Slave3) - 承担读流量
│
└─► 二级从库 (Slave4) - 承担备份
-- Tier-1 replica (has its own binlog, acts as the master of tier-2 replicas)
[mysqld]
server-id = 2                # unique per server in the topology
log_bin = /var/log/mysql/mysql-bin.log
read_only = 1
# Important: write events received from the master into this server's own binlog.
# Without it, tier-2 replicas receive nothing. Option files only accept # or ; as
# comment markers; "--" would be read as part of the value.
log_slave_updates = 1
-- Tier-2 replica: replicate from the tier-1 replica. Either pass explicit coordinates
-- (from SHOW MASTER STATUS on the tier-1 replica) or use GTID auto-positioning,
-- which requires gtid_mode = ON and enforce_gtid_consistency = ON on all servers.
CHANGE MASTER TO
    MASTER_HOST = '192.168.1.101',  -- tier-1 replica
    MASTER_USER = 'repl',
    MASTER_PASSWORD = 'password',
    MASTER_AUTO_POSITION = 1;
START SLAVE;
五、高并发综合实战案例
5.1 秒杀系统设计
数据库设计
-- Products: stock is decremented by a conditional UPDATE on the primary key.
CREATE TABLE products (
    id BIGINT PRIMARY KEY,
    name VARCHAR(200),
    stock INT NOT NULL,      -- remaining stock
    version INT DEFAULT 0    -- optimistic-lock version number
    -- No index on stock: it changes on every sale, so a secondary index would add a
    -- second B-tree write to every decrement of the hottest row, while lookups use id.
);
-- Orders (sharded by user_id % 4; this flash-sale variant of orders_0 has its own
-- column set, distinct from the generic sharding example)
CREATE TABLE orders_0 (
    order_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    product_id BIGINT,
    quantity INT,
    create_time DATETIME,
    INDEX idx_user_product (user_id, product_id)
) ENGINE=InnoDB;
-- Flash-sale records: the primary key enforces one purchase per (user, product)
CREATE TABLE seckill_records (
    user_id BIGINT,
    product_id BIGINT,
    order_id BIGINT,
    create_time DATETIME,
    PRIMARY KEY (user_id, product_id)
) ENGINE=InnoDB;
应用层实现
@Service
public class SeckillService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RedisTemplate<String, String> redisTemplate;
// 秒杀主方法
@Transactional
public SeckillResult seckill(Long userId, Long productId, int quantity) {
String lockKey = "seckill:lock:" + productId;
String stockKey = "seckill:stock:" + productId;
try {
// 1. Redis预减库存(减少数据库访问)
Long stock = redisTemplate.opsForValue().decrement(stockKey, quantity);
if (stock < 0) {
redisTemplate.opsForValue().increment(stockKey, quantity); // 回滚
return SeckillResult.fail("库存不足");
}
// 2. 分布式锁(防止超卖)
boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", 5, TimeUnit.SECONDS);
if (!locked) {
return SeckillResult.fail("系统繁忙,请重试");
}
// 3. 检查是否已秒杀
String existKey = "seckill:record:" + userId + ":" + productId;
if (redisTemplate.hasKey(existKey)) {
return SeckillResult.fail("您已参与过秒杀");
}
// 4. 数据库操作(走主库)
// 乐观锁更新库存
int updated = jdbcTemplate.update(
"UPDATE products SET stock = stock - ?, version = version + 1 " +
"WHERE id = ? AND stock >= ? AND version = ?",
quantity, productId, quantity, getCurrentVersion(productId)
);
if (updated == 0) {
return SeckillResult.fail("库存不足或已被抢购");
}
// 5. 创建订单
Long orderId = createOrder(userId, productId, quantity);
// 6. 记录到Redis(防重复)
redisTemplate.opsForValue().set(existKey, orderId.toString(), 30, TimeUnit.MINUTES);
// 7. 发送MQ消息(异步处理后续流程)
sendOrderMessage(orderId, userId, productId);
return SeckillResult.success(orderId);
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
}
private Long createOrder(Long userId, Long productId, int quantity) {
Long orderId = generateOrderId();
int shardIndex = (int) (userId % 4);
String table = "orders_" + shardIndex;
jdbcTemplate.update(
String.format("INSERT INTO %s (order_id, user_id, product_id, quantity, create_time) VALUES (?, ?, ?, ?, NOW())", table),
orderId, userId, productId, quantity
);
return orderId;
}
}
数据库配置优化
# my.cnf tuned for a flash-sale workload (MySQL 8.0)
[mysqld]
# Connections
max_connections = 2000
thread_cache_size = 200
back_log = 500                          # queue of connection requests waiting to be accepted
# InnoDB
innodb_buffer_pool_size = 16G           # adjust to RAM; typically 70-80% on a dedicated host
innodb_log_file_size = 2G               # 8.0.30+: superseded by innodb_redo_log_capacity
innodb_flush_log_at_trx_commit = 2      # performance first: redo is flushed about once per second, so an OS crash / power loss can lose ~1 s of commits
innodb_flush_method = O_DIRECT
innodb_io_capacity = 2000
# Locking
innodb_lock_wait_timeout = 10           # seconds
innodb_rollback_on_timeout = 1          # on lock-wait timeout roll back the whole transaction, not just the statement
# Query cache: do NOT list query_cache_type / query_cache_size here. Both were
# removed in MySQL 8.0, and an unknown option prevents the server from starting.
# (On 5.7 set both to 0 to disable the cache.)
# In-memory temporary tables
tmp_table_size = 256M
max_heap_table_size = 256M
5.2 社交平台消息流优化
场景分析
- 读多写少:用户查看消息流频率远高于发送消息
- 实时性要求:新消息需要快速可见
- 数据量大:单用户消息可能达到百万级
优化方案
-- Messages table, range-partitioned by time. MySQL partitioning rules:
--   * every PRIMARY/UNIQUE key must include all partitioning columns,
--     hence PRIMARY KEY (id, created_at);
--   * on a TIMESTAMP column only UNIX_TIMESTAMP() is permitted as the partitioning
--     expression (YEAR() on TIMESTAMP is rejected as time-zone dependent).
CREATE TABLE messages (
    id BIGINT NOT NULL AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    sender_id BIGINT NOT NULL,
    content TEXT,
    message_type TINYINT,
    is_read TINYINT DEFAULT 0,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, created_at),
    INDEX idx_user_type_read (user_id, message_type, is_read, created_at DESC),
    -- For "unread messages of a user" queries that do not filter on message_type.
    INDEX idx_user_read_created (user_id, is_read, created_at DESC)
) PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)) (
    PARTITION p2023 VALUES LESS THAN (UNIX_TIMESTAMP('2024-01-01 00:00:00')),
    PARTITION p2024 VALUES LESS THAN (UNIX_TIMESTAMP('2025-01-01 00:00:00')),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);
-- Per-user message counters (avoid COUNT(*) on every page view)
CREATE TABLE message_counters (
    user_id BIGINT PRIMARY KEY,
    unread_count INT DEFAULT 0,
    total_count INT DEFAULT 0,
    last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
# Application-level read/write splitting
class MessageService:
    """Message operations: writes go to the master, list/count reads to a replica.

    Relies on helpers provided elsewhere: insert_to_master(sql, params) executes a
    write (send_message uses its return value as the new message id),
    query_from_slave(sql, params) runs a read, plus send_notification and cache.
    NOTE(review): query_from_slave is used for a single row (the counter) and for
    a list of rows (the messages); confirm its return shape.
    """

    def send_message(self, sender_id, receiver_id, content):
        """Store a message for receiver_id and bump their counters (master)."""
        # %s placeholders, like the other queries: MySQL DB-API drivers do not accept "?".
        msg_id = self.insert_to_master(
            "INSERT INTO messages (user_id, sender_id, content, message_type) VALUES (%s, %s, %s, %s)",
            (receiver_id, sender_id, content, 1)
        )
        # Upsert the counter row on the master.
        self.insert_to_master(
            "INSERT INTO message_counters (user_id, unread_count, total_count) VALUES (%s, 1, 1) "
            "ON DUPLICATE KEY UPDATE unread_count = unread_count + 1, total_count = total_count + 1",
            (receiver_id,)
        )
        # Notify asynchronously.
        self.send_notification(receiver_id, msg_id)
        return msg_id

    def get_unread_messages(self, user_id, limit=20):
        """Return (newest unread messages, unread count), both read from a replica.

        Replica lag can hide a just-sent message for a short time.
        """
        counter = self.query_from_slave(
            "SELECT unread_count FROM message_counters WHERE user_id = %s",
            (user_id,)
        )
        if not counter or counter['unread_count'] == 0:
            return [], 0
        messages = self.query_from_slave(
            "SELECT * FROM messages WHERE user_id = %s AND is_read = 0 "
            "ORDER BY created_at DESC LIMIT %s",
            (user_id, limit)
        )
        return messages, counter['unread_count']

    def mark_as_read(self, user_id, message_ids):
        """Mark the given messages of user_id as read and refresh the unread counter (master)."""
        message_ids = list(message_ids)  # accept any iterable
        if not message_ids:
            return  # "IN ()" is a syntax error, and there is nothing to update
        placeholders = ','.join(['%s'] * len(message_ids))
        self.insert_to_master(
            f"UPDATE messages SET is_read = 1 WHERE id IN ({placeholders}) AND user_id = %s",
            message_ids + [user_id]
        )
        # Recount instead of subtracting len(message_ids). Ids that were already read,
        # belong to another user or do not exist would otherwise push the counter
        # below the real number of unread messages.
        self.insert_to_master(
            "UPDATE message_counters SET unread_count = "
            "(SELECT COUNT(*) FROM messages WHERE user_id = %s AND is_read = 0) "
            "WHERE user_id = %s",
            (user_id, user_id)
        )
        self.cache.delete(f"messages:{user_id}:unread")
六、监控与告警
6.1 关键性能指标监控
-- 1. Connections (global-only counters)
SHOW GLOBAL STATUS LIKE 'Threads_connected';
SHOW GLOBAL STATUS LIKE 'Max_used_connections';
-- 2. Query throughput and slow queries. Plain SHOW STATUS returns SESSION values for
--    these (the current connection only). GLOBAL returns server-wide counters,
--    cumulative since startup: compare two samples to get a rate.
SHOW GLOBAL STATUS LIKE 'Slow_queries';
SHOW GLOBAL STATUS LIKE 'Questions';
-- 3. InnoDB internals (latest deadlock, active transactions, buffer pool, ...)
SHOW ENGINE INNODB STATUS\G
-- 4. Lock waits (MySQL 8.0 performance_schema)
SELECT * FROM performance_schema.data_lock_waits;
SELECT * FROM performance_schema.data_locks;
-- 5. Slow query log, enabled in my.cnf
[mysqld]
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 1                           # log queries running longer than 1 second
log_queries_not_using_indexes = 1
log_throttle_queries_not_using_indexes = 60   # cap such entries per minute so the log cannot flood under load
-- Analyse the slow log (shell)
mysqldumpslow /var/log/mysql/slow.log
pt-query-digest /var/log/mysql/slow.log
6.2 监控脚本示例
#!/usr/bin/env python3
# MySQL monitoring script: checks connections, slow queries and replication lag
# once a minute and emails an alert when a threshold is crossed.
import mysql.connector
import time
import smtplib
from email.mime.text import MIMEText


class MySQLMonitor:
    def __init__(self, host, user, password):
        self.config = {
            'host': host,
            'user': user,
            'password': password,
            'connect_timeout': 5
        }
        # Slow_queries is cumulative since server start; keep the previous sample
        # so each check can report the increase during the last interval.
        self._last_slow_queries = None

    def _run(self, *statements, dictionary=False):
        """Execute statements on one short-lived connection; return one fetchall() result per statement."""
        conn = mysql.connector.connect(**self.config)
        try:
            cursor = conn.cursor(dictionary=dictionary)
            try:
                results = []
                for sql in statements:
                    cursor.execute(sql)
                    results.append(cursor.fetchall())
                return results
            finally:
                cursor.close()
        finally:
            conn.close()

    def check_connections(self):
        """Alert above 80% of max_connections; return (connected, max_connections)."""
        status_rows, variable_rows = self._run(
            "SHOW GLOBAL STATUS LIKE 'Threads_connected'",
            "SHOW VARIABLES LIKE 'max_connections'",
        )
        connected = int(status_rows[0][1])
        max_conn = int(variable_rows[0][1])
        usage = (connected / max_conn) * 100
        if usage > 80:
            self.send_alert(f"连接数过高: {connected}/{max_conn} ({usage:.1f}%)")
        return connected, max_conn

    def check_slow_queries(self):
        """Alert when more than 100 slow queries occurred since the previous check.

        Returns the number of new slow queries in this interval. The first call only
        records a baseline and returns 0. GLOBAL scope is required: the session
        counter of a fresh connection is always ~0.
        """
        (rows,) = self._run("SHOW GLOBAL STATUS LIKE 'Slow_queries'")
        total = int(rows[0][1])
        previous, self._last_slow_queries = self._last_slow_queries, total
        if previous is None:
            return 0
        delta = max(total - previous, 0)  # the counter resets when the server restarts
        if delta > 100:  # more than 100 slow queries within one interval (~1 minute)
            self.send_alert(f"慢查询过多: {delta}个")
        return delta

    def check_replication_lag(self):
        """Return Seconds_Behind_Master, or None if this server is not a replica or the check failed.

        Alerts when lag exceeds 60 s, or when replication is configured but lag is
        NULL (applier thread stopped).
        """
        try:
            # Dictionary rows: look the column up by name, because its position varies by version.
            (rows,) = self._run("SHOW SLAVE STATUS", dictionary=True)
        except mysql.connector.Error as e:
            print(f"监控错误: {e}")
            return None
        if not rows:
            return None  # not a replica
        lag = rows[0].get('Seconds_Behind_Master')
        if lag is None:
            self.send_alert("主从复制已停止 (Seconds_Behind_Master = NULL)")
        elif lag > 60:  # more than 60 s behind
            self.send_alert(f"主从延迟过高: {lag}秒")
        return lag

    def send_alert(self, message):
        """Send an alert email; failures are printed, never raised, so monitoring continues."""
        msg = MIMEText(message)
        msg['Subject'] = 'MySQL监控告警'
        msg['From'] = 'monitor@example.com'
        msg['To'] = 'dba@example.com'
        try:
            # Port 587 is the submission port: switch to TLS before sending credentials.
            with smtplib.SMTP('smtp.example.com', 587, timeout=10) as server:
                server.starttls()
                server.login('user', 'pass')
                server.send_message(msg)
        except Exception as e:
            print(f"发送告警失败: {e}")

    def run_monitor(self):
        """Run all checks every 60 seconds, forever; one failing check never stops the loop."""
        while True:
            try:
                self.check_connections()
                self.check_slow_queries()
                self.check_replication_lag()
            except Exception as e:
                print(f"监控错误: {e}")
            time.sleep(60)  # check once per minute


if __name__ == '__main__':
    monitor = MySQLMonitor('localhost', 'monitor', 'password')
    monitor.run_monitor()
七、总结与最佳实践
7.1 高并发优化检查清单
索引优化
- [ ] 所有查询都有合适的索引
- [ ] 避免索引失效(如函数操作、隐式类型转换)
- [ ] 定期检查未使用索引并清理
- [ ] 复合索引遵循最左前缀原则
查询优化
- [ ] 避免SELECT *
- [ ] 合理使用LIMIT,避免深度分页
- [ ] 批量操作替代循环单条
- [ ] 使用EXPLAIN分析慢查询
架构优化
- [ ] 配置合理的连接池参数
- [ ] 实现读写分离
- [ ] 使用Redis缓存热点数据
- [ ] 考虑分库分表(数据量超过千万级)
监控告警
- [ ] 开启慢查询日志
- [ ] 监控连接数和CPU使用率
- [ ] 监控主从延迟
- [ ] 设置合理的告警阈值
7.2 不同并发量下的推荐方案
| 并发量 | 推荐方案 | 预期性能提升 |
|---|---|---|
| < 100 QPS | 索引优化 + 连接池 | 5-10倍 |
| 100-1000 QPS | 查询优化 + Redis缓存 | 10-50倍 |
| 1000-5000 QPS | 读写分离 + 缓存 | 50-200倍 |
| > 5000 QPS | 分库分表 + 读写分离 | 200倍以上 |
7.3 常见误区与避坑指南
过度索引:索引越多越好 ❌
- 每个索引都会增加写入开销,建议单表索引不超过5个
盲目分库分表:数据量不大就分表 ❌
- 分表增加复杂度,数据量超过1000万再考虑
忽视主从延迟:认为读写分离就完美 ❌
- 必须处理延迟问题,关键业务读主库
缓存与数据库不一致:只更新缓存不更新数据库 ❌
- 必须先更新数据库,再删除缓存(Cache Aside模式)
长事务:事务中包含远程调用 ❌
- 事务应短小精悍,避免持有锁过长时间
7.4 性能测试基准
# Load testing with sysbench
# Install sysbench
sudo apt-get install sysbench
# Prepare test data: 10 tables x 100,000 rows
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=pass \
  --mysql-db=testdb --tables=10 --table-size=100000 \
  /usr/share/sysbench/oltp_read_write.lua prepare
# Run the test: 100 threads for 60 s, reporting every 10 s
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=pass \
  --mysql-db=testdb --tables=10 --table-size=100000 \
  --threads=100 --time=60 --report-interval=10 \
  /usr/share/sysbench/oltp_read_write.lua run
# Clean up. --tables must match prepare; the default of 1 would drop only sbtest1
# and leave sbtest2..sbtest10 behind.
sysbench --mysql-host=localhost --mysql-user=root --mysql-password=pass \
  --mysql-db=testdb --tables=10 /usr/share/sysbench/oltp_read_write.lua cleanup
通过本文的系统性优化,您可以将MySQL的并发处理能力提升10-100倍。记住,优化是一个持续的过程,需要根据业务变化和监控数据不断调整策略。建议建立性能基线,定期进行压力测试,确保系统始终处于最佳状态。
