引言:为什么需要系统设计学校?
在当今快速发展的技术世界中,构建可扩展的系统架构是每个软件工程师和架构师的核心技能。然而,系统设计往往是一个复杂且需要大量实践经验的领域。传统的学习方式通常需要数年时间,但通过结构化的21周计划,我们可以系统地掌握从基础概念到高级架构设计的全过程。
本指南将带你走过一个为期21周的实战旅程,每周聚焦一个关键主题,通过理论学习、实践项目和代码示例,帮助你从零开始构建可扩展的系统架构。无论你是初学者还是有经验的开发者,这个计划都将为你提供清晰的路径和实用的工具。
第1-3周:基础概念与系统思维
第1周:理解系统设计的基本原则
主题句:系统设计的核心在于理解需求、权衡取舍和模块化思维。
支持细节:
- 需求分析:明确系统要解决的问题,区分功能需求和非功能需求(如可扩展性、可靠性、性能)。
- 权衡取舍:没有完美的设计,只有适合当前场景的方案。例如,CAP定理(一致性、可用性、分区容错性)告诉我们,在分布式系统中必须做出选择。
- 模块化:将系统分解为独立的模块,每个模块负责特定功能,降低复杂度。
示例:设计一个简单的博客系统。
- 功能需求:用户注册、发布文章、评论、搜索。
- 非功能需求:支持1000并发用户,响应时间<500ms。
- 权衡:为了快速上线,初期使用单体架构,后期根据需求拆分为微服务。
第2周:数据存储基础
主题句:选择合适的数据存储方案是系统设计的关键。
支持细节:
- 关系型数据库(RDBMS):适合结构化数据,支持ACID事务。例如MySQL、PostgreSQL。
- 非关系型数据库(NoSQL):适合非结构化或半结构化数据,扩展性好。例如MongoDB(文档型)、Redis(键值型)。
- 数据模型设计:理解范式化与反范式化的权衡,设计合理的表结构或文档结构。
代码示例:使用Python和SQLAlchemy定义一个简单的博客文章模型。
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
articles = relationship('Article', back_populates='author')
class Article(Base):
__tablename__ = 'articles'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(Text, nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
created_at = Column(DateTime, default=datetime.utcnow)
author = relationship('User', back_populates='articles')
第3周:网络与通信基础
主题句:理解网络协议和通信模式是构建分布式系统的基石。
支持细节:
- HTTP/HTTPS:了解请求-响应模型、状态码、RESTful API设计。
- TCP/IP:理解三次握手、四次挥手、可靠传输。
- 消息队列:用于异步通信和解耦系统组件。例如RabbitMQ、Kafka。
代码示例:使用Python的Flask框架创建一个简单的RESTful API。
from flask import Flask, jsonify, request
app = Flask(__name__)
# 模拟数据库
articles_db = []
@app.route('/articles', methods=['GET'])
def get_articles():
return jsonify(articles_db)
@app.route('/articles', methods=['POST'])
def create_article():
data = request.get_json()
if not data or 'title' not in data or 'content' not in data:
return jsonify({'error': 'Missing title or content'}), 400
article = {
'id': len(articles_db) + 1,
'title': data['title'],
'content': data['content']
}
articles_db.append(article)
return jsonify(article), 201
if __name__ == '__main__':
app.run(debug=True)
第4-7周:单体架构与数据库优化
第4周:单体架构设计
主题句:单体架构是大多数系统的起点,理解其优缺点至关重要。
支持细节:
- 优点:开发简单、部署容易、调试方便。
- 缺点:随着系统增长,代码库变得臃肿,扩展性差。
- 设计模式:MVC(模型-视图-控制器)模式,将业务逻辑、数据和界面分离。
代码示例:一个简单的单体博客应用结构。
blog-app/
├── app.py # 主应用文件
├── models.py # 数据模型
├── views.py # 视图逻辑
├── templates/ # HTML模板
├── static/ # 静态文件
└── config.py # 配置文件
第5周:数据库设计与优化
主题句:良好的数据库设计能显著提升系统性能。
支持细节:
- 索引优化:为常用查询字段创建索引,避免全表扫描。
- 查询优化:使用EXPLAIN分析查询计划,避免N+1问题。
- 分库分表:当数据量过大时,考虑水平或垂直拆分。
代码示例:使用SQLAlchemy进行数据库查询优化。
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# 创建数据库连接
engine = create_engine('sqlite:///blog.db')
Session = sessionmaker(bind=engine)
# 优化前:N+1查询问题
def get_articles_with_comments_naive():
session = Session()
articles = session.query(Article).all()
for article in articles:
# 每个文章单独查询评论,导致N+1问题
comments = session.query(Comment).filter_by(article_id=article.id).all()
print(f"Article: {article.title}, Comments: {len(comments)}")
session.close()
# 优化后:使用JOIN或批量查询
def get_articles_with_comments_optimized():
session = Session()
# 使用JOIN一次性获取文章和评论
articles = session.query(Article).join(Comment, Article.id == Comment.article_id).all()
for article in articles:
print(f"Article: {article.title}, Comments: {len(article.comments)}")
session.close()
第6周:缓存策略
主题句:缓存是提升系统性能的有效手段。
支持细节:
- 缓存层次:客户端缓存、CDN、应用层缓存(如Redis)、数据库缓存。
- 缓存策略:LRU(最近最少使用)、TTL(生存时间)。
- 缓存一致性:如何保证缓存与数据库的一致性。
代码示例:使用Redis缓存文章数据。
import redis
import json
from functools import wraps
# 连接Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def cache_article(ttl=300):
"""装饰器:缓存文章数据"""
def decorator(func):
@wraps(func)
def wrapper(article_id):
# 生成缓存键
cache_key = f"article:{article_id}"
# 尝试从缓存获取
cached_data = redis_client.get(cache_key)
if cached_data:
return json.loads(cached_data)
# 缓存未命中,执行原函数
result = func(article_id)
# 存入缓存
redis_client.setex(cache_key, ttl, json.dumps(result))
return result
return wrapper
return decorator
@cache_article(ttl=600)
def get_article_from_db(article_id):
"""从数据库获取文章"""
# 模拟数据库查询
return {
'id': article_id,
'title': f'Article {article_id}',
'content': f'Content of article {article_id}',
'created_at': '2023-01-01'
}
第7周:负载均衡与水平扩展
主题句:负载均衡是实现水平扩展的关键技术。
支持细节:
- 负载均衡算法:轮询、加权轮询、最少连接数、IP哈希。
- 负载均衡器:Nginx、HAProxy、云服务负载均衡器(如AWS ELB)。
- 水平扩展:通过增加服务器实例来提升处理能力。
代码示例:使用Nginx配置负载均衡。
# nginx.conf
http {
upstream blog_backend {
# 轮询算法
server 127.0.0.1:5000;
server 127.0.0.1:5001;
server 127.0.0.1:5002;
# 可选:加权轮询
# server 127.0.0.1:5000 weight=3;
# server 127.0.0.1:5001 weight=2;
# server 127.0.0.1:5002 weight=1;
}
server {
listen 80;
server_name localhost;
location / {
proxy_pass http://blog_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
}
第8-12周:分布式系统基础
第8周:分布式系统概念
主题句:理解分布式系统的核心挑战是设计可扩展架构的前提。
支持细节:
- CAP定理:一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)三者不可兼得。
- BASE理论:基本可用(Basically Available)、软状态(Soft state)、最终一致性(Eventual consistency)。
- 分布式事务:两阶段提交(2PC)、三阶段提交(3PC)、Saga模式。
代码示例:模拟一个简单的分布式事务(使用补偿事务)。
import time
from typing import Dict, List
class DistributedTransaction:
def __init__(self):
self.steps = []
self.compensations = []
def add_step(self, action, compensation):
"""添加事务步骤和补偿操作"""
self.steps.append(action)
self.compensations.append(compensation)
def execute(self):
"""执行事务"""
executed_steps = []
try:
for i, step in enumerate(self.steps):
print(f"执行步骤 {i+1}: {step.__name__}")
step()
executed_steps.append(i)
except Exception as e:
print(f"事务失败: {e}")
# 执行补偿操作
for i in reversed(executed_steps):
print(f"执行补偿 {i+1}: {self.compensations[i].__name__}")
self.compensations[i]()
return False
print("事务成功完成")
return True
# 示例:转账事务
def transfer_money(from_account, to_account, amount):
"""转账操作"""
print(f"从账户 {from_account} 转账 {amount} 到账户 {to_account}")
# 模拟数据库操作
time.sleep(0.1)
def compensate_transfer(from_account, to_account, amount):
"""转账补偿操作"""
print(f"回滚转账: 从账户 {to_account} 退还 {amount} 到账户 {from_account}")
time.sleep(0.1)
# 创建事务
transaction = DistributedTransaction()
transaction.add_step(
lambda: transfer_money('A', 'B', 100),
lambda: compensate_transfer('A', 'B', 100)
)
transaction.add_step(
lambda: transfer_money('B', 'C', 50),
lambda: compensate_transfer('B', 'C', 50)
)
# 执行事务
transaction.execute()
第9周:消息队列与异步处理
主题句:消息队列是实现系统解耦和异步处理的利器。
支持细节:
- 消息队列类型:点对点(Queue)、发布/订阅(Topic)。
- 消息可靠性:持久化、确认机制、重试策略。
- 常见队列:RabbitMQ(AMQP协议)、Kafka(高吞吐量)、Redis Streams。
代码示例:使用RabbitMQ实现异步任务处理。
import pika
import json
import time
# 生产者:发送消息
def send_message(queue_name, message):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
)
)
print(f" [x] Sent {message}")
connection.close()
# 消费者:处理消息
def consume_messages(queue_name):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
def callback(ch, method, properties, body):
message = json.loads(body)
print(f" [x] Received {message}")
# 模拟处理时间
time.sleep(1)
print(f" [x] Done processing {message}")
ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
# 使用示例
if __name__ == '__main__':
# 发送消息
send_message('task_queue', {'task_id': 1, 'action': 'send_email'})
send_message('task_queue', {'task_id': 2, 'action': 'generate_report'})
# 消费消息(在另一个进程或线程中运行)
# consume_messages('task_queue')
第10周:分布式缓存
主题句:分布式缓存可以显著提升系统性能并减少数据库压力。
支持细节:
- 缓存策略:Cache-Aside、Read-Through、Write-Through、Write-Behind。
- 缓存一致性:如何处理缓存与数据库的同步问题。
- 分布式缓存:Redis Cluster、Memcached。
代码示例:实现Cache-Aside模式。
import redis
import json
from datetime import datetime, timedelta
class DistributedCache:
def __init__(self, redis_client):
self.redis = redis_client
def get(self, key, fallback_func, ttl=300):
"""获取缓存,如果不存在则调用回退函数"""
cached = self.redis.get(key)
if cached:
print(f"Cache hit for key: {key}")
return json.loads(cached)
print(f"Cache miss for key: {key}")
# 调用回退函数获取数据
data = fallback_func()
# 存入缓存
self.redis.setex(key, ttl, json.dumps(data))
return data
def set(self, key, value, ttl=300):
"""设置缓存"""
self.redis.setex(key, ttl, json.dumps(value))
def delete(self, key):
"""删除缓存"""
self.redis.delete(key)
# 使用示例
redis_client = redis.Redis(host='localhost', port=6379, db=0)
cache = DistributedCache(redis_client)
def get_user_from_db(user_id):
"""模拟从数据库获取用户数据"""
print(f"Fetching user {user_id} from database...")
return {
'id': user_id,
'name': f'User {user_id}',
'email': f'user{user_id}@example.com',
'created_at': datetime.now().isoformat()
}
# 获取用户数据
user = cache.get(f"user:{1}", lambda: get_user_from_db(1), ttl=600)
print(user)
# 再次获取,应该从缓存读取
user = cache.get(f"user:{1}", lambda: get_user_from_db(1), ttl=600)
print(user)
第11周:微服务架构入门
主题句:微服务架构将单体应用拆分为小型、独立的服务,提升可扩展性和可维护性。
支持细节:
- 微服务优点:独立部署、技术异构、故障隔离。
- 微服务挑战:服务发现、分布式事务、监控复杂。
- 设计原则:单一职责、松耦合、高内聚。
代码示例:使用FastAPI创建一个简单的微服务。
# user_service.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
app = FastAPI(title="User Service")
class User(BaseModel):
id: int
name: str
email: str
# 模拟数据库
users_db = {}
@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: int):
if user_id not in users_db:
raise HTTPException(status_code=404, detail="User not found")
return users_db[user_id]
@app.post("/users/", response_model=User)
async def create_user(user: User):
users_db[user.id] = user.dict()
return user
@app.get("/users/", response_model=List[User])
async def list_users():
return list(users_db.values())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8001)
第12周:服务发现与配置管理
主题句:服务发现和配置管理是微服务架构的核心基础设施。
支持细节:
- 服务发现:Consul、Etcd、ZooKeeper、Kubernetes Service。
- 配置管理:Spring Cloud Config、Consul KV、etcd。
- 动态配置:配置热更新、环境隔离。
代码示例:使用Consul进行服务注册与发现。
import consul
import time
import requests
from threading import Thread
class ServiceRegistry:
def __init__(self, host='localhost', port=8500):
self.consul = consul.Consul(host=host, port=port)
def register_service(self, service_name, service_id, address, port, tags=None):
"""注册服务"""
check = consul.Check.http(f"http://{address}:{port}/health", interval="10s")
self.consul.agent.service.register(
name=service_name,
service_id=service_id,
address=address,
port=port,
tags=tags or [],
check=check
)
print(f"Service {service_name} registered at {address}:{port}")
def discover_service(self, service_name):
"""发现服务"""
services = self.consul.health.service(service_name, passing=True)[1]
return [s['Service'] for s in services]
# 使用示例
registry = ServiceRegistry()
# 注册服务
registry.register_service(
service_name="user-service",
service_id="user-service-1",
address="127.0.0.1",
port=8001,
tags=["v1"]
)
# 发现服务
services = registry.discover_service("user-service")
print(f"Discovered services: {services}")
第13-17周:高级架构模式
第13周:API网关
主题句:API网关是微服务架构的入口,负责路由、认证、限流等横切关注点。
支持细节:
- 功能:路由、认证、限流、日志、监控。
- 实现:Nginx、Kong、Spring Cloud Gateway、AWS API Gateway。
- 设计模式:网关聚合、网关转发。
代码示例:使用Kong作为API网关。
# 安装Kong(Docker方式)
docker run -d --name kong-database \
-p 5432:5432 \
-e "POSTGRES_USER=kong" \
-e "POSTGRES_DB=kong" \
-e "POSTGRES_PASSWORD=kong" \
postgres:9.6
docker run --rm \
--link kong-database:kong-database \
-e "KONG_DATABASE=postgres" \
-e "KONG_PG_HOST=kong-database" \
-e "KONG_PG_PASSWORD=kong" \
kong:latest kong migrations bootstrap
docker run -d --name kong \
--link kong-database:kong-database \
-e "KONG_DATABASE=postgres" \
-e "KONG_PG_HOST=kong-database" \
-e "KONG_PG_PASSWORD=kong" \
-e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \
-e "KONG_ADMIN_ACCESS_LOG=/dev/stdout" \
-e "KONG_PROXY_ERROR_LOG=/dev/stderr" \
-e "KONG_ADMIN_ERROR_LOG=/dev/stderr" \
-p 8000:8000 \
-p 8443:8443 \
-p 8001:8001 \
-p 8444:8444 \
kong:latest
# 配置Kong网关
curl -i -X POST http://localhost:8001/services/ \
--data name=user-service \
--data url='http://127.0.0.1:8001'
curl -i -X POST http://localhost:8001/services/user-service/routes \
--data 'paths[]=/users' \
--data 'methods[]=GET' \
--data 'methods[]=POST'
# 添加限流插件
curl -i -X POST http://localhost:8001/services/user-service/plugins \
--data name=rate-limiting \
--data config.minute=100 \
--data config.hour=1000
第14周:事件驱动架构
主题句:事件驱动架构通过事件流实现松耦合,适合高并发场景。
支持细节:
- 事件源(Event Sourcing):将状态变化存储为事件序列。
- CQRS(命令查询职责分离):将读写操作分离,优化性能。
- 事件总线:Kafka、RabbitMQ、AWS EventBridge。
代码示例:实现简单的事件源系统。
import json
from datetime import datetime
from typing import List, Dict
class Event:
def __init__(self, event_type: str, data: Dict):
self.event_type = event_type
self.data = data
self.timestamp = datetime.now().isoformat()
def to_dict(self):
return {
'event_type': self.event_type,
'data': self.data,
'timestamp': self.timestamp
}
class EventStore:
def __init__(self):
self.events = []
def append(self, event: Event):
self.events.append(event)
def get_events(self, aggregate_id: str = None) -> List[Event]:
if aggregate_id:
return [e for e in self.events if e.data.get('aggregate_id') == aggregate_id]
return self.events
class OrderAggregate:
def __init__(self, order_id: str):
self.order_id = order_id
self.status = 'created'
self.amount = 0
self.version = 0
def apply_event(self, event: Event):
if event.event_type == 'OrderCreated':
self.status = 'created'
self.amount = event.data['amount']
self.version += 1
elif event.event_type == 'OrderPaid':
self.status = 'paid'
self.version += 1
elif event.event_type == 'OrderCancelled':
self.status = 'cancelled'
self.version += 1
def create_order(self, amount: float, event_store: EventStore):
event = Event('OrderCreated', {
'aggregate_id': self.order_id,
'amount': amount,
'version': self.version
})
event_store.append(event)
self.apply_event(event)
def pay_order(self, event_store: EventStore):
if self.status != 'created':
raise ValueError("Order cannot be paid")
event = Event('OrderPaid', {
'aggregate_id': self.order_id,
'version': self.version
})
event_store.append(event)
self.apply_event(event)
# 使用示例
event_store = EventStore()
order = OrderAggregate('order-123')
order.create_order(100.0, event_store)
order.pay_order(event_store)
# 重建聚合状态
def rebuild_order_state(order_id: str, event_store: EventStore) -> OrderAggregate:
events = event_store.get_events(order_id)
order = OrderAggregate(order_id)
for event in events:
order.apply_event(event)
return order
reconstructed_order = rebuild_order_state('order-123', event_store)
print(f"Order status: {reconstructed_order.status}, Amount: {reconstructed_order.amount}")
第15周:数据库分片与分区
主题句:当单数据库无法承载数据量时,分片和分区是必要的扩展手段。
支持细节:
- 水平分片:将数据按某种规则分布到多个数据库实例。
- 垂直分片:按业务模块拆分数据库。
- 分片策略:范围分片、哈希分片、地理位置分片。
代码示例:实现简单的哈希分片。
import hashlib
from typing import List, Dict
class DatabaseShard:
def __init__(self, shard_id: int, connection_string: str):
self.shard_id = shard_id
self.connection_string = connection_string
self.data = {} # 模拟数据库
def get(self, key: str):
return self.data.get(key)
def set(self, key: str, value):
self.data[key] = value
class ShardedDatabase:
def __init__(self, shards: List[DatabaseShard]):
self.shards = shards
self.num_shards = len(shards)
def get_shard(self, key: str) -> DatabaseShard:
"""根据key选择分片"""
hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
shard_index = hash_value % self.num_shards
return self.shards[shard_index]
def get(self, key: str):
shard = self.get_shard(key)
return shard.get(key)
def set(self, key: str, value):
shard = self.get_shard(key)
shard.set(key, value)
# 使用示例
shards = [
DatabaseShard(0, "postgresql://localhost/db0"),
DatabaseShard(1, "postgresql://localhost/db1"),
DatabaseShard(2, "postgresql://localhost/db2")
]
db = ShardedDatabase(shards)
# 存储数据
db.set("user:1", {"id": 1, "name": "Alice"})
db.set("user:2", {"id": 2, "name": "Bob"})
db.set("user:3", {"id": 3, "name": "Charlie"})
# 读取数据
print(db.get("user:1")) # 可能在分片0
print(db.get("user:2")) # 可能在分片1
print(db.get("user:3")) # 可能在分片2
第16周:分布式锁与协调
主题句:在分布式环境中,协调多个节点的访问需要分布式锁。
支持细节:
- 分布式锁实现:基于数据库、Redis、ZooKeeper。
- 锁的粒度:全局锁、资源锁、乐观锁。
- 锁的可靠性:避免死锁、锁超时、锁续期。
代码示例:使用Redis实现分布式锁。
import redis
import time
import uuid
from contextlib import contextmanager
class RedisDistributedLock:
def __init__(self, redis_client, lock_key, timeout=10):
self.redis = redis_client
self.lock_key = lock_key
self.timeout = timeout
self.identifier = str(uuid.uuid4())
@contextmanager
def acquire(self):
"""获取锁,使用上下文管理器自动释放"""
acquired = self._acquire()
try:
yield acquired
finally:
if acquired:
self._release()
def _acquire(self):
"""尝试获取锁"""
# 使用SETNX命令,只有key不存在时才设置
acquired = self.redis.set(self.lock_key, self.identifier, nx=True, ex=self.timeout)
if acquired:
print(f"Lock acquired: {self.lock_key}")
return True
print(f"Failed to acquire lock: {self.lock_key}")
return False
def _release(self):
"""释放锁"""
# 使用Lua脚本确保原子性
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.redis.eval(lua_script, 1, self.lock_key, self.identifier)
if result:
print(f"Lock released: {self.lock_key}")
else:
print(f"Failed to release lock: {self.lock_key}")
# 使用示例
redis_client = redis.Redis(host='localhost', port=6379, db=0)
lock = RedisDistributedLock(redis_client, "resource:critical", timeout=5)
# 模拟并发访问
def critical_section():
with lock.acquire() as acquired:
if acquired:
print("Executing critical section...")
time.sleep(2) # 模拟耗时操作
print("Critical section completed")
else:
print("Could not enter critical section")
# 多个线程尝试获取锁
import threading
threads = []
for i in range(3):
t = threading.Thread(target=critical_section)
threads.append(t)
t.start()
for t in threads:
t.join()
第17周:监控与可观测性
主题句:监控和可观测性是确保分布式系统健康运行的关键。
支持细节:
- 监控指标:黄金指标(延迟、流量、错误、饱和度)。
- 日志聚合:ELK Stack(Elasticsearch, Logstash, Kibana)。
- 分布式追踪:Jaeger、Zipkin、OpenTelemetry。
代码示例:使用Prometheus和Grafana监控应用。
from prometheus_client import start_http_server, Counter, Histogram, Gauge
import time
import random
# 定义指标
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration', ['method', 'endpoint'])
ACTIVE_CONNECTIONS = Gauge('active_connections', 'Number of active connections')
# 模拟HTTP请求处理
def handle_request(method, endpoint):
REQUEST_COUNT.labels(method=method, endpoint=endpoint).inc()
ACTIVE_CONNECTIONS.inc()
start_time = time.time()
# 模拟处理时间
processing_time = random.uniform(0.1, 0.5)
time.sleep(processing_time)
duration = time.time() - start_time
REQUEST_DURATION.labels(method=method, endpoint=endpoint).observe(duration)
ACTIVE_CONNECTIONS.dec()
# 启动Prometheus指标服务器
if __name__ == '__main__':
start_http_server(8000)
print("Prometheus metrics server started on port 8000")
# 模拟请求
while True:
handle_request('GET', '/api/users')
handle_request('POST', '/api/articles')
time.sleep(1)
第18-21周:实战项目与优化
第18周:项目规划与架构设计
主题句:一个成功的项目始于清晰的规划和合理的架构设计。
支持细节:
- 需求分析:明确项目目标、用户群体、功能需求。
- 技术选型:根据需求选择合适的技术栈。
- 架构设计:绘制架构图,定义组件和接口。
项目示例:设计一个社交网络平台。
- 功能需求:用户注册、发帖、点赞、评论、关注、消息通知。
- 非功能需求:支持10万并发用户,响应时间<200ms,99.9%可用性。
- 技术栈:
- 前端:React + TypeScript
- 后端:Go(高性能)或Python(快速开发)
- 数据库:PostgreSQL(主数据)+ Redis(缓存)
- 消息队列:Kafka(事件流)
- 部署:Kubernetes + Docker
架构图(文本描述):
用户 -> CDN -> API网关 -> 微服务集群
-> 用户服务 -> PostgreSQL
-> 帖子服务 -> PostgreSQL + Elasticsearch(搜索)
-> 通知服务 -> Kafka -> 邮件/推送服务
-> 监控服务 -> Prometheus + Grafana
第19周:实现核心功能
主题句:分阶段实现核心功能,确保每个模块的稳定性和可扩展性。
支持细节:
- 用户服务:注册、登录、个人信息管理。
- 帖子服务:创建、读取、更新、删除帖子。
- 社交功能:关注、点赞、评论。
代码示例:用户服务的核心API(使用Go语言)。
// user_service.go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/postgres"
)
// User 模型
type User struct {
ID uint `gorm:"primary_key" json:"id"`
Username string `gorm:"unique;not null" json:"username"`
Email string `gorm:"unique;not null" json:"email"`
Password string `gorm:"not null" json:"-"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// 数据库连接
var db *gorm.DB
func initDB() {
var err error
db, err = gorm.Open("postgres", "host=localhost port=5432 user=postgres dbname=blog sslmode=disable")
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
db.AutoMigrate(&User{})
}
// API处理器
func getUsers(w http.ResponseWriter, r *http.Request) {
var users []User
db.Find(&users)
json.NewEncoder(w).Encode(users)
}
func getUser(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
var user User
if db.First(&user, params["id"]).RecordNotFound() {
http.Error(w, "User not found", http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(user)
}
func createUser(w http.ResponseWriter, r *http.Request) {
var user User
json.NewDecoder(r.Body).Decode(&user)
if err := db.Create(&user).Error; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(user)
}
func main() {
initDB()
defer db.Close()
router := mux.NewRouter()
router.HandleFunc("/users", getUsers).Methods("GET")
router.HandleFunc("/users/{id}", getUser).Methods("GET")
router.HandleFunc("/users", createUser).Methods("POST")
log.Println("Starting user service on :8001")
log.Fatal(http.ListenAndServe(":8001", router))
}
第20周:性能优化与压力测试
主题句:性能优化是系统设计的持续过程,需要通过测试和监控不断改进。
支持细节:
- 性能瓶颈分析:使用Profiling工具(如pprof for Go, cProfile for Python)。
- 压力测试:使用JMeter、Locust、k6模拟高并发场景。
- 优化策略:代码优化、缓存优化、数据库优化、异步处理。
代码示例:使用Locust进行压力测试。
# locustfile.py
from locust import HttpUser, task, between
import random
class SocialNetworkUser(HttpUser):
wait_time = between(1, 3)
@task(3)
def get_users(self):
self.client.get("/users")
@task(2)
def get_user(self):
user_id = random.randint(1, 100)
self.client.get(f"/users/{user_id}")
@task(1)
def create_user(self):
username = f"user_{random.randint(1000, 9999)}"
email = f"{username}@example.com"
self.client.post("/users", json={
"username": username,
"email": email,
"password": "password123"
})
@task(2)
def get_posts(self):
self.client.get("/posts")
@task(1)
def create_post(self):
self.client.post("/posts", json={
"title": f"Test Post {random.randint(1, 1000)}",
"content": "This is a test post content.",
"user_id": random.randint(1, 100)
})
第21周:部署与运维
主题句:成功的部署和运维是系统稳定运行的保障。
支持细节:
- 容器化:使用Docker打包应用。
- 编排:使用Kubernetes管理容器集群。
- CI/CD:自动化构建、测试和部署流程。
- 灾难恢复:备份策略、故障转移。
代码示例:Dockerfile和Kubernetes部署文件。
# Dockerfile for User Service
FROM golang:1.19-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o user-service .
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/user-service .
EXPOSE 8001
CMD ["./user-service"]
# kubernetes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: user-service
spec:
replicas: 3
selector:
matchLabels:
app: user-service
template:
metadata:
labels:
app: user-service
spec:
containers:
- name: user-service
image: your-registry/user-service:latest
ports:
- containerPort: 8001
resources:
requests:
memory: "64Mi"
cpu: "250m"
limits:
memory: "128Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8001
initialDelaySeconds: 5
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8001
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: user-service
spec:
selector:
app: user-service
ports:
- protocol: TCP
port: 80
targetPort: 8001
type: ClusterIP
总结与持续学习
通过21周的系统设计学校,我们从基础概念出发,逐步深入到分布式系统、高级架构模式,并最终通过实战项目巩固所学知识。这个旅程不仅涵盖了理论知识,还提供了丰富的代码示例和实践指导。
关键收获:
- 系统思维:从需求分析到架构设计,全面考虑各种因素。
- 技术深度:掌握数据库、缓存、消息队列、微服务等核心技术。
- 实战能力:通过项目实践,将理论知识转化为实际解决方案。
持续学习建议:
- 阅读经典书籍:《设计数据密集型应用》、《微服务架构设计模式》。
- 参与开源项目:在GitHub上贡献代码,学习优秀架构。
- 关注行业动态:关注技术博客、会议演讲,了解最新趋势。
- 实践与反思:不断在项目中应用所学,总结经验教训。
系统设计是一个不断演进的领域,没有终点。保持好奇心,持续学习,你将能够构建出更加健壮、可扩展的系统架构。祝你在系统设计的道路上不断进步!
