引言

在现代应用架构中,数据是企业的核心资产。MongoDB作为一款流行的NoSQL数据库,广泛应用于各种规模的项目中。然而,与传统关系型数据库不同,MongoDB的备份策略需要考虑其分布式特性、灵活的数据模型以及独特的存储引擎。本文将从基础概念出发,逐步深入到高级实战技巧,为您提供一份全面的MongoDB备份策略指南。

一、MongoDB备份基础概念

1.1 为什么需要备份?

数据丢失可能由多种原因造成:

  • 硬件故障:磁盘损坏、服务器宕机
  • 人为错误:误删数据、错误的更新操作
  • 软件故障:数据库崩溃、bug导致的数据损坏
  • 安全威胁:勒索软件攻击、恶意删除
  • 灾难恢复:自然灾害、数据中心故障

1.2 MongoDB备份的特点

与传统关系型数据库相比,MongoDB备份有以下特点:

  • 文档级备份:可以备份单个集合或数据库
  • 增量备份:支持基于oplog的增量备份
  • 分布式备份:支持副本集和分片集群的备份
  • 灵活的恢复粒度:可以恢复到特定时间点

二、基础备份方法

2.1 mongodump工具

mongodump是MongoDB官方提供的备份工具,它以BSON格式导出数据。

2.1.1 基本用法

# 备份整个数据库
mongodump --host localhost --port 27017 --db mydb --out /backup/mongodb/

# 备份指定集合
mongodump --host localhost --port 27017 --db mydb --collection users --out /backup/mongodb/

# 带认证的备份
mongodump --host localhost --port 27017 --username admin --password password --authenticationDatabase admin --db mydb --out /backup/mongodb/

2.1.2 增量备份

MongoDB的增量备份基于oplog(操作日志)。首先需要启用oplog:

// 在副本集配置中启用oplog
rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "localhost:27017" }
  ]
})

// 检查oplog大小
db.getReplicationInfo()

然后使用mongodump--oplog选项:

# 创建基础备份
mongodump --host localhost --port 27017 --oplog --out /backup/mongodb/base/

# 后续增量备份(基于时间点)
mongodump --host localhost --port 27017 --oplog --out /backup/mongodb/incremental/ --query '{"ts": {"$gte": Timestamp(1625097600, 1)}}'

2.2 文件系统备份

对于单节点MongoDB,可以直接备份数据目录:

# 停止MongoDB服务
sudo systemctl stop mongod

# 复制数据目录
sudo cp -r /var/lib/mongodb /backup/mongodb/data/

# 启动MongoDB服务
sudo systemctl start mongod

注意:这种方法需要停机,不适合生产环境。

2.3 MongoDB Atlas云备份

如果您使用MongoDB Atlas,可以利用其内置的备份功能:

// 在Atlas控制台中配置备份策略
// 1. 进入集群设置
// 2. 选择"Backup"选项卡
// 3. 配置备份频率和保留策略
// 4. 启用点时间恢复(PITR)

三、高级备份策略

3.1 副本集备份策略

在副本集中,备份应该在Secondary节点上进行,以避免影响Primary节点的性能。

3.1.1 备份Secondary节点

# 连接到Secondary节点进行备份
mongodump --host secondary-host:27017 --port 27017 --db mydb --out /backup/mongodb/

3.1.2 使用备份工具

推荐使用专业的备份工具,如Percona Backup for MongoDB

# 安装Percona Backup for MongoDB
sudo apt-get install percona-backup-mongodb

# 配置备份策略
cat > /etc/percona-backup-mongodb/backup.yaml << EOF
storage:
  type: filesystem
  filesystem:
    path: /backup/mongodb
scheduling:
  fullBackup:
    schedule: "0 2 * * *"  # 每天凌晨2点
    retention: 7d
  incrementalBackup:
    schedule: "0 */6 * * *"  # 每6小时
    retention: 3d
EOF

# 启动备份服务
sudo systemctl start pmbm-agent

3.2 分片集群备份

分片集群的备份需要考虑多个分片和配置服务器。

3.2.1 备份配置服务器

# 备份配置服务器副本集
mongodump --host config-server:27019 --db config --out /backup/mongodb/config/

3.2.2 备份分片

# 为每个分片执行备份
for shard in shard1 shard2 shard3; do
  mongodump --host ${shard}:27018 --db mydb --out /backup/mongodb/${shard}/
done

3.2.3 使用MongoDB Ops Manager

MongoDB Ops Manager提供了企业级的备份解决方案:

// 配置Ops Manager备份策略
// 1. 安装Ops Manager Agent
// 2. 配置备份存储(S3、NFS等)
// 3. 设置备份计划
// 4. 配置告警和监控

3.3 点时间恢复(PITR)

点时间恢复允许恢复到特定时间点,对于误操作恢复至关重要。

3.3.1 启用PITR

// 在副本集配置中启用PITR
rs.reconfig({
  _id: "rs0",
  members: [
    { _id: 0, host: "localhost:27017" }
  ],
  settings: {
    chainingAllowed: true,
    heartbeatIntervalMillis: 2000,
    electionTimeoutMillis: 10000,
    catchUpTimeoutMillis: 60000,
    catchUpTakeoverDelayMillis: 30000,
    heartbeatIntervalSecs: 10,
    electionTimeoutSecs: 10,
    catchUpTimeoutSecs: 60,
    catchUpTakeoverDelaySecs: 30
  }
})

3.3.2 执行PITR恢复

# 1. 准备基础备份
mongodump --host localhost --port 27017 --oplog --out /backup/mongodb/base/

# 2. 应用oplog到特定时间点
mongorestore --host localhost --port 27017 --oplogReplay --oplogLimit "1625097600:1" /backup/mongodb/base/

四、备份验证与测试

4.1 备份验证的重要性

备份只有在成功恢复时才真正有效。定期验证备份是确保业务连续性的关键。

4.2 自动化验证脚本

#!/usr/bin/env python3
"""
MongoDB备份验证脚本
"""

import subprocess
import json
import time
from datetime import datetime

class BackupValidator:
    def __init__(self, backup_path, test_db_name="test_restore"):
        self.backup_path = backup_path
        self.test_db_name = test_db_name
        self.test_host = "localhost"
        self.test_port = 27018  # 使用不同端口避免冲突
    
    def restore_to_test(self):
        """将备份恢复到测试环境"""
        try:
            # 启动临时MongoDB实例
            cmd = [
                "mongod",
                "--dbpath", "/tmp/mongodb_test",
                "--port", str(self.test_port),
                "--bind_ip", "127.0.0.1",
                "--fork",
                "--logpath", "/tmp/mongodb_test.log"
            ]
            subprocess.run(cmd, check=True)
            
            # 等待服务启动
            time.sleep(5)
            
            # 恢复备份
            restore_cmd = [
                "mongorestore",
                "--host", self.test_host,
                "--port", str(self.test_port),
                "--db", self.test_db_name,
                self.backup_path
            ]
            subprocess.run(restore_cmd, check=True)
            
            return True
        except subprocess.CalledProcessError as e:
            print(f"恢复失败: {e}")
            return False
    
    def verify_data_integrity(self):
        """验证数据完整性"""
        try:
            # 连接测试数据库
            from pymongo import MongoClient
            client = MongoClient(self.test_host, self.test_port)
            
            # 检查数据库是否存在
            db_names = client.list_database_names()
            if self.test_db_name not in db_names:
                print(f"数据库 {self.test_db_name} 不存在")
                return False
            
            # 检查集合数量
            db = client[self.test_db_name]
            collections = db.list_collection_names()
            print(f"找到 {len(collections)} 个集合")
            
            # 检查文档数量
            total_docs = 0
            for coll_name in collections:
                count = db[coll_name].count_documents({})
                total_docs += count
                print(f"集合 {coll_name}: {count} 个文档")
            
            # 检查数据一致性(示例:检查用户集合)
            if "users" in collections:
                user_count = db.users.count_documents({})
                print(f"用户总数: {user_count}")
                
                # 检查是否有重复的用户ID
                pipeline = [
                    {"$group": {"_id": "$_id", "count": {"$sum": 1}}},
                    {"$match": {"count": {"$gt": 1}}}
                ]
                duplicates = list(db.users.aggregate(pipeline))
                if duplicates:
                    print(f"发现重复的用户ID: {duplicates}")
                    return False
            
            client.close()
            return True
            
        except Exception as e:
            print(f"验证失败: {e}")
            return False
    
    def cleanup(self):
        """清理测试环境"""
        try:
            # 停止临时MongoDB实例
            subprocess.run(["mongod", "--shutdown", "--port", str(self.test_port)])
            
            # 删除临时数据
            subprocess.run(["rm", "-rf", "/tmp/mongodb_test"])
            subprocess.run(["rm", "-f", "/tmp/mongodb_test.log"])
            
            print("测试环境清理完成")
        except Exception as e:
            print(f"清理失败: {e}")
    
    def run_validation(self):
        """运行完整的验证流程"""
        print(f"开始备份验证: {self.backup_path}")
        print(f"时间: {datetime.now()}")
        
        try:
            # 1. 恢复到测试环境
            print("步骤1: 恢复备份到测试环境...")
            if not self.restore_to_test():
                return False
            
            # 2. 验证数据完整性
            print("步骤2: 验证数据完整性...")
            if not self.verify_data_integrity():
                return False
            
            # 3. 清理
            print("步骤3: 清理测试环境...")
            self.cleanup()
            
            print("备份验证成功!")
            return True
            
        except Exception as e:
            print(f"验证过程中发生错误: {e}")
            self.cleanup()
            return False

# 使用示例
if __name__ == "__main__":
    validator = BackupValidator("/backup/mongodb/latest")
    success = validator.run_validation()
    
    if success:
        print("✅ 备份验证通过")
    else:
        print("❌ 备份验证失败")
        # 这里可以添加告警逻辑

4.3 定期验证计划

#!/bin/bash
# backup_validation.sh - 每周运行一次备份验证

BACKUP_DIR="/backup/mongodb"
LOG_FILE="/var/log/mongodb_backup_validation.log"
ALERT_EMAIL="dba@example.com"

# 获取最新的完整备份
LATEST_BACKUP=$(find $BACKUP_DIR -name "mongodump-*" -type d | sort -r | head -1)

if [ -z "$LATEST_BACKUP" ]; then
    echo "未找到备份" | tee -a $LOG_FILE
    echo "未找到备份" | mail -s "MongoDB备份验证失败" $ALERT_EMAIL
    exit 1
fi

# 运行验证脚本
python3 /opt/scripts/backup_validator.py "$LATEST_BACKUP" >> $LOG_FILE 2>&1

if [ $? -eq 0 ]; then
    echo "备份验证成功" | tee -a $LOG_FILE
else
    echo "备份验证失败" | tee -a $LOG_FILE
    echo "备份验证失败,请检查日志: $LOG_FILE" | mail -s "MongoDB备份验证失败" $ALERT_EMAIL
fi

五、备份存储与管理

5.1 备份存储策略

5.1.1 3-2-1备份原则

  • 3:至少3份数据副本
  • 2:存储在2种不同介质上
  • 1:至少1份异地备份

5.1.2 备份存储方案

# 本地存储(快速恢复)
/backup/mongodb/local/

# 网络附加存储(NAS)
/backup/mongodb/nas/

# 云存储(S3、Azure Blob等)
# 使用rclone同步到云存储
rclone sync /backup/mongodb/ remote:backups/mongodb/

# 磁带备份(长期归档)
# 使用tar和mt命令
tar -czf /backup/mongodb/archive-$(date +%Y%m%d).tar.gz /backup/mongodb/latest/
mt -f /dev/st0 load
tar -czf /dev/st0 /backup/mongodb/archive-$(date +%Y%m%d).tar.gz

5.2 备份生命周期管理

#!/usr/bin/env python3
"""
备份生命周期管理脚本
"""

import os
import shutil
from datetime import datetime, timedelta
import logging

class BackupLifecycleManager:
    def __init__(self, backup_root, retention_policy):
        """
        retention_policy: {
            'daily': 7,      # 保留7天的每日备份
            'weekly': 4,     # 保留4周的每周备份
            'monthly': 12,   # 保留12个月的每月备份
            'yearly': 3      # 保留3年的年度备份
        }
        """
        self.backup_root = backup_root
        self.retention_policy = retention_policy
        self.setup_logging()
    
    def setup_logging(self):
        """设置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/backup_lifecycle.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def get_backup_age(self, backup_path):
        """获取备份的年龄(天数)"""
        backup_time = datetime.fromtimestamp(os.path.getctime(backup_path))
        age = (datetime.now() - backup_time).days
        return age
    
    def classify_backup(self, backup_path):
        """分类备份类型"""
        age = self.get_backup_age(backup_path)
        
        if age <= 1:
            return 'daily'
        elif age <= 7:
            return 'weekly'
        elif age <= 30:
            return 'monthly'
        elif age <= 365:
            return 'yearly'
        else:
            return 'archive'
    
    def should_keep(self, backup_path, backup_type):
        """判断是否应该保留备份"""
        age = self.get_backup_age(backup_path)
        
        if backup_type == 'daily':
            return age < self.retention_policy['daily']
        elif backup_type == 'weekly':
            return age < self.retention_policy['weekly'] * 7
        elif backup_type == 'monthly':
            return age < self.retention_policy['monthly'] * 30
        elif backup_type == 'yearly':
            return age < self.retention_policy['yearly'] * 365
        else:
            return False
    
    def manage_backups(self):
        """管理备份生命周期"""
        self.logger.info("开始备份生命周期管理")
        
        # 遍历备份目录
        for item in os.listdir(self.backup_root):
            backup_path = os.path.join(self.backup_root, item)
            
            if not os.path.isdir(backup_path):
                continue
            
            # 分类备份
            backup_type = self.classify_backup(backup_path)
            age = self.get_backup_age(backup_path)
            
            # 判断是否保留
            if self.should_keep(backup_path, backup_type):
                self.logger.info(f"保留备份: {item} (类型: {backup_type}, 年龄: {age}天)")
            else:
                # 删除过期备份
                self.logger.warning(f"删除过期备份: {item} (类型: {backup_type}, 年龄: {age}天)")
                try:
                    shutil.rmtree(backup_path)
                    self.logger.info(f"成功删除: {item}")
                except Exception as e:
                    self.logger.error(f"删除失败: {item} - {e}")
        
        self.logger.info("备份生命周期管理完成")

# 使用示例
if __name__ == "__main__":
    # 配置保留策略
    retention_policy = {
        'daily': 7,      # 保留7天的每日备份
        'weekly': 4,     # 保留4周的每周备份
        'monthly': 12,   # 保留12个月的每月备份
        'yearly': 3      # 保留3年的年度备份
    }
    
    manager = BackupLifecycleManager('/backup/mongodb', retention_policy)
    manager.manage_backups()

六、备份自动化与监控

6.1 使用Cron定时任务

# /etc/cron.d/mongodb-backup
# 每天凌晨2点执行完整备份
0 2 * * * root /opt/scripts/mongodb_backup.sh

# 每6小时执行增量备份
0 */6 * * * root /opt/scripts/mongodb_incremental_backup.sh

# 每周日执行备份验证
0 3 * * 0 root /opt/scripts/backup_validation.sh

# 每月1号执行备份清理
0 4 1 * * root /opt/scripts/backup_cleanup.sh

6.2 备份监控告警

#!/usr/bin/env python3
"""
备份监控与告警脚本
"""

import smtplib
import json
import requests
from datetime import datetime, timedelta
import logging

class BackupMonitor:
    def __init__(self, config_file):
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        
        self.setup_logging()
    
    def setup_logging(self):
        """设置日志"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/backup_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def check_backup_age(self):
        """检查备份年龄"""
        backup_dir = self.config['backup_dir']
        max_age_hours = self.config['max_backup_age_hours']
        
        # 查找最新的备份
        import os
        backups = []
        for item in os.listdir(backup_dir):
            path = os.path.join(backup_dir, item)
            if os.path.isdir(path):
                mtime = os.path.getmtime(path)
                backups.append((path, mtime))
        
        if not backups:
            return False, "未找到任何备份"
        
        # 获取最新的备份
        latest_backup = max(backups, key=lambda x: x[1])
        backup_time = datetime.fromtimestamp(latest_backup[1])
        age_hours = (datetime.now() - backup_time).total_seconds() / 3600
        
        if age_hours > max_age_hours:
            return False, f"备份过时: {age_hours:.1f}小时 (最大允许: {max_age_hours}小时)"
        
        return True, f"备份正常: {age_hours:.1f}小时"
    
    def check_backup_size(self):
        """检查备份大小"""
        backup_dir = self.config['backup_dir']
        min_size_gb = self.config['min_backup_size_gb']
        
        import os
        total_size = 0
        for item in os.listdir(backup_dir):
            path = os.path.join(backup_dir, item)
            if os.path.isdir(path):
                total_size += sum(os.path.getsize(os.path.join(dirpath, filename)) 
                                 for dirpath, dirnames, filenames in os.walk(path) 
                                 for filename in filenames)
        
        size_gb = total_size / (1024**3)
        
        if size_gb < min_size_gb:
            return False, f"备份大小异常: {size_gb:.2f}GB (最小: {min_size_gb}GB)"
        
        return True, f"备份大小正常: {size_gb:.2f}GB"
    
    def check_restore_test(self):
        """检查恢复测试结果"""
        test_log = self.config.get('restore_test_log', '/var/log/restore_test.log')
        
        try:
            with open(test_log, 'r') as f:
                lines = f.readlines()
                if lines:
                    last_line = lines[-1].strip()
                    if "成功" in last_line or "success" in last_line.lower():
                        return True, "恢复测试成功"
                    else:
                        return False, f"恢复测试失败: {last_line}"
                else:
                    return False, "恢复测试日志为空"
        except FileNotFoundError:
            return False, "恢复测试日志不存在"
    
    def send_alert(self, subject, message, level="warning"):
        """发送告警"""
        alert_method = self.config.get('alert_method', 'email')
        
        if alert_method == 'email':
            self.send_email_alert(subject, message, level)
        elif alert_method == 'slack':
            self.send_slack_alert(subject, message, level)
        elif alert_method == 'webhook':
            self.send_webhook_alert(subject, message, level)
    
    def send_email_alert(self, subject, message, level):
        """发送邮件告警"""
        try:
            smtp_config = self.config['smtp']
            msg = f"Subject: [{level.upper()}] {subject}\n\n{message}"
            
            server = smtplib.SMTP(smtp_config['host'], smtp_config['port'])
            server.starttls()
            server.login(smtp_config['username'], smtp_config['password'])
            server.sendmail(smtp_config['from'], smtp_config['to'], msg)
            server.quit()
            
            self.logger.info(f"邮件告警已发送: {subject}")
        except Exception as e:
            self.logger.error(f"发送邮件告警失败: {e}")
    
    def send_slack_alert(self, subject, message, level):
        """发送Slack告警"""
        try:
            webhook_url = self.config['slack_webhook']
            payload = {
                "text": f"*[{level.upper()}] {subject}*\n{message}",
                "username": "MongoDB Backup Monitor",
                "icon_emoji": ":database:"
            }
            requests.post(webhook_url, json=payload)
            self.logger.info(f"Slack告警已发送: {subject}")
        except Exception as e:
            self.logger.error(f"发送Slack告警失败: {e}")
    
    def send_webhook_alert(self, subject, message, level):
        """发送Webhook告警"""
        try:
            webhook_url = self.config['webhook_url']
            payload = {
                "event": "backup_alert",
                "level": level,
                "subject": subject,
                "message": message,
                "timestamp": datetime.now().isoformat()
            }
            requests.post(webhook_url, json=payload)
            self.logger.info(f"Webhook告警已发送: {subject}")
        except Exception as e:
            self.logger.error(f"发送Webhook告警失败: {e}")
    
    def run_monitoring(self):
        """运行监控"""
        self.logger.info("开始备份监控")
        
        alerts = []
        
        # 检查备份年龄
        age_ok, age_msg = self.check_backup_age()
        if not age_ok:
            alerts.append(("备份年龄异常", age_msg, "critical"))
        
        # 检查备份大小
        size_ok, size_msg = self.check_backup_size()
        if not size_ok:
            alerts.append(("备份大小异常", size_msg, "warning"))
        
        # 检查恢复测试
        restore_ok, restore_msg = self.check_restore_test()
        if not restore_ok:
            alerts.append(("恢复测试失败", restore_msg, "critical"))
        
        # 发送告警
        if alerts:
            for subject, message, level in alerts:
                self.send_alert(subject, message, level)
        else:
            self.logger.info("所有检查通过,无告警")
        
        return len(alerts) == 0

# 使用示例
if __name__ == "__main__":
    monitor = BackupMonitor('/etc/backup_monitor_config.json')
    success = monitor.run_monitoring()
    
    if success:
        print("✅ 监控检查通过")
    else:
        print("❌ 监控检查发现异常")

七、灾难恢复计划

7.1 灾难恢复场景

7.1.1 场景1:单节点故障

# 1. 停止故障节点
sudo systemctl stop mongod

# 2. 从备份恢复
mongorestore --host new-host:27017 --port 27017 --db mydb /backup/mongodb/latest/mydb/

# 3. 重新配置副本集(如果是副本集)
rs.reconfig({
  _id: "rs0",
  members: [
    { _id: 0, host: "new-host:27017" }
  ]
})

7.1.2 场景2:整个数据中心故障

# 1. 在新数据中心启动MongoDB
mongod --dbpath /data/mongodb --replSet rs0 --bind_ip_all

# 2. 恢复配置服务器
mongorestore --host config-server:27019 --db config /backup/mongodb/config/

# 3. 恢复分片数据
for shard in shard1 shard2 shard3; do
  mongorestore --host ${shard}:27018 --db mydb /backup/mongodb/${shard}/
done

# 4. 重新平衡分片
sh.startBalancer()

7.2 灾难恢复演练

#!/usr/bin/env python3
"""
灾难恢复演练脚本
"""

import subprocess
import time
import json
from datetime import datetime

class DisasterRecoveryDrill:
    def __init__(self, config_file):
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        
        self.drill_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.results = {}
    
    def simulate_failure(self):
        """模拟故障"""
        print(f"[{self.drill_id}] 模拟故障场景: {self.config['failure_scenario']}")
        
        if self.config['failure_scenario'] == 'single_node':
            # 模拟单节点故障
            print("模拟单节点故障...")
            # 这里可以实际停止MongoDB服务
            # subprocess.run(["sudo", "systemctl", "stop", "mongod"])
        
        elif self.config['failure_scenario'] == 'data_center':
            # 模拟数据中心故障
            print("模拟数据中心故障...")
            # 这里可以模拟网络隔离
        
        elif self.config['failure_scenario'] == 'data_corruption':
            # 模拟数据损坏
            print("模拟数据损坏...")
            # 这里可以模拟删除数据文件
            # subprocess.run(["rm", "-rf", "/var/lib/mongodb/*"])
    
    def execute_recovery(self):
        """执行恢复"""
        print(f"[{self.drill_id}] 开始执行恢复流程")
        
        start_time = time.time()
        
        try:
            # 1. 恢复配置服务器
            print("步骤1: 恢复配置服务器...")
            config_restore_cmd = [
                "mongorestore",
                "--host", self.config['config_server'],
                "--db", "config",
                self.config['backup_path'] + "/config/"
            ]
            subprocess.run(config_restore_cmd, check=True)
            
            # 2. 恢复分片数据
            print("步骤2: 恢复分片数据...")
            for shard in self.config['shards']:
                shard_restore_cmd = [
                    "mongorestore",
                    "--host", shard,
                    "--db", self.config['database'],
                    self.config['backup_path'] + f"/{shard}/"
                ]
                subprocess.run(shard_restore_cmd, check=True)
            
            # 3. 重新平衡
            print("步骤3: 重新平衡分片...")
            balance_cmd = [
                "mongo",
                "--eval", "sh.startBalancer()"
            ]
            subprocess.run(balance_cmd, check=True)
            
            # 4. 验证恢复
            print("步骤4: 验证恢复...")
            verify_cmd = [
                "mongo",
                "--eval", f"db.getSiblingDB('{self.config['database']}').stats()"
            ]
            result = subprocess.run(verify_cmd, capture_output=True, text=True)
            
            recovery_time = time.time() - start_time
            
            self.results = {
                "drill_id": self.drill_id,
                "scenario": self.config['failure_scenario'],
                "recovery_time_seconds": recovery_time,
                "status": "success",
                "verification_output": result.stdout,
                "timestamp": datetime.now().isoformat()
            }
            
            print(f"[{self.drill_id}] 恢复完成,耗时: {recovery_time:.2f}秒")
            
        except subprocess.CalledProcessError as e:
            self.results = {
                "drill_id": self.drill_id,
                "scenario": self.config['failure_scenario'],
                "recovery_time_seconds": time.time() - start_time,
                "status": "failed",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }
            print(f"[{self.drill_id}] 恢复失败: {e}")
    
    def save_results(self):
        """保存演练结果"""
        results_file = f"/var/log/drill_results_{self.drill_id}.json"
        with open(results_file, 'w') as f:
            json.dump(self.results, f, indent=2)
        
        print(f"演练结果已保存到: {results_file}")
        
        # 发送报告
        if self.config.get('send_report', False):
            self.send_report()
    
    def send_report(self):
        """发送演练报告"""
        import smtplib
        from email.mime.text import MIMEText
        
        subject = f"MongoDB灾难恢复演练报告 - {self.drill_id}"
        body = f"""
        灾难恢复演练报告
        =================
        
        演练ID: {self.drill_id}
        场景: {self.results.get('scenario', 'N/A')}
        状态: {self.results.get('status', 'N/A')}
        恢复时间: {self.results.get('recovery_time_seconds', 0):.2f}秒
        时间: {self.results.get('timestamp', 'N/A')}
        
        详细信息:
        {json.dumps(self.results, indent=2)}
        """
        
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = self.config['report_from']
        msg['To'] = ', '.join(self.config['report_to'])
        
        try:
            server = smtplib.SMTP(self.config['smtp_host'], self.config['smtp_port'])
            server.starttls()
            server.login(self.config['smtp_username'], self.config['smtp_password'])
            server.send_message(msg)
            server.quit()
            print("演练报告已发送")
        except Exception as e:
            print(f"发送报告失败: {e}")
    
    def run_drill(self):
        """运行完整的演练"""
        print(f"开始灾难恢复演练: {self.drill_id}")
        print(f"场景: {self.config['failure_scenario']}")
        
        # 1. 模拟故障
        self.simulate_failure()
        
        # 2. 执行恢复
        self.execute_recovery()
        
        # 3. 保存结果
        self.save_results()
        
        print(f"灾难恢复演练完成: {self.drill_id}")

# 使用示例
if __name__ == "__main__":
    drill = DisasterRecoveryDrill('/etc/disaster_recovery_config.json')
    drill.run_drill()

八、最佳实践总结

8.1 备份策略建议

  1. 频率

    • 完整备份:每天一次
    • 增量备份:每4-6小时一次
    • 事务日志:实时(如果启用PITR)
  2. 保留策略

    • 每日备份:保留7天
    • 每周备份:保留4周
    • 每月备份:保留12个月
    • 年度备份:保留3年
  3. 存储策略

    • 本地存储:用于快速恢复
    • 异地存储:用于灾难恢复
    • 云存储:用于长期归档

8.2 监控与告警

  1. 关键指标监控

    • 备份年龄
    • 备份大小
    • 恢复测试结果
    • 备份成功率
  2. 告警阈值

    • 备份年龄 > 24小时:告警
    • 备份大小异常:告警
    • 恢复测试失败:紧急告警

8.3 安全考虑

  1. 备份加密

    # 使用GPG加密备份
    tar -czf - /backup/mongodb/latest/ | gpg --encrypt --recipient dba@example.com > /backup/mongodb/latest.tar.gz.gpg
    
  2. 访问控制

    • 限制备份目录的访问权限
    • 使用专用的备份用户
    • 定期轮换备份凭证

8.4 性能优化

  1. 备份时机

    • 在业务低峰期执行备份
    • 避免在主节点备份
    • 使用Secondary节点进行备份
  2. 资源管理

    • 限制备份带宽使用
    • 使用压缩减少存储空间
    • 定期清理旧备份

九、常见问题与解决方案

9.1 备份失败

问题:备份过程中出现”connection refused”错误

解决方案

# 检查MongoDB服务状态
sudo systemctl status mongod

# 检查防火墙设置
sudo ufw status

# 检查MongoDB配置文件
cat /etc/mongod.conf | grep bind_ip

9.2 恢复失败

问题:恢复时出现”namespace mismatch”错误

解决方案

# 检查备份版本与目标版本是否兼容
mongod --version

# 使用--nsFrom和--nsTo参数重命名命名空间
mongorestore --nsFrom "mydb.*" --nsTo "mydb_restored.*" /backup/mongodb/

9.3 备份空间不足

问题:备份目录磁盘空间不足

解决方案

# 检查磁盘使用情况
df -h /backup

# 清理旧备份
find /backup/mongodb -type d -mtime +30 -exec rm -rf {} \;

# 压缩旧备份
find /backup/mongodb -type d -mtime +7 -exec tar -czf {}.tar.gz {} \; -exec rm -rf {} \;

十、总结

MongoDB备份策略需要根据具体的业务需求、数据规模和架构特点来制定。从基础的mongodump工具到高级的自动化备份系统,从单节点备份到分布式集群备份,每种方案都有其适用场景。

关键要点

  1. 定期备份:根据业务需求制定合理的备份频率
  2. 验证备份:定期测试备份的可恢复性
  3. 多重存储:遵循3-2-1备份原则
  4. 自动化:减少人为错误,提高可靠性
  5. 监控告警:及时发现并解决问题
  6. 定期演练:确保灾难恢复计划的有效性

通过本文介绍的策略和工具,您可以构建一个健壮、可靠的MongoDB备份系统,为业务连续性提供坚实保障。记住,备份不是目的,而是手段——真正的目标是确保在需要时能够快速、准确地恢复数据。