Introduction: Why MongoDB Backups Matter
In modern application architectures, MongoDB, as a leading NoSQL database, holds large volumes of business-critical data. Yet many developers and DBAs underestimate the importance of backups until data loss or a system failure makes them regret it. A sound backup strategy is not only the last line of defense for data safety but also the foundation of business continuity.
Picture this scenario: at 3 AM your MongoDB cluster suddenly crashes, the primary node's disk is damaged, and your last backup is a week old. You now face not only the risk of data loss but also the heavy cost of business downtime. This is why an efficient, reliable MongoDB backup plan matters so much.
Backing up MongoDB differs from backing up a traditional relational database: you must account for sharded architectures, replica set behavior, storage engine differences, and sheer data volume. This article walks through every aspect of MongoDB backup, from basic concepts to advanced strategies, to help you build a rock-solid data protection system.
Core Concepts of MongoDB Backup
1. Types of MongoDB backup
MongoDB offers two main backup approaches: logical backups and physical backups.
A logical backup uses the mongodump tool to export the data in BSON format; it is flexible but comparatively slow. For example:
# Logical backup with mongodump
mongodump --host localhost --port 27017 --username admin --password secret --out /backup/mongodb/$(date +%Y%m%d)
A physical backup copies MongoDB's data files directly (e.g., the WiredTiger storage files). It is fast but requires downtime or special handling. For the WiredTiger engine, a physical backup typically looks like:
# Flush pending writes and lock the database before copying files
mongo --eval "db.fsyncLock()"
# Copy the data files
rsync -av /data/db/ /backup/mongodb/data/
# Release the lock once the copy finishes
mongo --eval "db.fsyncUnlock()"
2. Backup differences between replica sets and sharded clusters
MongoDB's architectural complexity directly shapes the backup strategy:
- Replica set backups: usually run against a Secondary node to avoid hurting write performance on the Primary
- Sharded cluster backups: must coordinate all shards and the config servers to keep the data consistent
For a sharded cluster, a typical backup script looks like this:
#!/usr/bin/env python3
import subprocess
import datetime

def backup_sharded_cluster():
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    config_servers = ["config1.example.com:27019", "config2.example.com:27019"]
    shards = ["shard1.example.com:27018", "shard2.example.com:27018"]
    # Back up the config servers (stop the balancer first for a consistent view)
    for config in config_servers:
        cmd = ["mongodump", "--host", config, "--out", f"/backup/config_{timestamp}"]
        subprocess.run(cmd, check=True)
    # Back up each shard
    for shard in shards:
        cmd = ["mongodump", "--host", shard, "--out", f"/backup/shard_{timestamp}"]
        subprocess.run(cmd, check=True)

if __name__ == "__main__":
    backup_sharded_cluster()
Key Elements of an Effective Backup Plan
1. Balancing backup frequency against RPO (Recovery Point Objective)
Backup frequency should be driven by business requirements and by how fast the data changes. Suggested settings for common scenarios:
| Workload | Write frequency | Suggested backup cadence | RPO |
|---|---|---|---|
| E-commerce transactions | High | Hourly incremental | 1 hour |
| User logs | Medium | Daily full + hourly incremental | 1 hour |
| Configuration data | Low | Daily full | 24 hours |
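As a rule of thumb, the worst-case data loss equals the interval between consecutive backups. A tiny helper (hypothetical names, not from any library) makes that check explicit when planning a cadence:

```python
from datetime import timedelta

def worst_case_rpo(backup_interval: timedelta) -> timedelta:
    """With periodic backups, the worst-case loss window is one full interval."""
    return backup_interval

def meets_rpo(backup_interval: timedelta, target_rpo: timedelta) -> bool:
    """A cadence satisfies the RPO only if its interval does not exceed the target."""
    return worst_case_rpo(backup_interval) <= target_rpo

# Hourly incrementals satisfy a 1-hour RPO; daily fulls alone do not
print(meets_rpo(timedelta(hours=1), timedelta(hours=1)))   # True
print(meets_rpo(timedelta(hours=24), timedelta(hours=1)))  # False
```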
An example incremental backup implementation:
#!/bin/bash
# MongoDB incremental backup script (oplog-based)
BACKUP_DIR="/backup/mongodb/incremental"
TIMESTAMP_FILE="$BACKUP_DIR/last_timestamp"
# Read the oplog timestamp recorded by the previous run
if [ -f "$TIMESTAMP_FILE" ]; then
    LAST_TS=$(cat "$TIMESTAMP_FILE")
else
    # First run: start from the newest oplog entry
    LAST_TS=$(mongo --quiet --eval 'db.getSiblingDB("local").oplog.rs.find().sort({$natural:-1}).limit(1).next().ts.getTime()')
fi
# Dump the data plus the oplog window covering the dump
mongodump --host secondary.example.com --oplog --out $BACKUP_DIR/current
# Record the current oplog position for the next run
mongo --quiet --eval 'db.getSiblingDB("local").oplog.rs.find().sort({$natural:-1}).limit(1).next().ts.getTime()' > $TIMESTAMP_FILE
# Compress the backup
tar -czf $BACKUP_DIR/incremental_$(date +%Y%m%d_%H%M%S).tar.gz -C $BACKUP_DIR current
2. Storage strategy: local vs. cloud
Modern backup strategies usually follow the 3-2-1 rule: 3 copies of the data, on 2 different storage media, with 1 copy off-site.
Local storage gives you fast restores; cloud storage gives you disaster-recovery capability. An example hybrid policy:
# backup-config.yaml
backup:
  local:
    path: /backup/mongodb
    retention: 7 days
  cloud:
    provider: aws
    bucket: mongodb-backups-prod
    region: us-east-1
    retention: 30 days
  schedule:
    full: "0 2 * * 0"          # Sundays at 2 AM
    incremental: "0 */4 * * *" # every 4 hours
3. Automation and monitoring
Automation is the core of an effective backup plan. Schedule backup jobs with cron or a Kubernetes CronJob:
# /etc/cron.d/mongodb-backup
# Full backup every Sunday at 2 AM
0 2 * * 0 root /usr/local/bin/mongodb_full_backup.sh
# Incremental backup every 4 hours
0 */4 * * * root /usr/local/bin/mongodb_incremental_backup.sh
# Daily backup integrity check at 3 AM
0 3 * * * root /usr/local/bin/mongodb_backup_verify.sh
An example monitoring script:
#!/usr/bin/env python3
import smtplib
from email.mime.text import MIMEText
import subprocess
import os

def check_backup_health():
    backup_dir = "/backup/mongodb"
    latest_backup = max((os.path.join(backup_dir, d) for d in os.listdir(backup_dir)),
                        key=os.path.getmtime)
    # Verify the backup can be parsed by mongorestore without writing anything
    result = subprocess.run(["mongorestore", "--dryRun", latest_backup],
                            capture_output=True, text=True)
    if result.returncode != 0:
        send_alert(f"Backup verification failed: {result.stderr}")
        return False
    # Check that the backup size is plausible
    backup_size = subprocess.check_output(["du", "-sb", latest_backup]).split()[0]
    if int(backup_size) < 1000000:  # smaller than 1 MB
        send_alert(f"Backup suspiciously small, may be incomplete: {int(backup_size)} bytes")
        return False
    return True

def send_alert(message):
    msg = MIMEText(message)
    msg['Subject'] = 'MongoDB Backup Alert'
    msg['From'] = 'backup@example.com'
    msg['To'] = 'dba@example.com'
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()
Solving Common Backup Challenges
1. Large datasets take too long to back up
Problem: once the data volume reaches the TB range, conventional backup methods take far too long.
Solutions:
- Use filesystem snapshots (LVM, ZFS)
- Parallelize the backup
- Adopt an incremental backup strategy
An example LVM snapshot backup:
#!/bin/bash
# Back up MongoDB from an LVM snapshot
MOUNT_POINT="/data/db"
VG_NAME="vg_mongodb"
LV_NAME="lv_mongodb"
SNAP_NAME="snap_mongodb"
# Create the LVM snapshot (pause writes first, e.g. with db.fsyncLock())
lvcreate -L 10G -s -n $SNAP_NAME $VG_NAME/$LV_NAME
# Mount the snapshot
mkdir -p /mnt/mongodb_snapshot
mount /dev/$VG_NAME/$SNAP_NAME /mnt/mongodb_snapshot
# Copy the data files (writes can resume at this point)
rsync -av /mnt/mongodb_snapshot/ /backup/mongodb/snapshot_$(date +%Y%m%d)/
# Clean up
umount /mnt/mongodb_snapshot
lvremove -f $VG_NAME/$SNAP_NAME
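Parallelizing a dump can be sketched with a thread pool that runs one mongodump per collection. This is a hedged sketch: the host, database, and collection names are placeholders, and mongodump itself already parallelizes collections within a single invocation via --numParallelCollections:

```python
#!/usr/bin/env python3
# Sketch: dump several collections concurrently with a thread pool.
import subprocess
from concurrent.futures import ThreadPoolExecutor

def dump_command(host, db, collection, out_dir):
    """Build the mongodump invocation for a single collection."""
    return ["mongodump", "--host", host, "--db", db,
            "--collection", collection, "--out", out_dir]

def parallel_dump(host, db, collections, out_dir, workers=4):
    """Run one dump per collection, at most `workers` at a time."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        cmds = [dump_command(host, db, c, out_dir) for c in collections]
        list(pool.map(lambda cmd: subprocess.run(cmd, check=True), cmds))

# Command construction needs no running server:
print(dump_command("secondary.example.com", "shop", "orders", "/backup/parallel"))
```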
2. Performance impact during backups
Problem: backup operations consume significant I/O and CPU and can degrade production traffic.
Solutions:
- Run the backup on a Secondary node
- Use the --oplog flag so the dump is consistent
- Throttle the backup: mongodump has no built-in rate limit, so lower its I/O priority (e.g. with ionice) or reduce --numParallelCollections
# Back up from the Secondary with lowered I/O priority
ionice -c2 -n7 mongodump --host secondary.example.com:27017 \
    --oplog \
    --numParallelCollections 1 \
    --out /backup/mongodb/$(date +%Y%m%d)
3. The backup verification problem
Problem: how do you make sure a backup can actually be restored?
Solution: put a recurring backup verification process in place.
#!/bin/bash
# Automated backup verification
BACKUP_DIR="/backup/mongodb"
TEST_DB_PATH="/tmp/mongodb_test_restore"
TEST_PORT=27027
# Pick the most recent backup
LATEST_BACKUP=$(ls -td $BACKUP_DIR/*/ | head -1)
# Start a throwaway MongoDB instance
mkdir -p $TEST_DB_PATH
mongod --dbpath $TEST_DB_PATH --port $TEST_PORT --fork --logpath /tmp/mongodb_test.log
# Attempt the restore and capture its exit status
mongorestore --host localhost --port $TEST_PORT --drop $LATEST_BACKUP
RESTORE_STATUS=$?
# Sanity-check that the databases are visible
mongo --host localhost --port $TEST_PORT --eval "db.adminCommand({listDatabases:1})" --quiet
# Clean up
mongod --dbpath $TEST_DB_PATH --shutdown
rm -rf $TEST_DB_PATH
if [ $RESTORE_STATUS -eq 0 ]; then
    echo "Backup verification SUCCESS: $LATEST_BACKUP"
else
    echo "Backup verification FAILED: $LATEST_BACKUP"
    exit 1
fi
Restore Challenges and Best Practices
1. Point-in-time recovery
Point-in-time recovery lets you restore the data to any moment in time, which is essential for undoing human error.
The steps are:
- Replay the oplog up to the chosen point in time
- Combine a full backup with incremental (oplog) backups
# Restore to the state as of 2024-01-15 14:30:00 UTC
# 1. Restore the full backup
mongorestore --host localhost --port 27017 --drop /backup/mongodb/full_20240115/
# 2. Replay the incremental (oplog) backup up to the target time;
#    --oplogLimit takes a "<unix-seconds>:<ordinal>" timestamp, not an ISO date
mongorestore --host localhost --port 27017 --oplogReplay --oplogLimit "1705329000:1" /backup/mongodb/incremental_20240115/
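Converting a human-readable cutoff into the --oplogLimit form is easy to get wrong, so a small helper is worth keeping around (this function is illustrative, not part of the MongoDB tools; it assumes the cutoff is given in UTC):

```python
from datetime import datetime, timezone

def to_oplog_limit(iso_ts, ordinal=1):
    """Convert an ISO-8601 UTC time into mongorestore's
    --oplogLimit "<unix-seconds>:<ordinal>" format."""
    dt = datetime.fromisoformat(iso_ts).replace(tzinfo=timezone.utc)
    return f"{int(dt.timestamp())}:{ordinal}"

print(to_oplog_limit("2024-01-15T14:30:00"))  # 1705329000:1
```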
2. Restoring a sharded cluster
Restoring a sharded cluster is more involved, because all shards and the config servers must end up consistent with one another.
The restore procedure:
- Stop all shards and config servers
- Restore the config servers first, in order
- Restore each shard
- Restart the cluster
#!/usr/bin/env python3
import subprocess
import time

def restore_sharded_cluster(backup_path):
    # 1. Restore the config servers
    config_servers = ["config1.example.com:27019", "config2.example.com:27019"]
    for config in config_servers:
        cmd = ["mongorestore", "--host", config, "--drop", f"{backup_path}/config"]
        subprocess.run(cmd, check=True)
    # 2. Restore the shards
    shards = ["shard1.example.com:27018", "shard2.example.com:27018"]
    for shard in shards:
        cmd = ["mongorestore", "--host", shard, "--drop", f"{backup_path}/shard"]
        subprocess.run(cmd, check=True)
    # 3. Allow replication time to catch up
    time.sleep(60)
    # 4. Verify the cluster state
    subprocess.run(["mongo", "--eval", "db.adminCommand({listShards:1})"])

if __name__ == "__main__":
    restore_sharded_cluster("/backup/mongodb/cluster_20240115")
3. Cross-version restore compatibility
A backup may not restore directly across a MongoDB version upgrade. Pay particular attention to:
- The version compatibility matrix: confirm the target version supports the source version's data format
- The upgrade path: sometimes you must pass through an intermediate version first
- Test restores: always rehearse the restore before upgrading
# Check the server version and the feature compatibility version
mongod --version
mongo --eval 'db.adminCommand({getParameter: 1, featureCompatibilityVersion: 1})'
# A dump taken from a newer server may not restore into an older one;
# restore into a matching version first, then upgrade the server
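As a guard in restore tooling, the server version can be parsed from `mongod --version` output and compared. A hedged sketch (the same-major.minor rule is a conservative heuristic, not an official compatibility matrix):

```python
import re

def parse_mongod_version(version_output):
    """Extract (major, minor, patch) from `mongod --version` output."""
    m = re.search(r"db version v(\d+)\.(\d+)\.(\d+)", version_output)
    if not m:
        raise ValueError("unrecognized version output")
    return tuple(int(x) for x in m.groups())

def same_major_minor(src, dst):
    """Conservative rule of thumb: restore into the same major.minor series."""
    return src[:2] == dst[:2]

sample = "db version v6.0.14\nBuild Info: ..."
print(parse_mongod_version(sample))  # (6, 0, 14)
```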
Advanced Backup Strategies
1. Incremental backups and point-in-time recovery
Incremental backups dramatically reduce storage needs and backup time, and combined with the oplog they enable precise point-in-time recovery.
Understanding the oplog: the oplog is a special capped collection in a MongoDB replica set that records every data change. An entry looks like:
{
  "ts": Timestamp(1234567890, 1),
  "op": "i",                      // i: insert, u: update, d: delete
  "ns": "database.collection",
  "o": { ... }                    // the operation payload
}
An incremental backup script:
#!/bin/bash
# Oplog-based incremental backup
BACKUP_BASE="/backup/mongodb"
POSITION_FILE="$BACKUP_BASE/last_oplog_position"
# Read the position recorded by the previous run (first run: start from now)
if [ -f "$POSITION_FILE" ]; then
    LAST_TS=$(cat "$POSITION_FILE")
else
    LAST_TS=$(date +%s)
fi
# Dump the oplog entries since the last position
# (the query uses extended JSON, which newer mongodump versions require)
mongodump --host secondary.example.com --db local --collection oplog.rs \
    --query "{\"ts\":{\"\$gte\":{\"\$timestamp\":{\"t\":$LAST_TS,\"i\":1}}}}" \
    --out $BACKUP_BASE/oplog_$(date +%Y%m%d_%H%M%S)
# Record the current position for the next run
date +%s > "$POSITION_FILE"
2. Cloud-native backup
Modern MongoDB deployments increasingly run in containers and on Kubernetes. A cloud-native backup approach should consider:
- Persistent volume snapshots via Kubernetes CSI drivers
- Object storage integration: backing up straight to S3, GCS, and the like
- The Operator pattern: MongoDB Ops Manager or a custom Operator
An example Kubernetes CronJob backup:
apiVersion: batch/v1
kind: CronJob
metadata:
  name: mongodb-backup
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: backup
            image: mongo:6.0
            command:
            - /bin/bash
            - -c
            - |
              mongodump --host mongodb-service --out /backup/$(date +%Y%m%d)
              tar -czf /backup/mongodb_$(date +%Y%m%d).tar.gz -C /backup $(date +%Y%m%d)
              aws s3 cp /backup/mongodb_$(date +%Y%m%d).tar.gz s3://mongodb-backups/
            volumeMounts:
            - name: backup-storage
              mountPath: /backup
          volumes:
          - name: backup-storage
            persistentVolumeClaim:
              claimName: backup-pvc
          restartPolicy: OnFailure
3. Backup encryption and security
Backup files contain sensitive data and must be protected with encryption.
Encrypting backups with GPG:
#!/bin/bash
# Encrypt a backup archive
BACKUP_FILE="/backup/mongodb_$(date +%Y%m%d).tar.gz"
ENCRYPTED_FILE="$BACKUP_FILE.gpg"
# Encrypt with GPG (the recipient's key must be imported beforehand)
gpg --encrypt --recipient dba@example.com --output $ENCRYPTED_FILE $BACKUP_FILE
# Remove the plaintext archive
rm $BACKUP_FILE
# Upload to S3
aws s3 cp $ENCRYPTED_FILE s3://mongodb-backups-encrypted/
Backups with client-side field-level encryption: if the application uses client-side field-level encryption, keep the following in mind:
- The dump contains only the encrypted field data
- Store the data encryption keys securely
- Make sure the key management system itself stays available
Backup Monitoring and Alerting
1. Monitoring metrics
The key metrics to track are:
- Backup success rate
- Backup duration
- Backup file size
- Restore-test results
- Storage utilization
An example Prometheus metrics exporter:
#!/usr/bin/env python3
# Export backup metrics to Prometheus
import time
import subprocess
from prometheus_client import start_http_server, Gauge

# Metric definitions
backup_success = Gauge('mongodb_backup_success', 'Backup success status')
backup_duration = Gauge('mongodb_backup_duration_seconds', 'Backup duration')
backup_size = Gauge('mongodb_backup_size_bytes', 'Backup size')

def collect_metrics():
    # Run a backup and record the resulting metrics
    start_time = time.time()
    try:
        result = subprocess.run([
            "mongodump", "--host", "localhost",
            "--out", "/tmp/metrics_backup"
        ], capture_output=True, text=True, timeout=3600)
        duration = time.time() - start_time
        if result.returncode == 0:
            backup_success.set(1)
            backup_duration.set(duration)
            # Measure the backup's size on disk
            size = subprocess.check_output(["du", "-sb", "/tmp/metrics_backup"]).split()[0]
            backup_size.set(int(size))
        else:
            backup_success.set(0)
    except subprocess.TimeoutExpired:
        backup_success.set(0)
        backup_duration.set(3600)  # timed out

if __name__ == '__main__':
    start_http_server(8000)
    while True:
        collect_metrics()
        time.sleep(300)  # collect every 5 minutes
2. Alerting rules
Prometheus alerting rules:
groups:
- name: mongodb_backup
  rules:
  - alert: MongoDBBackupFailed
    expr: mongodb_backup_success == 0
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "MongoDB backup failed"
      description: "MongoDB backup has failed for more than 5 minutes"
  - alert: MongoDBBackupTooSlow
    expr: mongodb_backup_duration_seconds > 7200
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "MongoDB backup is too slow"
      description: "Backup duration exceeds 2 hours"
  - alert: MongoDBBackupSizeAnomaly
    expr: abs(mongodb_backup_size_bytes - avg_over_time(mongodb_backup_size_bytes[1d])) > 1000000000
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "MongoDB backup size anomaly detected"
      description: "Backup size deviates significantly from normal"
3. Logging and auditing
Detailed backup logs are essential for troubleshooting and compliance.
A logging script:
#!/bin/bash
# Backup logging
LOG_FILE="/var/log/mongodb_backup.log"
BACKUP_ID=$(date +%Y%m%d_%H%M%S)

log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] [ID:$BACKUP_ID] $1" >> $LOG_FILE
}

log_message "START backup operation"
# Run the backup, logging every line of output
mongodump --host localhost --out /backup/mongodb/$BACKUP_ID 2>&1 | while read line; do
    log_message "DUMP: $line"
done
if [ ${PIPESTATUS[0]} -eq 0 ]; then
    log_message "SUCCESS backup completed"
    # Record backup details
    BACKUP_SIZE=$(du -sh /backup/mongodb/$BACKUP_ID | cut -f1)
    log_message "INFO backup size: $BACKUP_SIZE"
    # Upload to the cloud
    aws s3 sync /backup/mongodb/$BACKUP_ID s3://mongodb-backups/$BACKUP_ID/ 2>&1 | while read line; do
        log_message "S3: $line"
    done
    if [ ${PIPESTATUS[0]} -eq 0 ]; then
        log_message "SUCCESS cloud upload completed"
    else
        log_message "ERROR cloud upload failed"
    fi
else
    log_message "ERROR backup failed"
fi
A Worked Example: Building an Enterprise-Grade Backup System
Background
Suppose we run an e-commerce platform whose MongoDB cluster has:
- A 3-node replica set (Primary, Secondary, Arbiter)
- Roughly 500 GB of data
- Around 10 GB of writes per hour
- Requirements of RPO < 1 hour and RTO < 4 hours
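Before designing the schedule it is worth sanity-checking the arithmetic. The throughput figures below are assumptions for illustration; measure your own cluster before trusting them:

```python
# Rough sizing for the scenario above (assumed throughputs, not measurements)
data_gb = 500
write_gb_per_hour = 10
dump_throughput_gb_per_hour = 200     # assumed mongodump rate
restore_throughput_gb_per_hour = 150  # assumed mongorestore rate

full_backup_hours = data_gb / dump_throughput_gb_per_hour
incremental_gb_per_run = write_gb_per_hour * 1  # hourly oplog backups
# Worst case for RTO: full restore plus replaying up to one hour of oplog
rto_hours = data_gb / restore_throughput_gb_per_hour + 1

print(f"Full backup: ~{full_backup_hours:.1f} h")           # ~2.5 h
print(f"Hourly incremental: ~{incremental_gb_per_run} GB")  # ~10 GB
print(f"Worst-case restore: ~{rto_hours:.1f} h")            # ~4.3 h
```

With these assumptions the restore estimate already brushes against the 4-hour RTO, which is exactly the kind of finding this arithmetic is meant to surface early.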
Backup Architecture Design
# backup-architecture.yaml
infrastructure:
  mongodb:
    replica_set: "rs0"
    nodes:
      - { host: "mongo1.example.com", role: "primary", port: 27017 }
      - { host: "mongo2.example.com", role: "secondary", port: 27017 }
      - { host: "mongo3.example.com", role: "arbiter", port: 27017 }
  storage:
    local: "/backup/mongodb"
    cloud: "s3://ecommerce-mongodb-backups"
    retention:
      daily: 7
      weekly: 4
      monthly: 12
schedule:
  full_backup:
    time: "0 2 * * 0"            # Sundays at 2 AM
    compression: "gzip"
    encryption: true
  incremental_backup:
    interval: "0 */1 * * *"      # hourly
    method: "oplog"
  verification:
    time: "0 3 * * *"            # daily at 3 AM
    test_restore: true
  cleanup:
    time: "0 4 * * 0"            # Sundays at 4 AM
    dry_run: false
monitoring:
  prometheus_endpoint: "http://prometheus:9090"
  alertmanager: "http://alertmanager:9093"
  metrics_port: 8000
  alerts:
    - name: "backup_failure"
      condition: "backup_success == 0"
      severity: "critical"
    - name: "slow_backup"
      condition: "backup_duration > 7200"
      severity: "warning"
security:
  encryption:
    method: "gpg"
    key_id: "DBA_TEAM_KEY"
  access_control:
    backup_user: "backup_operator"
    permissions: ["backupAnyDatabase", "clusterMonitor"]
  audit_logging: true
Implementation Scripts
The main backup coordinator:
#!/usr/bin/env python3
"""
Enterprise MongoDB backup coordinator.
Responsibilities: scheduling, running, verifying, and pruning backups.
"""
import os
import yaml
import subprocess
import schedule
import time
import logging
from datetime import datetime, timedelta
import boto3
import gnupg

class MongoDBBackupManager:
    def __init__(self, config_path):
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)
        self.setup_logging()
        self.s3_client = boto3.client('s3')
        self.gpg = gnupg.GPG()

    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/mongodb_backup_manager.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    def execute_full_backup(self):
        """Run a full backup."""
        self.logger.info("Starting full backup")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = f"/backup/mongodb/full_{timestamp}"
        try:
            # Run the dump against the Secondary node
            cmd = [
                "mongodump",
                "--host", "mongo2.example.com",
                "--port", "27017",
                "--oplog",
                "--out", backup_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=7200)
            if result.returncode != 0:
                self.logger.error(f"Backup failed: {result.stderr}")
                return False
            # Compress
            tar_cmd = ["tar", "-czf", f"{backup_path}.tar.gz", "-C", "/backup/mongodb", f"full_{timestamp}"]
            subprocess.run(tar_cmd, check=True)
            # Encrypt
            self.encrypt_file(f"{backup_path}.tar.gz")
            # Upload to S3
            self.upload_to_s3(f"{backup_path}.tar.gz.gpg", "full")
            # Remove local working files
            subprocess.run(["rm", "-rf", backup_path, f"{backup_path}.tar.gz"])
            self.logger.info(f"Full backup completed: {timestamp}")
            return True
        except subprocess.TimeoutExpired:
            self.logger.error("Backup timed out")
            return False
        except Exception as e:
            self.logger.error(f"Unexpected error: {e}")
            return False
    def execute_incremental_backup(self):
        """Run an incremental (oplog) backup."""
        self.logger.info("Starting incremental backup")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = f"/backup/mongodb/incremental_{timestamp}"
        try:
            # Read the oplog position recorded by the previous run
            with open("/backup/mongodb/last_oplog_ts", "r") as f:
                last_ts = f.read().strip()
            # Dump the oplog entries since that position
            cmd = [
                "mongodump",
                "--host", "mongo2.example.com",
                "--db", "local",
                "--collection", "oplog.rs",
                "--query", f'{{ts:{{$gte:Timestamp({last_ts})}}}}',
                "--out", backup_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=1800)
            if result.returncode != 0:
                self.logger.error(f"Incremental backup failed: {result.stderr}")
                return False
            # Record the new position
            new_ts = self.get_current_oplog_ts()
            with open("/backup/mongodb/last_oplog_ts", "w") as f:
                f.write(new_ts)
            # Compress and encrypt
            tar_cmd = ["tar", "-czf", f"{backup_path}.tar.gz", "-C", "/backup/mongodb", f"incremental_{timestamp}"]
            subprocess.run(tar_cmd, check=True)
            self.encrypt_file(f"{backup_path}.tar.gz")
            self.upload_to_s3(f"{backup_path}.tar.gz.gpg", "incremental")
            # Clean up local working files
            subprocess.run(["rm", "-rf", backup_path, f"{backup_path}.tar.gz"])
            self.logger.info(f"Incremental backup completed: {timestamp}")
            return True
        except Exception as e:
            self.logger.error(f"Incremental backup error: {e}")
            return False

    def get_current_oplog_ts(self):
        """Return the newest oplog timestamp as "<seconds>,<ordinal>"."""
        out = subprocess.check_output([
            "mongo", "--host", "mongo2.example.com", "--quiet", "--eval",
            'var ts = db.getSiblingDB("local").oplog.rs.find()'
            '.sort({$natural:-1}).limit(1).next().ts; '
            'print(ts.getTime() + "," + ts.getInc())'
        ], text=True)
        return out.strip()
    def encrypt_file(self, file_path):
        """Encrypt a file with GPG."""
        self.logger.info(f"Encrypting {file_path}")
        with open(file_path, 'rb') as f:
            encrypted = self.gpg.encrypt_file(
                f,
                recipients=[self.config['security']['encryption']['key_id']],
                output=f"{file_path}.gpg"
            )
        if encrypted.ok:
            self.logger.info(f"Encryption successful: {file_path}.gpg")
        else:
            raise Exception(f"Encryption failed: {encrypted.status}")

    def upload_to_s3(self, file_path, backup_type):
        """Upload a file to S3."""
        self.logger.info(f"Uploading {file_path} to S3")
        bucket = self.config['infrastructure']['storage']['cloud'].replace("s3://", "")
        key = f"{backup_type}/{datetime.now().strftime('%Y/%m/%d')}/{os.path.basename(file_path)}"
        self.s3_client.upload_file(file_path, bucket, key)
        self.logger.info(f"Upload completed: s3://{bucket}/{key}")
    def verify_backups(self):
        """Verify backup integrity with a test restore."""
        self.logger.info("Starting backup verification")
        # Pick the most recent full backup uploaded today
        s3_bucket = self.config['infrastructure']['storage']['cloud'].replace("s3://", "")
        objects = self.s3_client.list_objects_v2(
            Bucket=s3_bucket,
            Prefix=f"full/{datetime.now().strftime('%Y/%m/%d')}/"
        )
        if 'Contents' not in objects:
            self.logger.error("No backups found for verification")
            return False
        latest_backup = sorted(objects['Contents'], key=lambda x: x['LastModified'])[-1]
        # Download and decrypt
        backup_file = f"/tmp/verify_{os.path.basename(latest_backup['Key'])}"
        decrypted_file = backup_file.removesuffix(".gpg")
        self.s3_client.download_file(s3_bucket, latest_backup['Key'], backup_file)
        with open(backup_file, 'rb') as f:
            decrypted = self.gpg.decrypt_file(f, output=decrypted_file)
        if not decrypted.ok:
            self.logger.error("Decryption failed during verification")
            return False
        # Unpack the decrypted archive
        verify_path = "/tmp/verify_backup"
        subprocess.run(["mkdir", "-p", verify_path], check=True)
        subprocess.run(["tar", "-xzf", decrypted_file, "-C", verify_path], check=True)
        # Restore into a throwaway instance
        test_port = 27027
        test_dbpath = "/tmp/mongodb_verify"
        subprocess.run(["mkdir", "-p", test_dbpath], check=True)
        subprocess.run([
            "mongod", "--dbpath", test_dbpath, "--port", str(test_port),
            "--fork", "--logpath", "/tmp/mongodb_verify.log"
        ], check=True)
        time.sleep(5)  # wait for startup
        # The dump directory inside the archive matches the archive name
        restore_path = os.path.join(
            verify_path,
            os.path.basename(decrypted_file).removeprefix("verify_").removesuffix(".tar.gz"))
        result = subprocess.run([
            "mongorestore", "--host", "localhost", "--port", str(test_port),
            "--drop", restore_path
        ], capture_output=True, text=True)
        # Clean up
        subprocess.run(["mongod", "--dbpath", test_dbpath, "--shutdown"])
        subprocess.run(["rm", "-rf", test_dbpath, verify_path, backup_file, decrypted_file])
        if result.returncode == 0:
            self.logger.info("Backup verification SUCCESS")
            return True
        else:
            self.logger.error(f"Backup verification FAILED: {result.stderr}")
            return False
    def cleanup_old_backups(self):
        """Prune expired backups."""
        self.logger.info("Starting cleanup of old backups")
        retention = self.config['infrastructure']['storage']['retention']
        s3_bucket = self.config['infrastructure']['storage']['cloud'].replace("s3://", "")
        # Map backup types onto the retention config (daily/weekly/monthly keys)
        retention_days = {
            'full': retention.get('weekly', 4) * 7,
            'incremental': retention.get('daily', 7),
        }
        # Prune old backups in S3
        for backup_type, days in retention_days.items():
            cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y/%m/%d')
            objects = self.s3_client.list_objects_v2(
                Bucket=s3_bucket,
                Prefix=f"{backup_type}/"
            )
            if 'Contents' in objects:
                for obj in objects['Contents']:
                    # Keys are "<type>/YYYY/MM/DD/...", so string order is date order
                    if obj['Key'] < f"{backup_type}/{cutoff_date}/":
                        self.s3_client.delete_object(Bucket=s3_bucket, Key=obj['Key'])
                        self.logger.info(f"Deleted old backup: {obj['Key']}")
        # Prune old local backups
        local_path = self.config['infrastructure']['storage']['local']
        cutoff_time = time.time() - retention_days['incremental'] * 86400
        for item in os.listdir(local_path):
            item_path = os.path.join(local_path, item)
            if os.path.getmtime(item_path) < cutoff_time:
                subprocess.run(["rm", "-rf", item_path])
                self.logger.info(f"Deleted local backup: {item_path}")
    def run(self):
        """Start the backup manager's scheduler loop."""
        self.logger.info("MongoDB Backup Manager starting...")
        # Register jobs (weekly full, hourly incremental, per the schedule config)
        schedule.every().sunday.at("02:00").do(self.execute_full_backup)
        schedule.every().hour.at(":00").do(self.execute_incremental_backup)
        schedule.every().day.at("03:00").do(self.verify_backups)
        schedule.every().sunday.at("04:00").do(self.cleanup_old_backups)
        # Run one verification immediately on startup
        self.verify_backups()
        while True:
            schedule.run_pending()
            time.sleep(60)

if __name__ == "__main__":
    manager = MongoDBBackupManager("/etc/mongodb_backup_config.yaml")
    manager.run()
Summary and Best-Practice Checklist
Core principles
- The 3-2-1 rule: 3 copies of the data, 2 kinds of storage media, 1 off-site copy
- Test regularly: run a full restore drill at least once a month
- Automate everything: manual operations are the main source of mistakes
- Be monitoring-driven: an unmonitored backup is as good as no backup
Recommended tool stack
- Backup tools: mongodump + mongorestore (logical backups), LVM/ZFS snapshots (physical backups)
- Storage: local SSD + AWS S3 + Glacier (long-term archive)
- Scheduling: cron + Python scripts (for the complex logic)
- Monitoring: Prometheus + Grafana + Alertmanager
- Encryption: GPG + KMS (key management)
Common pitfalls and how to avoid them
- Backing up only on the Primary: it hurts write performance; back up on a Secondary instead
- Never verifying backups: a backup may be corrupt or incomplete
- Ignoring the oplog size: an oplog that is too small makes incremental backups fail
- Ignoring version compatibility: cross-version restores can fail
- Storing keys in plaintext: backup encryption keys must themselves be stored securely
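The oplog-size pitfall can be quantified: the oplog window must cover at least the interval between incremental backups, with headroom for a missed run. A sketch (the generation rate is an assumption; measure yours with db.getReplicationInfo() in the mongo shell):

```python
def oplog_window_hours(oplog_size_gb, oplog_gb_per_hour):
    """Hours of change history a capped oplog of this size can hold."""
    return oplog_size_gb / oplog_gb_per_hour

def incremental_interval_safe(oplog_size_gb, oplog_gb_per_hour,
                              backup_interval_hours, headroom=2.0):
    """Require the window to cover `headroom` times the backup interval,
    so a single missed run does not lose oplog entries."""
    return oplog_window_hours(oplog_size_gb, oplog_gb_per_hour) \
        >= headroom * backup_interval_hours

print(oplog_window_hours(50, 10))            # 5.0 hours of history
print(incremental_interval_safe(50, 10, 1))  # True  (5 h >= 2 h)
print(incremental_interval_safe(50, 10, 4))  # False (5 h <  8 h)
```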
Checklist
- [ ] Backup strategy documented
- [ ] Automation scripts tested
- [ ] Monitoring and alerting configured
- [ ] Regular restore tests scheduled
- [ ] Backup encryption enabled
- [ ] Access rights reduced to the minimum
- [ ] Audit logging enabled
- [ ] Disaster-recovery procedure rehearsed
By following these principles and practices you can build a reliable, efficient, and secure MongoDB backup system that lets you recover data quickly in any disaster and keep the business running. Remember: a backup only proves its worth when a restore succeeds, so regular testing and verification are the most important part of the whole strategy.
