Introduction: MySQL's Challenges Under High Concurrency

High-concurrency workloads are now the norm for Internet applications. Whether it is a flash sale on an e-commerce site, a trending event on social media, or a financial trading system, MySQL comes under enormous pressure. Under high concurrency, database bottlenecks translate directly into slow responses, degraded user experience, and in the worst case outright unavailability.

The main challenges high concurrency poses for MySQL are:

  • Connection storms: large numbers of concurrent connections exhaust server resources
  • Lock contention: row and table lock conflicts keep transactions waiting
  • I/O bottlenecks: disk throughput cannot keep up with in-memory processing
  • CPU overload: complex queries and sorts burn CPU
  • Memory pressure: a low buffer pool hit rate forces frequent disk I/O

This article examines MySQL high-concurrency optimization from several angles to help you build a high-performance database system.

1. Architecture-Level Optimization

1.1 Read/Write Splitting

Read/write splitting is one of the most effective ways to absorb high-concurrency read traffic. Separating reads and writes onto different database instances significantly raises overall throughput.

Implementation options

  • Master: handles all writes (INSERT, UPDATE, DELETE)
  • Replica (slave): handles all reads (SELECT)
  • Middleware: ProxySQL, MyCat, or routing in the application layer

Code example: application-level read/write splitting

// Spring Boot multi-datasource configuration
@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    @Bean
    public DataSource routingDataSource() {
        DynamicDataSource routingDataSource = new DynamicDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave", slaveDataSource());
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        return routingDataSource;
    }
}

// Dynamic datasource routing based on a ThreadLocal key
public class DynamicDataSource extends AbstractRoutingDataSource {
    private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
    
    public static void setDataSource(String dataSource) {
        CONTEXT_HOLDER.set(dataSource);
    }
    
    public static void clearDataSource() {
        CONTEXT_HOLDER.remove();
    }
    
    @Override
    protected Object determineCurrentLookupKey() {
        return CONTEXT_HOLDER.get();
    }
}

// Usage in the service layer
@Service
public class UserService {
    
    @Autowired
    private UserRepository userRepository;
    
    public User getUserById(Long id) {
        // Route the read to the replica
        DynamicDataSource.setDataSource("slave");
        try {
            return userRepository.findById(id);
        } finally {
            DynamicDataSource.clearDataSource();
        }
    }
    
    public void updateUser(User user) {
        // Route the write to the master
        DynamicDataSource.setDataSource("master");
        try {
            userRepository.save(user);
        } finally {
            DynamicDataSource.clearDataSource();
        }
    }
}

Caveats

  • Replication lag: replicas must stay close enough to real time for your reads
  • Consistency: a read issued immediately after a write may need to go to the master (see the routing sketch below)
  • Monitoring: watch Seconds_Behind_Master in SHOW SLAVE STATUS (SHOW REPLICA STATUS in MySQL 8.0.22+)
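
Setting the routing key by hand in every service method, as UserService does above, is easy to get wrong. A common refinement is to drive DynamicDataSource from an aspect. The following is a minimal sketch of that idea; the @ReadOnly annotation and the aspect are our own illustration, not part of the configuration shown earlier.

// Hypothetical AOP-based routing: methods annotated with @ReadOnly run against
// the replica; everything else stays on the master.
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface ReadOnly {}

@Aspect
@Component
public class ReadWriteRoutingAspect {

    @Around("@annotation(readOnly)")
    public Object routeToReplica(ProceedingJoinPoint pjp, ReadOnly readOnly) throws Throwable {
        DynamicDataSource.setDataSource("slave");
        try {
            return pjp.proceed();
        } finally {
            // Always clear the ThreadLocal so pooled threads do not leak routing state
            DynamicDataSource.clearDataSource();
        }
    }
}

A production version would also pin reads to the master for a short window after a write, the same idea as the forced-master read shown in section 6.4.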

1.2 Database Sharding

Once a single table passes the tens-of-millions-of-rows mark, consider sharding: splitting the large table into many smaller ones spread across database instances.

Horizontal sharding strategies

  • Shard by user ID modulo
  • Shard by time range
  • Shard by geographic region

Code example: shard routing

// Sharding strategy interface
public interface ShardingStrategy {
    String getDataSource(Long shardKey);
    String getTableName(Long shardKey);
}

// User-table strategy: route by user ID modulo
public class UserShardingStrategy implements ShardingStrategy {
    private static final int SHARD_COUNT = 4;
    
    @Override
    public String getDataSource(Long userId) {
        int shardIndex = (int) (userId % SHARD_COUNT);
        return "ds_" + shardIndex;
    }
    
    @Override
    public String getTableName(Long userId) {
        int shardIndex = (int) (userId % SHARD_COUNT);
        return "user_" + shardIndex;
    }
}

// Sharding-aware service
@Service
public class ShardingUserService {
    
    @Autowired
    private ShardingStrategy shardingStrategy;
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    public void createUser(Long userId, String name) {
        String dataSource = shardingStrategy.getDataSource(userId);
        String tableName = shardingStrategy.getTableName(userId);
        
        // Switch to the target datasource
        DynamicDataSource.setDataSource(dataSource);
        
        String sql = String.format(
            "INSERT INTO %s (id, name, created_at) VALUES (?, ?, NOW())",
            tableName
        );
        
        try {
            jdbcTemplate.update(sql, userId, name);
        } finally {
            DynamicDataSource.clearDataSource();
        }
    }
    
    public User getUser(Long userId) {
        String dataSource = shardingStrategy.getDataSource(userId);
        String tableName = shardingStrategy.getTableName(userId);
        
        DynamicDataSource.setDataSource(dataSource);
        
        String sql = String.format("SELECT * FROM %s WHERE id = ?", tableName);
        
        try {
            List<Map<String, Object>> results = jdbcTemplate.queryForList(sql, userId);
            if (results.isEmpty()) {
                return null;
            }
            Map<String, Object> row = results.get(0);
            return new User((Long) row.get("id"), (String) row.get("name"));
        } finally {
            DynamicDataSource.clearDataSource();
        }
    }
}

1.3 Cache Layer Optimization

Introduce Redis (or a similar cache) so that most reads never reach the database at all.

Code example: guarding against cache penetration and cache avalanche

@Service
public class CachedUserService {
    
    @Autowired
    private UserRepository userRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String USER_CACHE_PREFIX = "user:";
    private static final long CACHE_TTL = 3600; // 1 hour
    
    // Cache penetration guard: cache null (missing) results as well
    public User getUserWithPenetrationProtection(Long id) {
        String cacheKey = USER_CACHE_PREFIX + id;
        
        // 1. Check the cache first
        Object cached = redisTemplate.opsForValue().get(cacheKey);
        if (cached != null) {
            if (cached instanceof String && "NULL".equals(cached)) {
                return null; // cached null marker
            }
            return (User) cached;
        }
        
        // 2. Cache miss: query the database
        User user = userRepository.findById(id);
        
        // 3. Populate the cache (null results included)
        if (user != null) {
            redisTemplate.opsForValue().set(cacheKey, user, CACHE_TTL, TimeUnit.SECONDS);
        } else {
            // Cache a short-lived null marker to block penetration
            redisTemplate.opsForValue().set(cacheKey, "NULL", 60, TimeUnit.SECONDS);
        }
        
        return user;
    }
    
    // Cache avalanche guard: randomize TTLs so keys do not expire together
    public User getUserWithAvalancheProtection(Long id) {
        String cacheKey = USER_CACHE_PREFIX + id;
        
        Object cached = redisTemplate.opsForValue().get(cacheKey);
        if (cached != null) {
            return (User) cached;
        }
        
        User user = userRepository.findById(id);
        
        if (user != null) {
            // Randomized TTL: base TTL plus 0-300 extra seconds
            long randomTtl = CACHE_TTL + new Random().nextInt(300);
            redisTemplate.opsForValue().set(cacheKey, user, randomTtl, TimeUnit.SECONDS);
        }
        
        return user;
    }
}

2. MySQL Configuration Tuning

2.1 Core InnoDB Parameter Tuning

InnoDB is MySQL's default storage engine, and its configuration directly determines high-concurrency performance.

Key parameters

# my.cnf core configuration example

[mysqld]
# Connections
max_connections = 2000          # Maximum connections; size to your workload
max_user_connections = 1800     # Maximum connections per user
wait_timeout = 600              # Non-interactive connection timeout (seconds)
interactive_timeout = 600       # Interactive connection timeout (seconds)

# InnoDB buffer pool (the single most important setting)
innodb_buffer_pool_size = 12G   # Typically 50-70% of physical RAM
innodb_buffer_pool_instances = 8 # Multiple instances reduce mutex contention

# Log files
innodb_log_file_size = 2G       # Redo log file size
innodb_log_buffer_size = 64M    # Log buffer size
innodb_flush_log_at_trx_commit = 1 # Commit durability (1 = safest, 2 = faster)

# I/O
innodb_flush_method = O_DIRECT  # Direct I/O, avoids double buffering
innodb_io_capacity = 2000       # IOPS available to InnoDB (raise for SSDs)
innodb_io_capacity_max = 4000   # IOPS ceiling

# Concurrency
innodb_thread_concurrency = 0   # InnoDB thread concurrency (0 = unlimited)
innodb_read_io_threads = 8      # Read I/O threads
innodb_write_io_threads = 8     # Write I/O threads

# Locking
innodb_lock_wait_timeout = 50   # Lock wait timeout (seconds)
innodb_rollback_on_timeout = ON # Roll back the whole transaction on timeout

# Query cache (removed in MySQL 8.0; applies to 5.7 and earlier only)
# query_cache_type = 0          # Disable the query cache
# query_cache_size = 0

# Temporary tables
tmp_table_size = 256M           # In-memory temporary table limit
max_heap_table_size = 256M      # MEMORY table size limit

# Sort and read buffers
sort_buffer_size = 4M           # Per-thread sort buffer
read_buffer_size = 4M           # Sequential read buffer
read_rnd_buffer_size = 8M       # Random read buffer

# Join buffer
join_buffer_size = 8M           # Per-join buffer for unindexed joins

# Slow query log
slow_query_log = 1              # Enable the slow query log
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2             # Slow query threshold (seconds)
log_queries_not_using_indexes = 1 # Also log queries that use no index

# Replication
server_id = 1                   # Server ID (must differ between master and replicas)
log_bin = mysql-bin             # Binary log
binlog_format = ROW             # Binary log format (ROW is safest)
sync_binlog = 1                 # Binlog flush policy (1 = flush on every commit)
expire_logs_days = 7            # Binlog retention in days

# Miscellaneous
character_set_server = utf8mb4  # Character set
collation_server = utf8mb4_unicode_ci
default_storage_engine = InnoDB

Tuning notes

  • innodb_buffer_pool_size: the most important parameter, since it directly drives the data hit rate. Check the buffer pool hit rate with SHOW ENGINE INNODB STATUS.
  • innodb_flush_log_at_trx_commit: a value of 2 improves throughput, at the cost of losing up to roughly one second of transactions on a crash.
  • innodb_log_file_size: too small causes frequent checkpoints; too large lengthens crash recovery.

2.2 Monitoring Server State

Code example: monitoring key status variables

-- Current connection counts
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Max_used_connections';

-- InnoDB buffer pool state
SHOW ENGINE INNODB STATUS\G

-- Lock waits (MySQL 5.7; on 8.0 use performance_schema.data_lock_waits)
SELECT * FROM information_schema.INNODB_LOCK_WAITS;

-- Currently running transactions
SELECT * FROM information_schema.INNODB_TRX;

-- Slow query count
SHOW STATUS LIKE 'Slow_queries';

-- Temporary table usage
SHOW STATUS LIKE 'Created_tmp_disk_tables';
SHOW STATUS LIKE 'Created_tmp_tables';

-- Index usage via handler counters
SHOW STATUS LIKE 'Handler_read%';

-- Buffer pool hit rate (computed)
-- hit rate = (1 - (innodb_buffer_pool_reads / innodb_buffer_pool_read_requests)) * 100
-- On MySQL 5.7+/8.0 read performance_schema.global_status;
-- older versions used information_schema.GLOBAL_STATUS
SELECT 
    (1 - (SUM(CASE WHEN VARIABLE_NAME = 'Innodb_buffer_pool_reads' THEN VARIABLE_VALUE END) / 
          SUM(CASE WHEN VARIABLE_NAME = 'Innodb_buffer_pool_read_requests' THEN VARIABLE_VALUE END))) * 100 AS buffer_pool_hit_rate
FROM performance_schema.global_status 
WHERE VARIABLE_NAME IN ('Innodb_buffer_pool_reads', 'Innodb_buffer_pool_read_requests');

3. SQL Statement Optimization

3.1 Indexing Strategy

Indexes are the core lever for query performance. Under high concurrency, sound index design eliminates the large majority of query-related performance problems.

Index design principles

  • Leftmost prefix rule: composite indexes are usable only left to right
  • Covering indexes: when every selected column is in the index, no row lookup is needed
  • Index condition pushdown: automatic in MySQL 5.6+
  • No redundant indexes: audit regularly and drop what is unused

Code example: index optimization in practice

-- Create the users table
CREATE TABLE users (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    status TINYINT DEFAULT 1,
    created_at DATETIME,
    updated_at DATETIME,
    age INT,
    city VARCHAR(50),
    INDEX idx_username (username),
    INDEX idx_status_created (status, created_at), -- composite index
    INDEX idx_city_age (city, age)
);

-- Before: full table scan
EXPLAIN SELECT * FROM users WHERE status = 1 AND created_at > '2024-01-01';
-- Result: type=ALL, rows=1000000 (full table scan)

-- After: the composite index is used
EXPLAIN SELECT id, username, status, created_at FROM users WHERE status = 1 AND created_at > '2024-01-01';
-- Result: type=range, rows=5000 (index used)

-- Covering index example (no row lookup)
EXPLAIN SELECT username FROM users WHERE status = 1;
-- If username is part of the composite index: type=ref, Extra=Using index

-- Anti-patterns that defeat indexes
EXPLAIN SELECT * FROM users WHERE username LIKE '%john%'; -- leading wildcard: index unusable
EXPLAIN SELECT * FROM users WHERE status + 1 = 2; -- arithmetic on the indexed column: index unusable
EXPLAIN SELECT * FROM users WHERE YEAR(created_at) = 2024; -- function on the column: index unusable

-- The index-friendly versions
EXPLAIN SELECT * FROM users WHERE username LIKE 'john%'; -- prefix match: index usable
EXPLAIN SELECT * FROM users WHERE status = 1; -- direct comparison: index usable
EXPLAIN SELECT * FROM users WHERE created_at BETWEEN '2024-01-01' AND '2024-12-31'; -- range predicate: index usable

Index monitoring and maintenance

-- Index usage counts (Performance Schema, MySQL 5.6+)
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_STAR AS accesses
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE OBJECT_SCHEMA = 'your_database'
ORDER BY COUNT_STAR DESC;

-- Unused indexes (sys schema, MySQL 5.7+)
SELECT * 
FROM sys.schema_unused_indexes
WHERE object_schema = 'your_database';

-- Redundant indexes (sys schema; pt-duplicate-key-checker is an alternative)
SELECT * 
FROM sys.schema_redundant_indexes
WHERE table_schema = 'your_database';

-- pt-index-usage can also mine the slow log for index usage:
-- pt-index-usage slow.log --host localhost --user root --password

-- Drop an index that serves no queries
DROP INDEX idx_unused ON users;

3.2 Avoiding Full Table Scans

Under high concurrency, full table scans are performance killers. Every hot query must be able to use an index.

Code example: techniques for avoiding full table scans

-- Bad: query that forces a full table scan
SELECT * FROM orders WHERE status = 'pending' AND amount > 100;
-- Without an index on status and amount, this scans the whole table

-- Good: create a suitable index
CREATE INDEX idx_status_amount ON orders(status, amount);

-- Bad: OR can defeat index use
SELECT * FROM users WHERE username = 'john' OR email = 'john@example.com';
-- With only single-column indexes, OR may fall back to a full scan

-- Good: use UNION ALL (or IN where applicable)
SELECT * FROM users WHERE username = 'john'
UNION ALL
SELECT * FROM users WHERE email = 'john@example.com' AND username != 'john';

-- Or create a composite index
CREATE INDEX idx_username_email ON users(username, email);

-- Bad: NOT IN or !=
SELECT * FROM users WHERE status != 1;
-- Usually ends up as a full table scan

-- Good: use IN with explicit values
SELECT * FROM users WHERE status IN (0, 2, 3);
-- Or lean on a covering index
SELECT id FROM users WHERE status != 1;

3.3 Pagination Optimization

Paginated queries degrade badly under high concurrency, especially deep pagination.

Code example: pagination strategies

-- Traditional pagination (terrible for deep pages)
SELECT * FROM orders WHERE user_id = 123 ORDER BY id LIMIT 1000000, 20;
-- Scans 1,000,020 rows to return 20

-- Option 1: deferred join (fetch ids first, then the rows)
SELECT o.* FROM orders o
INNER JOIN (
    SELECT id FROM orders 
    WHERE user_id = 123 
    ORDER BY id 
    LIMIT 1000000, 20
) AS tmp ON o.id = tmp.id;

-- Option 2: remember the last id (keyset/cursor pagination)
-- First page
SELECT * FROM orders WHERE user_id = 123 ORDER BY id LIMIT 20;
-- Record the last row's id: last_id = 1000020

-- Next page
SELECT * FROM orders WHERE user_id = 123 AND id > 1000020 ORDER BY id LIMIT 20;

-- Option 3: subquery for the starting id
SELECT * FROM orders 
WHERE id >= (
    SELECT id FROM orders 
    WHERE user_id = 123 
    ORDER BY id 
    LIMIT 1000000, 1
)
LIMIT 20;
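
At the application layer, option 2 boils down to passing the last-seen id back in. Here is a minimal sketch with Spring's JdbcTemplate, assuming the orders table above; the class and method names are illustrative.

import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

// Keyset (cursor) pagination: every page is an index range scan,
// so page 50,000 costs the same as page 1.
@Service
public class OrderPageService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public List<Map<String, Object>> nextPage(long userId, Long lastSeenId, int pageSize) {
        if (lastSeenId == null) {
            // First page
            return jdbcTemplate.queryForList(
                "SELECT * FROM orders WHERE user_id = ? ORDER BY id LIMIT ?",
                userId, pageSize);
        }
        // Later pages: seek past the last id instead of OFFSET-scanning
        return jdbcTemplate.queryForList(
            "SELECT * FROM orders WHERE user_id = ? AND id > ? ORDER BY id LIMIT ?",
            userId, lastSeenId, pageSize);
    }
}

The tradeoff is that clients can only step forward page by page; jumping to an arbitrary page still needs the deferred-join form.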

3.4 Splitting Complex Queries

Under high concurrency, avoid single monster SQL statements; split them into several small queries.

Code example: splitting a complex query

-- Bad: one complex statement (multi-table JOIN over lots of data)
SELECT 
    u.username,
    o.order_no,
    p.product_name,
    SUM(oi.quantity * oi.price) as total_amount
FROM users u
JOIN orders o ON u.id = o.user_id
JOIN order_items oi ON o.id = oi.order_id
JOIN products p ON oi.product_id = p.id
WHERE o.status = 'completed' AND o.created_at > '2024-01-01'
GROUP BY u.id, o.id
HAVING total_amount > 1000
ORDER BY total_amount DESC
LIMIT 100;

-- Better: split into several queries and aggregate in the application
-- Query 1: ids of qualifying orders
SELECT o.id FROM orders o 
WHERE o.status = 'completed' AND o.created_at > '2024-01-01';

-- Query 2: batch-fetch order items (IN query)
SELECT * FROM order_items WHERE order_id IN (...);

-- Query 3: batch-fetch user data
SELECT * FROM users WHERE id IN (...);

-- Aggregation happens in application code (see the sketch below)
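
A sketch of what the application-layer aggregation might look like with JdbcTemplate, assuming the orders and order_items tables above; the service and method names are ours, and in practice you would cap the size of each IN list (for example, 500 ids per query).

import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

// Split-query aggregation: simple indexed queries, joined in memory.
@Service
public class OrderReportService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public Map<Long, BigDecimal> completedOrderTotalsSince(String date) {
        // Query 1: ids of qualifying orders
        List<Long> orderIds = jdbcTemplate.queryForList(
            "SELECT id FROM orders WHERE status = 'completed' AND created_at > ?",
            Long.class, date);
        if (orderIds.isEmpty()) {
            return Collections.emptyMap();
        }

        // Query 2: batch-fetch line items with one IN query
        String placeholders = String.join(",", Collections.nCopies(orderIds.size(), "?"));
        List<Map<String, Object>> items = jdbcTemplate.queryForList(
            "SELECT order_id, quantity, price FROM order_items WHERE order_id IN (" + placeholders + ")",
            orderIds.toArray());

        // Aggregate per order in application code
        Map<Long, BigDecimal> totals = new HashMap<>();
        for (Map<String, Object> row : items) {
            long orderId = ((Number) row.get("order_id")).longValue();
            BigDecimal quantity = new BigDecimal(row.get("quantity").toString());
            BigDecimal price = new BigDecimal(row.get("price").toString());
            totals.merge(orderId, quantity.multiply(price), BigDecimal::add);
        }
        return totals;
    }
}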

4. Transaction and Lock Optimization

4.1 Transaction Design Principles

Under high concurrency, transaction design directly determines lock contention and throughput.

Core principles

  • Short transactions: hold locks for as little time as possible
  • Small transactions: touch a small amount of data per transaction
  • Consistent ordering: access rows in the same order everywhere to avoid deadlocks

Code example: transaction optimization

// Bad: long transaction (locks held far too long)
@Transactional
public void processOrder(Long orderId) {
    // 1. Read the order (shared lock)
    Order order = orderRepository.findById(orderId);
    
    // 2. Call an external API (slow; locks stay held the whole time)
    paymentService.validatePayment(order.getPaymentId());
    
    // 3. Decrement stock (exclusive lock)
    inventoryService.decreaseStock(order.getProductId(), order.getQuantity());
    
    // 4. Update the order status
    order.setStatus("COMPLETED");
    orderRepository.save(order);
}

// Good: split the work so locks are held only for the database writes
public void processOrder(Long orderId) {
    // 1. Do the slow external call first, outside any transaction (no locks held)
    Order order = orderRepository.findById(orderId);
    paymentService.validatePayment(order.getPaymentId());
    
    // 2. Short transaction for the database work
    transactionTemplate.execute(status -> {
        try {
            // Re-read and lock the order inside the transaction
            Order locked = orderRepository.findByIdForUpdate(orderId);
            
            // Decrement stock (within the same short transaction)
            inventoryService.decreaseStock(locked.getProductId(), locked.getQuantity());
            
            // Update the order
            locked.setStatus("COMPLETED");
            orderRepository.save(locked);
            
            return null;
        } catch (Exception e) {
            status.setRollbackOnly();
            throw e;
        }
    });
}

4.2 Lock Optimization

InnoDB lock types

  • Shared (S) locks: read locks; many transactions can hold one at the same time
  • Exclusive (X) locks: write locks; only one transaction may hold one
  • Intention locks: table-level locks signalling which row locks a transaction intends to take

Code example: reducing lock contention

-- Scenario: stock deduction in a flash-sale system

-- Bad: read-then-update (race condition)
SELECT stock FROM products WHERE id = 100;
-- If stock=1, two transactions can both read 1, both UPDATE, and stock goes to -1

-- Good, option 1: optimistic locking (version column)
UPDATE products 
SET stock = stock - 1, version = version + 1 
WHERE id = 100 AND stock > 0 AND version = #{oldVersion};
-- If the update affects no rows, retry in the application

-- Good, option 2: pessimistic locking (SELECT ... FOR UPDATE)
BEGIN;
SELECT stock FROM products WHERE id = 100 FOR UPDATE;
-- Other transactions block here
UPDATE products SET stock = stock - 1 WHERE id = 100;
COMMIT;

-- Good, option 3: single conditional UPDATE (recommended; best performance)
UPDATE products SET stock = stock - 1 WHERE id = 100 AND stock > 0;
-- 0 rows affected means out of stock; 1 row affected means success

-- Inspect lock state (MySQL 5.7; INNODB_LOCKS/INNODB_LOCK_WAITS were removed
-- in 8.0 in favour of performance_schema.data_locks and data_lock_waits)
SELECT * FROM information_schema.INNODB_LOCKS;
SELECT * FROM information_schema.INNODB_LOCK_WAITS;
SELECT * FROM information_schema.INNODB_TRX;
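
From the application side, option 3 reduces to checking the affected-row count of a single UPDATE. A minimal sketch with JdbcTemplate; the class and method names are illustrative.

import org.springframework.jdbc.core.JdbcTemplate;

// Flash-sale stock deduction via one conditional UPDATE: the row lock is
// held only for the duration of that single statement.
public class StockService {

    private final JdbcTemplate jdbcTemplate;

    public StockService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /** Returns true if one unit was deducted, false if stock ran out. */
    public boolean tryDecreaseStock(long productId) {
        int affected = jdbcTemplate.update(
            "UPDATE products SET stock = stock - 1 WHERE id = ? AND stock > 0",
            productId);
        return affected == 1; // 0 means the stock guard rejected the decrement
    }
}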

4.3 Deadlock Detection and Retry

Deadlocks are unavoidable under high concurrency; you need detection plus a retry mechanism.

Code example: deadlock retry

@Service
public class DeadlockRetryService {
    
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_DELAY_MS = 100;
    
    // Deliberately NOT @Transactional: each attempt needs a fresh transaction,
    // so the operation passed in should be transactional itself.
    public <T> T executeWithRetry(Supplier<T> operation) {
        int retryCount = 0;
        while (retryCount < MAX_RETRIES) {
            try {
                return operation.get();
            } catch (DeadlockLoserDataAccessException e) { // Spring's deadlock exception
                retryCount++;
                if (retryCount >= MAX_RETRIES) {
                    throw e;
                }
                // Back off linearly between attempts
                long delay = RETRY_DELAY_MS * retryCount;
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Operation interrupted", ie);
                }
            }
        }
        throw new RuntimeException("Max retries exceeded");
    }
}

// Usage
@Service
public class OrderService {
    
    @Autowired
    private DeadlockRetryService retryService;
    
    public void createOrder(Order order) {
        retryService.executeWithRetry(() -> {
            // Business logic; this lambda should run inside its own transaction
            orderRepository.save(order);
            inventoryService.decreaseStock(order.getProductId(), order.getQuantity());
            return null;
        });
    }
}

5. Monitoring and Diagnostic Tools

5.1 Slow Query Log Analysis

The slow query log is a gold mine for finding performance problems.

Configuring the slow query log

-- Enable the slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL slow_query_log_file = '/var/log/mysql/slow.log';
SET GLOBAL long_query_time = 2; -- log queries slower than 2 seconds
SET GLOBAL log_queries_not_using_indexes = 'ON';

-- Verify the settings
SHOW VARIABLES LIKE 'slow_query%';
SHOW VARIABLES LIKE 'long_query_time';

Analyzing the slow query log

# Analyze with mysqldumpslow (sort by total time, top 10)
mysqldumpslow -s t -t 10 /var/log/mysql/slow.log

# Analyze with pt-query-digest (much more powerful)
pt-query-digest /var/log/mysql/slow.log > slow_report.txt

# Sample output:
# Overall: 1.20M total, 12.34QPS, 0.02ms avg
# Profile: Rank Query_id           Response_time    Calls R/Call V/M   Item
# ==== ================== ================ ====== ====== ===== ====
#    1 0xF9A57DD5A4C...  521.3499 43.4%   12345 0.0422  0.02 SELECT users
#    2 0xB4C27D5A4C...   234.5678 19.5%    5678 0.0413  0.01 SELECT orders

5.2 Performance Schema Monitoring

Performance Schema, shipped since MySQL 5.6, is the server's built-in performance instrumentation.

Code example: using Performance Schema

-- Enable the relevant instrumentation
UPDATE performance_schema.setup_instruments 
SET ENABLED = 'YES', TIMED = 'YES' 
WHERE NAME LIKE 'statement/%';

UPDATE performance_schema.setup_consumers 
SET ENABLED = 'YES' 
WHERE NAME IN ('events_statements_history_long', 'events_statements_current');

-- Most expensive statements by average latency
SELECT 
    DIGEST_TEXT,
    COUNT_STAR as exec_count,
    AVG_TIMER_WAIT/1000000000000 as avg_time_sec,
    MAX_TIMER_WAIT/1000000000000 as max_time_sec,
    SUM_ROWS_EXAMINED as total_rows
FROM performance_schema.events_statements_summary_by_digest
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 10;

-- Table I/O (counts and wait time; byte totals live in the file_summary tables)
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    COUNT_READ,
    COUNT_WRITE,
    SUM_TIMER_READ,
    SUM_TIMER_WRITE
FROM performance_schema.table_io_waits_summary_by_table
ORDER BY SUM_TIMER_WAIT DESC
LIMIT 10;

-- Index usage
SELECT 
    OBJECT_SCHEMA,
    OBJECT_NAME,
    INDEX_NAME,
    COUNT_FETCH,
    COUNT_INSERT,
    COUNT_UPDATE,
    COUNT_DELETE
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE INDEX_NAME IS NOT NULL
ORDER BY COUNT_FETCH DESC;

5.3 A Real-Time Monitoring Script

Code example: a Python monitoring script

#!/usr/bin/env python3
# mysql_monitor.py

import mysql.connector
import time
import json
from datetime import datetime

class MySQLMonitor:
    def __init__(self, host, user, password, database='mysql'):
        self.connection = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )
    
    def get_status(self, pattern=None):
        """Fetch MySQL status variables."""
        cursor = self.connection.cursor()
        if pattern:
            cursor.execute("SHOW STATUS LIKE %s", (pattern,))
        else:
            cursor.execute("SHOW STATUS")
        return dict(cursor.fetchall())
    
    def get_variables(self, pattern=None):
        """Fetch MySQL configuration variables."""
        cursor = self.connection.cursor()
        if pattern:
            cursor.execute("SHOW VARIABLES LIKE %s", (pattern,))
        else:
            cursor.execute("SHOW VARIABLES")
        return dict(cursor.fetchall())
    
    def get_processlist(self):
        """Fetch the current connection list."""
        cursor = self.connection.cursor(dictionary=True)
        cursor.execute("SHOW PROCESSLIST")
        return cursor.fetchall()
    
    def get_innodb_status(self):
        """Fetch SHOW ENGINE INNODB STATUS output."""
        cursor = self.connection.cursor()
        cursor.execute("SHOW ENGINE INNODB STATUS")
        result = cursor.fetchone()
        return result[2] if result else ""
    
    def check_performance(self):
        """Collect headline performance metrics."""
        status = self.get_status()
        variables = self.get_variables()
        
        # Buffer pool hit rate
        buffer_pool_reads = int(status.get('Innodb_buffer_pool_reads', 0))
        buffer_pool_read_requests = int(status.get('Innodb_buffer_pool_read_requests', 0))
        
        if buffer_pool_read_requests > 0:
            hit_rate = (1 - buffer_pool_reads / buffer_pool_read_requests) * 100
        else:
            hit_rate = 100
        
        # Connection usage
        threads_connected = int(status.get('Threads_connected', 0))
        max_connections = int(variables.get('max_connections', 0))
        connection_usage = (threads_connected / max_connections * 100) if max_connections > 0 else 0
        
        # Slow queries
        slow_queries = int(status.get('Slow_queries', 0))
        
        # Temporary tables
        tmp_disk_tables = int(status.get('Created_tmp_disk_tables', 0))
        tmp_tables = int(status.get('Created_tmp_tables', 0))
        tmp_disk_ratio = (tmp_disk_tables / tmp_tables * 100) if tmp_tables > 0 else 0
        
        return {
            'timestamp': datetime.now().isoformat(),
            'buffer_pool_hit_rate': round(hit_rate, 2),
            'connection_usage': round(connection_usage, 2),
            'threads_connected': threads_connected,
            'slow_queries': slow_queries,
            'tmp_disk_ratio': round(tmp_disk_ratio, 2),
            'tmp_tables': tmp_tables,
            'tmp_disk_tables': tmp_disk_tables
        }
    
    def get_top_queries(self, limit=5):
        """Fetch the longest-running active queries."""
        cursor = self.connection.cursor(dictionary=True)
        cursor.execute("""
            SELECT 
                ID,
                USER,
                HOST,
                DB,
                COMMAND,
                TIME,
                STATE,
                INFO
            FROM information_schema.PROCESSLIST
            WHERE TIME > 5
            ORDER BY TIME DESC
            LIMIT %s
        """, (limit,))
        return cursor.fetchall()
    
    def monitor_continuously(self, interval=60):
        """Monitor in a loop."""
        print(f"Starting MySQL performance monitoring, interval: {interval}s")
        print("-" * 80)
        
        while True:
            try:
                # Headline metrics
                perf = self.check_performance()
                print(f"[{perf['timestamp']}]")
                print(f"  Buffer pool hit rate: {perf['buffer_pool_hit_rate']}%")
                print(f"  Connection usage: {perf['connection_usage']}% ({perf['threads_connected']})")
                print(f"  Slow queries: {perf['slow_queries']}")
                print(f"  On-disk temp table ratio: {perf['tmp_disk_ratio']}%")
                
                # Long-running queries
                top_queries = self.get_top_queries(3)
                if top_queries:
                    print("  Long-running queries:")
                    for query in top_queries:
                        print(f"    ID:{query['ID']} USER:{query['USER']} TIME:{query['TIME']}s")
                        if query['INFO']:
                            print(f"    SQL: {query['INFO'][:100]}...")
                
                print("-" * 80)
                time.sleep(interval)
                
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(10)

# Usage
if __name__ == '__main__':
    monitor = MySQLMonitor(
        host='localhost',
        user='monitor',
        password='password'
    )
    
    # One-shot check
    print(json.dumps(monitor.check_performance(), indent=2))
    
    # Continuous monitoring
    # monitor.monitor_continuously(interval=30)

6. Common Bottlenecks and Their Fixes

6.1 Connection Exhaustion

Symptom: ERROR 1040 (08004): Too many connections

Solutions

-- 1. Raise the connection limit temporarily (lost on restart)
SET GLOBAL max_connections = 2000;

-- 2. Check current connection state
SHOW STATUS LIKE 'Threads_connected';
SHOW STATUS LIKE 'Max_used_connections';

-- 3. Break connections down by user/host
SELECT 
    USER,
    HOST,
    DB,
    COMMAND,
    COUNT(*) as conn_count
FROM information_schema.PROCESSLIST
GROUP BY USER, HOST, DB, COMMAND
ORDER BY conn_count DESC;

-- 4. Generate KILL statements for idle connections (use with care)
SELECT CONCAT('KILL ', id, ';') 
FROM information_schema.PROCESSLIST 
WHERE COMMAND = 'Sleep' AND TIME > 600;

-- 5. Application-side fix: connection pool sizing
-- HikariCP example
# application.yml
spring:
  datasource:
    hikari:
      maximum-pool-size: 50
      minimum-idle: 10
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000
      leak-detection-threshold: 60000

6.2 Lock Wait Timeouts

Symptom: ERROR 1205 (HY000): Lock wait timeout exceeded

Solutions

-- 1. Inspect current lock waits (MySQL 5.7; on 8.0 use the sys.innodb_lock_waits view)
SELECT 
    r.trx_id waiting_trx_id,
    r.trx_mysql_thread_id waiting_thread,
    r.trx_query waiting_query,
    b.trx_id blocking_trx_id,
    b.trx_mysql_thread_id blocking_thread,
    b.trx_query blocking_query
FROM information_schema.INNODB_LOCK_WAITS w
INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.requesting_trx_id;

-- 2. Find long-running transactions
SELECT 
    trx_id,
    trx_started,
    trx_mysql_thread_id,
    trx_query,
    TIMESTAMPDIFF(SECOND, trx_started, NOW()) as duration_seconds
FROM information_schema.INNODB_TRX
WHERE trx_started < DATE_SUB(NOW(), INTERVAL 5 MINUTE);

-- 3. Kill the blocking session (use with care)
KILL [blocking_thread_id];

-- 4. Application-side fix: set sensible lock wait timeouts
SET GLOBAL innodb_lock_wait_timeout = 50;
-- Or per session / per transaction
SET SESSION innodb_lock_wait_timeout = 10;

6.3 Slow Queries Driving CPU Saturation

Symptom: CPU utilization pinned above 90%, slow query log growing fast

Solutions

-- 1. Inspect the longest-running active statements
SELECT 
    THREAD_ID,
    PROCESSLIST_ID,
    PROCESSLIST_USER,
    PROCESSLIST_HOST,
    PROCESSLIST_DB,
    PROCESSLIST_COMMAND,
    PROCESSLIST_TIME,
    PROCESSLIST_INFO
FROM performance_schema.threads
WHERE PROCESSLIST_INFO IS NOT NULL
ORDER BY PROCESSLIST_TIME DESC;

-- 2. Check the execution plan (EXPLAIN ANALYZE requires MySQL 8.0.18+)
EXPLAIN ANALYZE SELECT * FROM large_table WHERE status = 1;

-- 3. Generate KILL statements for hot queries as a stopgap
SELECT CONCAT('KILL ', PROCESSLIST_ID, ';') 
FROM performance_schema.threads
WHERE PROCESSLIST_TIME > 30 
  AND PROCESSLIST_COMMAND = 'Query';

-- 4. Rate-limit at the application layer
-- e.g. with Guava's RateLimiter:

// Throttle expensive queries in application code
RateLimiter rateLimiter = RateLimiter.create(10.0); // 10 permits per second

public void queryWithLimit(String sql) {
    if (rateLimiter.tryAcquire()) {
        // Run the query
        jdbcTemplate.queryForList(sql);
    } else {
        throw new RateLimitException("Query rate limit exceeded");
    }
}

6.4 Replication Lag

Symptom: replicas fall behind the master, so reads under read/write splitting return stale data

Solutions

-- 1. Check replication lag
SHOW SLAVE STATUS\G
-- Watch: Seconds_Behind_Master (SHOW REPLICA STATUS in MySQL 8.0.22+)

-- 2. Check the master's binlog position
SHOW MASTER STATUS;

-- 3. Check replica thread state
SHOW SLAVE STATUS\G
-- Slave_IO_Running: Yes
-- Slave_SQL_Running: Yes

-- 4. Tune replication on the master
-- (SET GLOBAL takes a numeric value, not '4M')
SET GLOBAL binlog_cache_size = 4194304; -- 4MB
SET GLOBAL sync_binlog = 1;

-- 5. Tune replication on the replica
SET GLOBAL slave_parallel_workers = 4; -- parallel replication
SET GLOBAL slave_parallel_type = 'LOGICAL_CLOCK';

-- 6. Application-side fix: force reads to the master
-- For just-written data, read from the master for a short window
public User getUserAfterWrite(Long id, Long writeTimestamp) {
    if (System.currentTimeMillis() - writeTimestamp < 1000) {
        // Read from the master within 1 second of the write
        return getUserFromMaster(id);
    } else {
        return getUserFromSlave(id);
    }
}

7. Advanced Techniques

7.1 Covering Indexes to Avoid Row Lookups

Code example

-- Table definition
CREATE TABLE user_profile (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    nickname VARCHAR(50),
    avatar VARCHAR(200),
    bio TEXT,
    created_at DATETIME,
    INDEX idx_user_id (user_id)
);

-- Bad: requires a lookup back to the clustered index
SELECT nickname, avatar 
FROM user_profile 
WHERE user_id = 123;
-- Scans idx_user_id for the id, then fetches the full row for the remaining columns

-- Good: create a covering index
-- (Note: a TEXT column such as bio cannot be covered; it would need a prefix
--  index, and a prefix index still forces the row lookup)
ALTER TABLE user_profile ADD INDEX idx_user_cover (user_id, nickname, avatar);

-- Now the query is answered from the index alone
EXPLAIN SELECT nickname, avatar 
FROM user_profile 
WHERE user_id = 123;
-- Extra: Using index

7.2 Index Condition Pushdown (ICP)

Automatic in MySQL 5.6+, ICP filters on index columns inside the storage engine and so reduces row lookups.

Code example

-- Table definition
CREATE TABLE orders (
    id BIGINT PRIMARY KEY,
    user_id BIGINT,
    status VARCHAR(20),
    amount DECIMAL(10,2),
    INDEX idx_user_status (user_id, status)
);

-- Index condition pushdown example
SELECT * FROM orders 
WHERE user_id = 123 AND status LIKE 'pending%';

-- MySQL 5.6+ filters status within the index, reducing row lookups
-- EXPLAIN shows "Using index condition" in the Extra column

7.3 Batch Operations

Code example

-- Bad: row-by-row inserts (slow)
INSERT INTO orders (user_id, amount) VALUES (1, 100.00);
INSERT INTO orders (user_id, amount) VALUES (2, 200.00);
INSERT INTO orders (user_id, amount) VALUES (3, 300.00);

-- Good: multi-row insert
INSERT INTO orders (user_id, amount) VALUES 
(1, 100.00),
(2, 200.00),
(3, 300.00);

-- Bad: row-by-row updates
UPDATE products SET stock = stock - 1 WHERE id = 1;
UPDATE products SET stock = stock - 1 WHERE id = 2;
UPDATE products SET stock = stock - 1 WHERE id = 3;

-- Good: one batched update
UPDATE products 
SET stock = stock - 1 
WHERE id IN (1, 2, 3);

-- Or use CASE for per-row values
UPDATE products 
SET stock = CASE 
    WHEN id = 1 THEN stock - 1
    WHEN id = 2 THEN stock - 2
    WHEN id = 3 THEN stock - 3
END
WHERE id IN (1, 2, 3);
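
From Java, the same idea maps to JdbcTemplate.batchUpdate. A minimal sketch, assuming an Order class with getUserId() and getAmount() accessors (our own names); with MySQL Connector/J, adding rewriteBatchedStatements=true to the JDBC URL lets the driver rewrite the batch into a single multi-row INSERT.

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;

// Batch insert: one round trip per batch instead of one per row.
public class OrderBatchWriter {

    private final JdbcTemplate jdbcTemplate;

    public OrderBatchWriter(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public void insertOrders(List<Order> orders) {
        jdbcTemplate.batchUpdate(
            "INSERT INTO orders (user_id, amount) VALUES (?, ?)",
            new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    ps.setLong(1, orders.get(i).getUserId());
                    ps.setBigDecimal(2, orders.get(i).getAmount());
                }

                @Override
                public int getBatchSize() {
                    return orders.size();
                }
            });
    }
}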

7.4 Avoid Fetching Large Columns

Code example

-- Table definition (contains a TEXT column)
CREATE TABLE articles (
    id BIGINT PRIMARY KEY,
    title VARCHAR(200),
    content TEXT, -- large column
    summary VARCHAR(500),
    created_at DATETIME,
    INDEX idx_created (created_at)
);

-- Bad: fetches the large column
SELECT * FROM articles WHERE created_at > '2024-01-01';
-- Reads content for every row, burning I/O and memory

-- Good: select only the columns you need
SELECT id, title, summary, created_at 
FROM articles 
WHERE created_at > '2024-01-01';

-- Fetch content separately, only when actually needed
SELECT content FROM articles WHERE id = ?;

8. Summary and Best Practices

8.1 High-Concurrency Optimization Checklist

Architecture

  • [ ] Read/write splitting in place
  • [ ] Sharding (databases/tables) evaluated
  • [ ] Cache layer (Redis) in front of the database
  • [ ] Message queue used to shave traffic peaks

Configuration

  • [ ] innodb_buffer_pool_size sized sensibly (50-70% of physical RAM)
  • [ ] max_connections sufficient for the workload
  • [ ] innodb_log_file_size appropriate
  • [ ] Slow query log enabled

SQL

  • [ ] Every query backed by a suitable index
  • [ ] No SELECT *; only needed columns fetched
  • [ ] No deep pagination
  • [ ] Complex queries split where appropriate

Transactions

  • [ ] Transactions kept short and small
  • [ ] Rows accessed in a consistent order
  • [ ] Deadlock retry mechanism in place

Monitoring

  • [ ] Slow queries monitored
  • [ ] Connection counts monitored
  • [ ] Lock waits monitored
  • [ ] Replication lag monitored

8.2 Golden Rules of Performance Optimization

  1. Measure first: never guess; let the data decide
  2. 80/20 rule: 80% of performance problems come from 20% of the queries
  3. Optimize in layers: architecture, then configuration, then SQL
  4. Keep iterating: optimization is an ongoing process, not a one-off task

8.3 Recommended Tools

  • Slow query analysis: pt-query-digest
  • Index analysis: pt-index-usage
  • Online schema changes: pt-online-schema-change
  • Deadlock analysis: innodb_lock_waits
  • Live monitoring: Percona Monitoring and Management (PMM)

Applied together, these strategies will resolve most MySQL performance bottlenecks under high concurrency and give you a stable, efficient database system. Remember that optimization is a continuous process: keep adjusting as the business and the data grow.