引言:为什么需要系统设计学校?

在当今快速发展的技术世界中,构建可扩展的系统架构是每个软件工程师和架构师的核心技能。然而,系统设计往往是一个复杂且需要大量实践经验的领域。传统的学习方式通常需要数年时间,但通过结构化的21周计划,我们可以系统地掌握从基础概念到高级架构设计的全过程。

本指南将带你走过一个为期21周的实战旅程,每周聚焦一个关键主题,通过理论学习、实践项目和代码示例,帮助你从零开始构建可扩展的系统架构。无论你是初学者还是有经验的开发者,这个计划都将为你提供清晰的路径和实用的工具。

第1-3周:基础概念与系统思维

第1周:理解系统设计的基本原则

主题句:系统设计的核心在于理解需求、权衡取舍和模块化思维。

支持细节

  1. 需求分析:明确系统要解决的问题,区分功能需求和非功能需求(如可扩展性、可靠性、性能)。
  2. 权衡取舍:没有完美的设计,只有适合当前场景的方案。例如,CAP定理(一致性、可用性、分区容错性)告诉我们,在分布式系统中必须做出选择。
  3. 模块化:将系统分解为独立的模块,每个模块负责特定功能,降低复杂度。

示例:设计一个简单的博客系统。

  • 功能需求:用户注册、发布文章、评论、搜索。
  • 非功能需求:支持1000并发用户,响应时间<500ms。
  • 权衡:为了快速上线,初期使用单体架构,后期根据需求拆分为微服务。

第2周:数据存储基础

主题句:选择合适的数据存储方案是系统设计的关键。

支持细节

  1. 关系型数据库(RDBMS):适合结构化数据,支持ACID事务。例如MySQL、PostgreSQL。
  2. 非关系型数据库(NoSQL):适合非结构化或半结构化数据,扩展性好。例如MongoDB(文档型)、Redis(键值型)。
  3. 数据模型设计:理解范式化与反范式化的权衡,设计合理的表结构或文档结构。

代码示例:使用Python和SQLAlchemy定义一个简单的博客文章模型。

from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from datetime import datetime

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    articles = relationship('Article', back_populates='author')

class Article(Base):
    __tablename__ = 'articles'
    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, default=datetime.utcnow)
    author = relationship('User', back_populates='articles')

第3周:网络与通信基础

主题句:理解网络协议和通信模式是构建分布式系统的基石。

支持细节

  1. HTTP/HTTPS:了解请求-响应模型、状态码、RESTful API设计。
  2. TCP/IP:理解三次握手、四次挥手、可靠传输。
  3. 消息队列:用于异步通信和解耦系统组件。例如RabbitMQ、Kafka。

代码示例:使用Python的Flask框架创建一个简单的RESTful API。

from flask import Flask, jsonify, request

app = Flask(__name__)

# 模拟数据库
articles_db = []

@app.route('/articles', methods=['GET'])
def get_articles():
    return jsonify(articles_db)

@app.route('/articles', methods=['POST'])
def create_article():
    data = request.get_json()
    if not data or 'title' not in data or 'content' not in data:
        return jsonify({'error': 'Missing title or content'}), 400
    article = {
        'id': len(articles_db) + 1,
        'title': data['title'],
        'content': data['content']
    }
    articles_db.append(article)
    return jsonify(article), 201

if __name__ == '__main__':
    app.run(debug=True)

第4-7周:单体架构与数据库优化

第4周:单体架构设计

主题句:单体架构是大多数系统的起点,理解其优缺点至关重要。

支持细节

  1. 优点:开发简单、部署容易、调试方便。
  2. 缺点:随着系统增长,代码库变得臃肿,扩展性差。
  3. 设计模式:MVC(模型-视图-控制器)模式,将业务逻辑、数据和界面分离。

代码示例:一个简单的单体博客应用结构。

blog-app/
├── app.py          # 主应用文件
├── models.py       # 数据模型
├── views.py        # 视图逻辑
├── templates/      # HTML模板
├── static/         # 静态文件
└── config.py       # 配置文件

第5周:数据库设计与优化

主题句:良好的数据库设计能显著提升系统性能。

支持细节

  1. 索引优化:为常用查询字段创建索引,避免全表扫描。
  2. 查询优化:使用EXPLAIN分析查询计划,避免N+1问题。
  3. 分库分表:当数据量过大时,考虑水平或垂直拆分。

代码示例:使用SQLAlchemy进行数据库查询优化。

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# 创建数据库连接
engine = create_engine('sqlite:///blog.db')
Session = sessionmaker(bind=engine)

# 优化前:N+1查询问题
def get_articles_with_comments_naive():
    session = Session()
    articles = session.query(Article).all()
    for article in articles:
        # 每个文章单独查询评论,导致N+1问题
        comments = session.query(Comment).filter_by(article_id=article.id).all()
        print(f"Article: {article.title}, Comments: {len(comments)}")
    session.close()

# 优化后:使用JOIN或批量查询
def get_articles_with_comments_optimized():
    session = Session()
    # 使用JOIN一次性获取文章和评论
    articles = session.query(Article).join(Comment, Article.id == Comment.article_id).all()
    for article in articles:
        print(f"Article: {article.title}, Comments: {len(article.comments)}")
    session.close()

第6周:缓存策略

主题句:缓存是提升系统性能的有效手段。

支持细节

  1. 缓存层次:客户端缓存、CDN、应用层缓存(如Redis)、数据库缓存。
  2. 缓存策略:LRU(最近最少使用)、TTL(生存时间)。
  3. 缓存一致性:如何保证缓存与数据库的一致性。

代码示例:使用Redis缓存文章数据。

import redis
import json
from functools import wraps

# 连接Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def cache_article(ttl=300):
    """装饰器:缓存文章数据"""
    def decorator(func):
        @wraps(func)
        def wrapper(article_id):
            # 生成缓存键
            cache_key = f"article:{article_id}"
            # 尝试从缓存获取
            cached_data = redis_client.get(cache_key)
            if cached_data:
                return json.loads(cached_data)
            # 缓存未命中,执行原函数
            result = func(article_id)
            # 存入缓存
            redis_client.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator

@cache_article(ttl=600)
def get_article_from_db(article_id):
    """从数据库获取文章"""
    # 模拟数据库查询
    return {
        'id': article_id,
        'title': f'Article {article_id}',
        'content': f'Content of article {article_id}',
        'created_at': '2023-01-01'
    }

第7周:负载均衡与水平扩展

主题句:负载均衡是实现水平扩展的关键技术。

支持细节

  1. 负载均衡算法:轮询、加权轮询、最少连接数、IP哈希。
  2. 负载均衡器:Nginx、HAProxy、云服务负载均衡器(如AWS ELB)。
  3. 水平扩展:通过增加服务器实例来提升处理能力。

代码示例:使用Nginx配置负载均衡。

# nginx.conf
http {
    upstream blog_backend {
        # 轮询算法
        server 127.0.0.1:5000;
        server 127.0.0.1:5001;
        server 127.0.0.1:5002;
        
        # 可选:加权轮询
        # server 127.0.0.1:5000 weight=3;
        # server 127.0.0.1:5001 weight=2;
        # server 127.0.0.1:5002 weight=1;
    }
    
    server {
        listen 80;
        server_name localhost;
        
        location / {
            proxy_pass http://blog_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }
    }
}

第8-12周:分布式系统基础

第8周:分布式系统概念

主题句:理解分布式系统的核心挑战是设计可扩展架构的前提。

支持细节

  1. CAP定理:一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)三者不可兼得。
  2. BASE理论:基本可用(Basically Available)、软状态(Soft state)、最终一致性(Eventual consistency)。
  3. 分布式事务:两阶段提交(2PC)、三阶段提交(3PC)、Saga模式。

代码示例:模拟一个简单的分布式事务(使用补偿事务)。

import time
from typing import Dict, List

class DistributedTransaction:
    def __init__(self):
        self.steps = []
        self.compensations = []
    
    def add_step(self, action, compensation):
        """添加事务步骤和补偿操作"""
        self.steps.append(action)
        self.compensations.append(compensation)
    
    def execute(self):
        """执行事务"""
        executed_steps = []
        try:
            for i, step in enumerate(self.steps):
                print(f"执行步骤 {i+1}: {step.__name__}")
                step()
                executed_steps.append(i)
        except Exception as e:
            print(f"事务失败: {e}")
            # 执行补偿操作
            for i in reversed(executed_steps):
                print(f"执行补偿 {i+1}: {self.compensations[i].__name__}")
                self.compensations[i]()
            return False
        print("事务成功完成")
        return True

# 示例:转账事务
def transfer_money(from_account, to_account, amount):
    """转账操作"""
    print(f"从账户 {from_account} 转账 {amount} 到账户 {to_account}")
    # 模拟数据库操作
    time.sleep(0.1)

def compensate_transfer(from_account, to_account, amount):
    """转账补偿操作"""
    print(f"回滚转账: 从账户 {to_account} 退还 {amount} 到账户 {from_account}")
    time.sleep(0.1)

# 创建事务
transaction = DistributedTransaction()
transaction.add_step(
    lambda: transfer_money('A', 'B', 100),
    lambda: compensate_transfer('A', 'B', 100)
)
transaction.add_step(
    lambda: transfer_money('B', 'C', 50),
    lambda: compensate_transfer('B', 'C', 50)
)

# 执行事务
transaction.execute()

第9周:消息队列与异步处理

主题句:消息队列是实现系统解耦和异步处理的利器。

支持细节

  1. 消息队列类型:点对点(Queue)、发布/订阅(Topic)。
  2. 消息可靠性:持久化、确认机制、重试策略。
  3. 常见队列:RabbitMQ(AMQP协议)、Kafka(高吞吐量)、Redis Streams。

代码示例:使用RabbitMQ实现异步任务处理。

import pika
import json
import time

# 生产者:发送消息
def send_message(queue_name, message):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    
    channel.basic_publish(
        exchange='',
        routing_key=queue_name,
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,  # 消息持久化
        )
    )
    print(f" [x] Sent {message}")
    connection.close()

# 消费者:处理消息
def consume_messages(queue_name):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    
    def callback(ch, method, properties, body):
        message = json.loads(body)
        print(f" [x] Received {message}")
        # 模拟处理时间
        time.sleep(1)
        print(f" [x] Done processing {message}")
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认消息
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue=queue_name, on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

# 使用示例
if __name__ == '__main__':
    # 发送消息
    send_message('task_queue', {'task_id': 1, 'action': 'send_email'})
    send_message('task_queue', {'task_id': 2, 'action': 'generate_report'})
    
    # 消费消息(在另一个进程或线程中运行)
    # consume_messages('task_queue')

第10周:分布式缓存

主题句:分布式缓存可以显著提升系统性能并减少数据库压力。

支持细节

  1. 缓存策略:Cache-Aside、Read-Through、Write-Through、Write-Behind。
  2. 缓存一致性:如何处理缓存与数据库的同步问题。
  3. 分布式缓存:Redis Cluster、Memcached。

代码示例:实现Cache-Aside模式。

import redis
import json
from datetime import datetime, timedelta

class DistributedCache:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def get(self, key, fallback_func, ttl=300):
        """获取缓存,如果不存在则调用回退函数"""
        cached = self.redis.get(key)
        if cached:
            print(f"Cache hit for key: {key}")
            return json.loads(cached)
        
        print(f"Cache miss for key: {key}")
        # 调用回退函数获取数据
        data = fallback_func()
        # 存入缓存
        self.redis.setex(key, ttl, json.dumps(data))
        return data
    
    def set(self, key, value, ttl=300):
        """设置缓存"""
        self.redis.setex(key, ttl, json.dumps(value))
    
    def delete(self, key):
        """删除缓存"""
        self.redis.delete(key)

# 使用示例
redis_client = redis.Redis(host='localhost', port=6379, db=0)
cache = DistributedCache(redis_client)

def get_user_from_db(user_id):
    """模拟从数据库获取用户数据"""
    print(f"Fetching user {user_id} from database...")
    return {
        'id': user_id,
        'name': f'User {user_id}',
        'email': f'user{user_id}@example.com',
        'created_at': datetime.now().isoformat()
    }

# 获取用户数据
user = cache.get(f"user:{1}", lambda: get_user_from_db(1), ttl=600)
print(user)

# 再次获取,应该从缓存读取
user = cache.get(f"user:{1}", lambda: get_user_from_db(1), ttl=600)
print(user)

第11周:微服务架构入门

主题句:微服务架构将单体应用拆分为小型、独立的服务,提升可扩展性和可维护性。

支持细节

  1. 微服务优点:独立部署、技术异构、故障隔离。
  2. 微服务挑战:服务发现、分布式事务、监控复杂。
  3. 设计原则:单一职责、松耦合、高内聚。

代码示例:使用FastAPI创建一个简单的微服务。

# user_service.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List

app = FastAPI(title="User Service")

class User(BaseModel):
    id: int
    name: str
    email: str

# 模拟数据库
users_db = {}

@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: int):
    if user_id not in users_db:
        raise HTTPException(status_code=404, detail="User not found")
    return users_db[user_id]

@app.post("/users/", response_model=User)
async def create_user(user: User):
    users_db[user.id] = user.dict()
    return user

@app.get("/users/", response_model=List[User])
async def list_users():
    return list(users_db.values())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)

第12周:服务发现与配置管理

主题句:服务发现和配置管理是微服务架构的核心基础设施。

支持细节

  1. 服务发现:Consul、Etcd、ZooKeeper、Kubernetes Service。
  2. 配置管理:Spring Cloud Config、Consul KV、etcd。
  3. 动态配置:配置热更新、环境隔离。

代码示例:使用Consul进行服务注册与发现。

import consul
import time
import requests
from threading import Thread

class ServiceRegistry:
    def __init__(self, host='localhost', port=8500):
        self.consul = consul.Consul(host=host, port=port)
    
    def register_service(self, service_name, service_id, address, port, tags=None):
        """注册服务"""
        check = consul.Check.http(f"http://{address}:{port}/health", interval="10s")
        self.consul.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=address,
            port=port,
            tags=tags or [],
            check=check
        )
        print(f"Service {service_name} registered at {address}:{port}")
    
    def discover_service(self, service_name):
        """发现服务"""
        services = self.consul.health.service(service_name, passing=True)[1]
        return [s['Service'] for s in services]

# 使用示例
registry = ServiceRegistry()

# 注册服务
registry.register_service(
    service_name="user-service",
    service_id="user-service-1",
    address="127.0.0.1",
    port=8001,
    tags=["v1"]
)

# 发现服务
services = registry.discover_service("user-service")
print(f"Discovered services: {services}")

第13-17周:高级架构模式

第13周:API网关

主题句:API网关是微服务架构的入口,负责路由、认证、限流等横切关注点。

支持细节

  1. 功能:路由、认证、限流、日志、监控。
  2. 实现:Nginx、Kong、Spring Cloud Gateway、AWS API Gateway。
  3. 设计模式:网关聚合、网关转发。

代码示例:使用Kong作为API网关。

# 安装Kong(Docker方式)
docker run -d --name kong-database \
  -p 5432:5432 \
  -e "POSTGRES_USER=kong" \
  -e "POSTGRES_DB=kong" \
  -e "POSTGRES_PASSWORD=kong" \
  postgres:9.6

docker run --rm \
  --link kong-database:kong-database \
  -e "KONG_DATABASE=postgres" \
  -e "KONG_PG_HOST=kong-database" \
  -e "KONG_PG_PASSWORD=kong" \
  kong:latest kong migrations bootstrap

docker run -d --name kong \
  --link kong-database:kong-database \
  -e "KONG_DATABASE=postgres" \
  -e "KONG_PG_HOST=kong-database" \
  -e "KONG_PG_PASSWORD=kong" \
  -e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \
  -e "KONG_ADMIN_ACCESS_LOG=/dev/stdout" \
  -e "KONG_PROXY_ERROR_LOG=/dev/stderr" \
  -e "KONG_ADMIN_ERROR_LOG=/dev/stderr" \
  -p 8000:8000 \
  -p 8443:8443 \
  -p 8001:8001 \
  -p 8444:8444 \
  kong:latest

# 配置Kong网关
curl -i -X POST http://localhost:8001/services/ \
  --data name=user-service \
  --data url='http://127.0.0.1:8001'

curl -i -X POST http://localhost:8001/services/user-service/routes \
  --data 'paths[]=/users' \
  --data 'methods[]=GET' \
  --data 'methods[]=POST'

# 添加限流插件
curl -i -X POST http://localhost:8001/services/user-service/plugins \
  --data name=rate-limiting \
  --data config.minute=100 \
  --data config.hour=1000

第14周:事件驱动架构

主题句:事件驱动架构通过事件流实现松耦合,适合高并发场景。

支持细节

  1. 事件源(Event Sourcing):将状态变化存储为事件序列。
  2. CQRS(命令查询职责分离):将读写操作分离,优化性能。
  3. 事件总线:Kafka、RabbitMQ、AWS EventBridge。

代码示例:实现简单的事件源系统。

import json
from datetime import datetime
from typing import List, Dict

class Event:
    def __init__(self, event_type: str, data: Dict):
        self.event_type = event_type
        self.data = data
        self.timestamp = datetime.now().isoformat()
    
    def to_dict(self):
        return {
            'event_type': self.event_type,
            'data': self.data,
            'timestamp': self.timestamp
        }

class EventStore:
    def __init__(self):
        self.events = []
    
    def append(self, event: Event):
        self.events.append(event)
    
    def get_events(self, aggregate_id: str = None) -> List[Event]:
        if aggregate_id:
            return [e for e in self.events if e.data.get('aggregate_id') == aggregate_id]
        return self.events

class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.status = 'created'
        self.amount = 0
        self.version = 0
    
    def apply_event(self, event: Event):
        if event.event_type == 'OrderCreated':
            self.status = 'created'
            self.amount = event.data['amount']
            self.version += 1
        elif event.event_type == 'OrderPaid':
            self.status = 'paid'
            self.version += 1
        elif event.event_type == 'OrderCancelled':
            self.status = 'cancelled'
            self.version += 1
    
    def create_order(self, amount: float, event_store: EventStore):
        event = Event('OrderCreated', {
            'aggregate_id': self.order_id,
            'amount': amount,
            'version': self.version
        })
        event_store.append(event)
        self.apply_event(event)
    
    def pay_order(self, event_store: EventStore):
        if self.status != 'created':
            raise ValueError("Order cannot be paid")
        event = Event('OrderPaid', {
            'aggregate_id': self.order_id,
            'version': self.version
        })
        event_store.append(event)
        self.apply_event(event)

# 使用示例
event_store = EventStore()
order = OrderAggregate('order-123')
order.create_order(100.0, event_store)
order.pay_order(event_store)

# 重建聚合状态
def rebuild_order_state(order_id: str, event_store: EventStore) -> OrderAggregate:
    events = event_store.get_events(order_id)
    order = OrderAggregate(order_id)
    for event in events:
        order.apply_event(event)
    return order

reconstructed_order = rebuild_order_state('order-123', event_store)
print(f"Order status: {reconstructed_order.status}, Amount: {reconstructed_order.amount}")

第15周:数据库分片与分区

主题句:当单数据库无法承载数据量时,分片和分区是必要的扩展手段。

支持细节

  1. 水平分片:将数据按某种规则分布到多个数据库实例。
  2. 垂直分片:按业务模块拆分数据库。
  3. 分片策略:范围分片、哈希分片、地理位置分片。

代码示例:实现简单的哈希分片。

import hashlib
from typing import List, Dict

class DatabaseShard:
    def __init__(self, shard_id: int, connection_string: str):
        self.shard_id = shard_id
        self.connection_string = connection_string
        self.data = {}  # 模拟数据库
    
    def get(self, key: str):
        return self.data.get(key)
    
    def set(self, key: str, value):
        self.data[key] = value

class ShardedDatabase:
    def __init__(self, shards: List[DatabaseShard]):
        self.shards = shards
        self.num_shards = len(shards)
    
    def get_shard(self, key: str) -> DatabaseShard:
        """根据key选择分片"""
        hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
        shard_index = hash_value % self.num_shards
        return self.shards[shard_index]
    
    def get(self, key: str):
        shard = self.get_shard(key)
        return shard.get(key)
    
    def set(self, key: str, value):
        shard = self.get_shard(key)
        shard.set(key, value)

# 使用示例
shards = [
    DatabaseShard(0, "postgresql://localhost/db0"),
    DatabaseShard(1, "postgresql://localhost/db1"),
    DatabaseShard(2, "postgresql://localhost/db2")
]
db = ShardedDatabase(shards)

# 存储数据
db.set("user:1", {"id": 1, "name": "Alice"})
db.set("user:2", {"id": 2, "name": "Bob"})
db.set("user:3", {"id": 3, "name": "Charlie"})

# 读取数据
print(db.get("user:1"))  # 可能在分片0
print(db.get("user:2"))  # 可能在分片1
print(db.get("user:3"))  # 可能在分片2

第16周:分布式锁与协调

主题句:在分布式环境中,协调多个节点的访问需要分布式锁。

支持细节

  1. 分布式锁实现:基于数据库、Redis、ZooKeeper。
  2. 锁的粒度:全局锁、资源锁、乐观锁。
  3. 锁的可靠性:避免死锁、锁超时、锁续期。

代码示例:使用Redis实现分布式锁。

import redis
import time
import uuid
from contextlib import contextmanager

class RedisDistributedLock:
    def __init__(self, redis_client, lock_key, timeout=10):
        self.redis = redis_client
        self.lock_key = lock_key
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
    
    @contextmanager
    def acquire(self):
        """获取锁,使用上下文管理器自动释放"""
        acquired = self._acquire()
        try:
            yield acquired
        finally:
            if acquired:
                self._release()
    
    def _acquire(self):
        """尝试获取锁"""
        # 使用SETNX命令,只有key不存在时才设置
        acquired = self.redis.set(self.lock_key, self.identifier, nx=True, ex=self.timeout)
        if acquired:
            print(f"Lock acquired: {self.lock_key}")
            return True
        print(f"Failed to acquire lock: {self.lock_key}")
        return False
    
    def _release(self):
        """释放锁"""
        # 使用Lua脚本确保原子性
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua_script, 1, self.lock_key, self.identifier)
        if result:
            print(f"Lock released: {self.lock_key}")
        else:
            print(f"Failed to release lock: {self.lock_key}")

# 使用示例
redis_client = redis.Redis(host='localhost', port=6379, db=0)
lock = RedisDistributedLock(redis_client, "resource:critical", timeout=5)

# 模拟并发访问
def critical_section():
    with lock.acquire() as acquired:
        if acquired:
            print("Executing critical section...")
            time.sleep(2)  # 模拟耗时操作
            print("Critical section completed")
        else:
            print("Could not enter critical section")

# 多个线程尝试获取锁
import threading

threads = []
for i in range(3):
    t = threading.Thread(target=critical_section)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

第17周:监控与可观测性

主题句:监控和可观测性是确保分布式系统健康运行的关键。

支持细节

  1. 监控指标:黄金指标(延迟、流量、错误、饱和度)。
  2. 日志聚合:ELK Stack(Elasticsearch, Logstash, Kibana)。
  3. 分布式追踪:Jaeger、Zipkin、OpenTelemetry。

代码示例:使用Prometheus和Grafana监控应用。

from prometheus_client import start_http_server, Counter, Histogram, Gauge
import time
import random

# 定义指标
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration', ['method', 'endpoint'])
ACTIVE_CONNECTIONS = Gauge('active_connections', 'Number of active connections')

# 模拟HTTP请求处理
def handle_request(method, endpoint):
    REQUEST_COUNT.labels(method=method, endpoint=endpoint).inc()
    ACTIVE_CONNECTIONS.inc()
    
    start_time = time.time()
    # 模拟处理时间
    processing_time = random.uniform(0.1, 0.5)
    time.sleep(processing_time)
    
    duration = time.time() - start_time
    REQUEST_DURATION.labels(method=method, endpoint=endpoint).observe(duration)
    ACTIVE_CONNECTIONS.dec()

# 启动Prometheus指标服务器
if __name__ == '__main__':
    start_http_server(8000)
    print("Prometheus metrics server started on port 8000")
    
    # 模拟请求
    while True:
        handle_request('GET', '/api/users')
        handle_request('POST', '/api/articles')
        time.sleep(1)

第18-21周:实战项目与优化

第18周:项目规划与架构设计

主题句:一个成功的项目始于清晰的规划和合理的架构设计。

支持细节

  1. 需求分析:明确项目目标、用户群体、功能需求。
  2. 技术选型:根据需求选择合适的技术栈。
  3. 架构设计:绘制架构图,定义组件和接口。

项目示例:设计一个社交网络平台。

  • 功能需求:用户注册、发帖、点赞、评论、关注、消息通知。
  • 非功能需求:支持10万并发用户,响应时间<200ms,99.9%可用性。
  • 技术栈
    • 前端:React + TypeScript
    • 后端:Go(高性能)或Python(快速开发)
    • 数据库:PostgreSQL(主数据)+ Redis(缓存)
    • 消息队列:Kafka(事件流)
    • 部署:Kubernetes + Docker

架构图(文本描述):

用户 -> CDN -> API网关 -> 微服务集群
    -> 用户服务 -> PostgreSQL
    -> 帖子服务 -> PostgreSQL + Elasticsearch(搜索)
    -> 通知服务 -> Kafka -> 邮件/推送服务
    -> 监控服务 -> Prometheus + Grafana

第19周:实现核心功能

主题句:分阶段实现核心功能,确保每个模块的稳定性和可扩展性。

支持细节

  1. 用户服务:注册、登录、个人信息管理。
  2. 帖子服务:创建、读取、更新、删除帖子。
  3. 社交功能:关注、点赞、评论。

代码示例:用户服务的核心API(使用Go语言)。

// user_service.go
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
    
    "github.com/gorilla/mux"
    "github.com/jinzhu/gorm"
    _ "github.com/jinzhu/gorm/dialects/postgres"
)

// User 模型
type User struct {
    ID        uint      `gorm:"primary_key" json:"id"`
    Username  string    `gorm:"unique;not null" json:"username"`
    Email     string    `gorm:"unique;not null" json:"email"`
    Password  string    `gorm:"not null" json:"-"`
    CreatedAt time.Time `json:"created_at"`
    UpdatedAt time.Time `json:"updated_at"`
}

// 数据库连接
var db *gorm.DB

func initDB() {
    var err error
    db, err = gorm.Open("postgres", "host=localhost port=5432 user=postgres dbname=blog sslmode=disable")
    if err != nil {
        log.Fatalf("Failed to connect to database: %v", err)
    }
    db.AutoMigrate(&User{})
}

// API处理器
func getUsers(w http.ResponseWriter, r *http.Request) {
    var users []User
    db.Find(&users)
    json.NewEncoder(w).Encode(users)
}

func getUser(w http.ResponseWriter, r *http.Request) {
    params := mux.Vars(r)
    var user User
    if db.First(&user, params["id"]).RecordNotFound() {
        http.Error(w, "User not found", http.StatusNotFound)
        return
    }
    json.NewEncoder(w).Encode(user)
}

func createUser(w http.ResponseWriter, r *http.Request) {
    var user User
    json.NewDecoder(r.Body).Decode(&user)
    if err := db.Create(&user).Error; err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(user)
}

func main() {
    initDB()
    defer db.Close()
    
    router := mux.NewRouter()
    router.HandleFunc("/users", getUsers).Methods("GET")
    router.HandleFunc("/users/{id}", getUser).Methods("GET")
    router.HandleFunc("/users", createUser).Methods("POST")
    
    log.Println("Starting user service on :8001")
    log.Fatal(http.ListenAndServe(":8001", router))
}

第20周:性能优化与压力测试

主题句:性能优化是系统设计的持续过程,需要通过测试和监控不断改进。

支持细节

  1. 性能瓶颈分析:使用Profiling工具(如pprof for Go, cProfile for Python)。
  2. 压力测试:使用JMeter、Locust、k6模拟高并发场景。
  3. 优化策略:代码优化、缓存优化、数据库优化、异步处理。

代码示例:使用Locust进行压力测试。

# locustfile.py
from locust import HttpUser, task, between
import random

class SocialNetworkUser(HttpUser):
    wait_time = between(1, 3)
    
    @task(3)
    def get_users(self):
        self.client.get("/users")
    
    @task(2)
    def get_user(self):
        user_id = random.randint(1, 100)
        self.client.get(f"/users/{user_id}")
    
    @task(1)
    def create_user(self):
        username = f"user_{random.randint(1000, 9999)}"
        email = f"{username}@example.com"
        self.client.post("/users", json={
            "username": username,
            "email": email,
            "password": "password123"
        })
    
    @task(2)
    def get_posts(self):
        self.client.get("/posts")
    
    @task(1)
    def create_post(self):
        self.client.post("/posts", json={
            "title": f"Test Post {random.randint(1, 1000)}",
            "content": "This is a test post content.",
            "user_id": random.randint(1, 100)
        })

第21周:部署与运维

主题句:成功的部署和运维是系统稳定运行的保障。

支持细节

  1. 容器化:使用Docker打包应用。
  2. 编排:使用Kubernetes管理容器集群。
  3. CI/CD:自动化构建、测试和部署流程。
  4. 灾难恢复:备份策略、故障转移。

代码示例:Dockerfile和Kubernetes部署文件。

# Dockerfile for User Service
FROM golang:1.19-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o user-service .

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/user-service .
EXPOSE 8001
CMD ["./user-service"]
# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: your-registry/user-service:latest
        ports:
        - containerPort: 8001
        resources:
          requests:
            memory: "64Mi"
            cpu: "250m"
          limits:
            memory: "128Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8001
          initialDelaySeconds: 5
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8001
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8001
  type: ClusterIP

总结与持续学习

通过21周的系统设计学校,我们从基础概念出发,逐步深入到分布式系统、高级架构模式,并最终通过实战项目巩固所学知识。这个旅程不仅涵盖了理论知识,还提供了丰富的代码示例和实践指导。

关键收获

  1. 系统思维:从需求分析到架构设计,全面考虑各种因素。
  2. 技术深度:掌握数据库、缓存、消息队列、微服务等核心技术。
  3. 实战能力:通过项目实践,将理论知识转化为实际解决方案。

持续学习建议

  1. 阅读经典书籍:《设计数据密集型应用》、《微服务架构设计模式》。
  2. 参与开源项目:在GitHub上贡献代码,学习优秀架构。
  3. 关注行业动态:关注技术博客、会议演讲,了解最新趋势。
  4. 实践与反思:不断在项目中应用所学,总结经验教训。

系统设计是一个不断演进的领域,没有终点。保持好奇心,持续学习,你将能够构建出更加健壮、可扩展的系统架构。祝你在系统设计的道路上不断进步!