引言
CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种架构模式,它将应用程序的读取(查询)和写入(命令)操作分离到不同的模型和数据存储中。这种模式特别适用于复杂业务场景,能够显著提升系统的可扩展性、性能和维护性。本文将从理论基础出发,逐步深入到实际代码实现,提供一个完整的CQRS实践指南。
1. CQRS理论基础
1.1 什么是CQRS?
CQRS的核心思想是将应用程序的读取(Query)和写入(Command)操作分离。在传统的CRUD(Create, Read, Update, Delete)架构中,同一个数据模型通常同时处理读写操作,这可能导致以下问题:
- 性能瓶颈:读写操作共享同一数据模型,难以针对不同操作进行优化
- 复杂性增加:业务逻辑在读写操作中混杂,难以维护
- 扩展性受限:无法独立扩展读写操作
CQRS通过分离读写职责来解决这些问题:
graph LR
A[客户端] --> B[Command Handler]
A --> C[Query Handler]
B --> D[Write Model]
C --> E[Read Model]
D --> F[Event Store]
E --> F
1.2 CQRS的核心组件
- Command(命令):表示一个意图改变系统状态的操作,通常是动词短语(如”CreateOrder”、”UpdateUser”)
- Query(查询):表示一个只读操作,不改变系统状态(如”GetOrderById”、”GetUserList”)
- Command Handler(命令处理器):处理命令,执行业务逻辑,更新写模型
- Query Handler(查询处理器):处理查询,从读模型中获取数据
- Write Model(写模型):用于写入操作的数据模型,通常包含完整的业务逻辑和验证
- Read Model(读模型):用于读取操作的数据模型,通常经过优化以满足特定查询需求
1.3 CQRS与事件溯源(Event Sourcing)的关系
CQRS经常与事件溯源结合使用,形成更强大的架构模式:
- 事件溯源:将系统状态的变化记录为一系列不可变的事件,而不是直接修改当前状态
- CQRS + 事件溯源:写模型通过事件溯源记录所有状态变化,读模型通过重放事件或订阅事件流来构建
2. CQRS的适用场景
2.1 适合使用CQRS的场景
- 复杂业务逻辑:业务规则复杂,读写操作差异大
- 高并发读取:系统需要处理大量读取请求,需要优化读取性能
- 多数据源:需要从多个数据源聚合数据
- 审计需求:需要完整的操作历史记录
- 微服务架构:不同服务可能需要不同的数据模型
2.2 不适合使用CQRS的场景
- 简单CRUD应用:业务逻辑简单,读写操作差异不大
- 小型项目:增加架构复杂度可能得不偿失
- 实时一致性要求极高:CQRS通常引入最终一致性
3. CQRS代码实践:基础实现
3.1 项目结构设计
一个典型的CQRS项目结构如下:
CQRS-Example/
├── src/
│ ├── Application/
│ │ ├── Commands/ # 命令定义
│ │ ├── Queries/ # 查询定义
│ │ ├── Handlers/ # 处理器
│ │ └── Models/ # 应用层模型
│ ├── Domain/
│ │ ├── Entities/ # 领域实体
│ │ ├── Events/ # 领域事件
│ │ ├── Repositories/ # 仓储接口
│ │ └── Services/ # 领域服务
│ ├── Infrastructure/
│ │ ├── Persistence/ # 持久化实现
│ │ ├── Messaging/ # 消息处理
│ │ └── Caching/ # 缓存实现
│ └── Web/
│ ├── Controllers/ # API控制器
│ └── Models/ # Web模型
├── tests/
└── README.md
3.2 命令和查询的定义
3.2.1 命令定义
// Application/Commands/CreateOrderCommand.cs
public class CreateOrderCommand : ICommand
{
public Guid OrderId { get; set; }
public string CustomerName { get; set; }
public List<OrderItem> Items { get; set; }
public decimal TotalAmount { get; set; }
}
// Application/Commands/UpdateOrderCommand.cs
public class UpdateOrderCommand : ICommand
{
public Guid OrderId { get; set; }
public string CustomerName { get; set; }
public List<OrderItem> Items { get; set; }
public decimal TotalAmount { get; set; }
}
// Application/Commands/DeleteOrderCommand.cs
public class DeleteOrderCommand : ICommand
{
public Guid OrderId { get; set; }
}
3.2.2 查询定义
// Application/Queries/GetOrderByIdQuery.cs
public class GetOrderByIdQuery : IQuery<OrderDto>
{
public Guid OrderId { get; set; }
}
// Application/Queries/GetOrdersByCustomerQuery.cs
public class GetOrdersByCustomerQuery : IQuery<List<OrderDto>>
{
public string CustomerName { get; set; }
public int Page { get; set; } = 1;
public int PageSize { get; set; } = 10;
}
// Application/Queries/GetOrderStatisticsQuery.cs
public class GetOrderStatisticsQuery : IQuery<OrderStatisticsDto>
{
public DateTime FromDate { get; set; }
public DateTime ToDate { get; set; }
}
3.3 处理器实现
3.3.1 命令处理器
// Application/Handlers/Commands/CreateOrderCommandHandler.cs
public class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
{
private readonly IOrderRepository _orderRepository;
private readonly IEventBus _eventBus;
public CreateOrderCommandHandler(
IOrderRepository orderRepository,
IEventBus eventBus)
{
_orderRepository = orderRepository;
_eventBus = eventBus;
}
public async Task HandleAsync(CreateOrderCommand command)
{
// 1. 验证命令
if (string.IsNullOrWhiteSpace(command.CustomerName))
throw new ArgumentException("Customer name is required");
if (command.Items == null || command.Items.Count == 0)
throw new ArgumentException("Order must have at least one item");
// 2. 创建领域实体
var order = new Order(
command.OrderId,
command.CustomerName,
command.Items,
command.TotalAmount);
// 3. 执行业务规则验证
if (!order.IsValid())
throw new InvalidOperationException("Order validation failed");
// 4. 保存到写模型
await _orderRepository.AddAsync(order);
// 5. 发布领域事件
var orderCreatedEvent = new OrderCreatedEvent
{
OrderId = order.Id,
CustomerName = order.CustomerName,
TotalAmount = order.TotalAmount,
CreatedAt = DateTime.UtcNow
};
await _eventBus.PublishAsync(orderCreatedEvent);
// 6. 返回结果(可选)
return;
}
}
3.3.2 查询处理器
// Application/Handlers/Queries/GetOrderByIdQueryHandler.cs
public class GetOrderByIdQueryHandler : IQueryHandler<GetOrderByIdQuery, OrderDto>
{
private readonly IOrderReadRepository _orderReadRepository;
public GetOrderByIdQueryHandler(IOrderReadRepository orderReadRepository)
{
_orderReadRepository = orderReadRepository;
}
public async Task<OrderDto> HandleAsync(GetOrderByIdQuery query)
{
// 1. 从读模型获取数据
var order = await _orderReadRepository.GetByIdAsync(query.OrderId);
if (order == null)
throw new KeyNotFoundException($"Order with ID {query.OrderId} not found");
// 2. 转换为DTO
var orderDto = new OrderDto
{
Id = order.Id,
CustomerName = order.CustomerName,
Items = order.Items,
TotalAmount = order.TotalAmount,
CreatedAt = order.CreatedAt
};
return orderDto;
}
}
3.4 仓储接口与实现
3.4.1 写模型仓储
// Domain/Repositories/IOrderRepository.cs
public interface IOrderRepository
{
Task AddAsync(Order order);
Task UpdateAsync(Order order);
Task DeleteAsync(Guid id);
Task<Order> GetByIdAsync(Guid id);
}
// Infrastructure/Persistence/OrderRepository.cs
public class OrderRepository : IOrderRepository
{
private readonly WriteDbContext _writeDbContext;
public OrderRepository(WriteDbContext writeDbContext)
{
_writeDbContext = writeDbContext;
}
public async Task AddAsync(Order order)
{
_writeDbContext.Orders.Add(order);
await _writeDbContext.SaveChangesAsync();
}
public async Task UpdateAsync(Order order)
{
_writeDbContext.Orders.Update(order);
await _writeDbContext.SaveChangesAsync();
}
public async Task DeleteAsync(Guid id)
{
var order = await _writeDbContext.Orders.FindAsync(id);
if (order != null)
{
_writeDbContext.Orders.Remove(order);
await _writeDbContext.SaveChangesAsync();
}
}
public async Task<Order> GetByIdAsync(Guid id)
{
return await _writeDbContext.Orders.FindAsync(id);
}
}
3.4.2 读模型仓储
// Domain/Repositories/IOrderReadRepository.cs
public interface IOrderReadRepository
{
Task<OrderReadModel> GetByIdAsync(Guid id);
Task<List<OrderReadModel>> GetByCustomerAsync(string customerName, int page, int pageSize);
Task<OrderStatisticsDto> GetStatisticsAsync(DateTime fromDate, DateTime toDate);
}
// Infrastructure/Persistence/OrderReadRepository.cs
public class OrderReadRepository : IOrderReadRepository
{
private readonly ReadDbContext _readDbContext;
public OrderReadRepository(ReadDbContext readDbContext)
{
_readDbContext = readDbContext;
}
public async Task<OrderReadModel> GetByIdAsync(Guid id)
{
return await _readDbContext.OrderReadModels
.AsNoTracking()
.FirstOrDefaultAsync(o => o.Id == id);
}
public async Task<List<OrderReadModel>> GetByCustomerAsync(string customerName, int page, int pageSize)
{
return await _readDbContext.OrderReadModels
.AsNoTracking()
.Where(o => o.CustomerName == customerName)
.OrderByDescending(o => o.CreatedAt)
.Skip((page - 1) * pageSize)
.Take(pageSize)
.ToListAsync();
}
public async Task<OrderStatisticsDto> GetStatisticsAsync(DateTime fromDate, DateTime toDate)
{
var statistics = await _readDbContext.OrderReadModels
.AsNoTracking()
.Where(o => o.CreatedAt >= fromDate && o.CreatedAt <= toDate)
.GroupBy(o => 1)
.Select(g => new OrderStatisticsDto
{
TotalOrders = g.Count(),
TotalRevenue = g.Sum(o => o.TotalAmount),
AverageOrderValue = g.Average(o => o.TotalAmount)
})
.FirstOrDefaultAsync();
return statistics ?? new OrderStatisticsDto();
}
}
3.5 读写模型定义
3.5.1 写模型(领域实体)
// Domain/Entities/Order.cs
public class Order
{
public Guid Id { get; private set; }
public string CustomerName { get; private set; }
public List<OrderItem> Items { get; private set; }
public decimal TotalAmount { get; private set; }
public DateTime CreatedAt { get; private set; }
public DateTime? UpdatedAt { get; private set; }
public OrderStatus Status { get; private set; }
// 构造函数
public Order(Guid id, string customerName, List<OrderItem> items, decimal totalAmount)
{
Id = id;
CustomerName = customerName;
Items = items;
TotalAmount = totalAmount;
CreatedAt = DateTime.UtcNow;
Status = OrderStatus.Pending;
}
// 业务方法
public void Update(string customerName, List<OrderItem> items, decimal totalAmount)
{
CustomerName = customerName;
Items = items;
TotalAmount = totalAmount;
UpdatedAt = DateTime.UtcNow;
}
public void Confirm()
{
if (Status != OrderStatus.Pending)
throw new InvalidOperationException("Only pending orders can be confirmed");
Status = OrderStatus.Confirmed;
UpdatedAt = DateTime.UtcNow;
}
public void Cancel()
{
if (Status == OrderStatus.Completed)
throw new InvalidOperationException("Completed orders cannot be cancelled");
Status = OrderStatus.Cancelled;
UpdatedAt = DateTime.UtcNow;
}
// 验证方法
public bool IsValid()
{
if (string.IsNullOrWhiteSpace(CustomerName))
return false;
if (Items == null || Items.Count == 0)
return false;
if (TotalAmount <= 0)
return false;
return true;
}
}
// Domain/Entities/OrderItem.cs
public class OrderItem
{
public Guid ProductId { get; set; }
public string ProductName { get; set; }
public int Quantity { get; set; }
public decimal UnitPrice { get; set; }
public decimal Subtotal => Quantity * UnitPrice;
}
// Domain/Entities/OrderStatus.cs
public enum OrderStatus
{
Pending,
Confirmed,
Processing,
Shipped,
Completed,
Cancelled
}
3.5.2 读模型(DTO/ViewModel)
// Application/Models/OrderDto.cs
public class OrderDto
{
public Guid Id { get; set; }
public string CustomerName { get; set; }
public List<OrderItemDto> Items { get; set; }
public decimal TotalAmount { get; set; }
public DateTime CreatedAt { get; set; }
public DateTime? UpdatedAt { get; set; }
public string Status { get; set; }
}
// Application/Models/OrderItemDto.cs
public class OrderItemDto
{
public Guid ProductId { get; set; }
public string ProductName { get; set; }
public int Quantity { get; set; }
public decimal UnitPrice { get; set; }
public decimal Subtotal { get; set; }
}
// Application/Models/OrderStatisticsDto.cs
public class OrderStatisticsDto
{
public int TotalOrders { get; set; }
public decimal TotalRevenue { get; set; }
public decimal AverageOrderValue { get; set; }
}
3.6 事件处理
3.6.1 领域事件定义
// Domain/Events/OrderCreatedEvent.cs
public class OrderCreatedEvent : IEvent
{
public Guid OrderId { get; set; }
public string CustomerName { get; set; }
public decimal TotalAmount { get; set; }
public DateTime CreatedAt { get; set; }
}
// Domain/Events/OrderUpdatedEvent.cs
public class OrderUpdatedEvent : IEvent
{
public Guid OrderId { get; set; }
public string CustomerName { get; set; }
public decimal TotalAmount { get; set; }
public DateTime UpdatedAt { get; set; }
}
// Domain/Events/OrderCancelledEvent.cs
public class OrderCancelledEvent : IEvent
{
public Guid OrderId { get; set; }
public DateTime CancelledAt { get; set; }
}
3.6.2 事件处理器
// Infrastructure/Messaging/EventHandlers/OrderCreatedEventHandler.cs
public class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
{
private readonly IReadModelUpdater _readModelUpdater;
private readonly INotificationService _notificationService;
public OrderCreatedEventHandler(
IReadModelUpdater readModelUpdater,
INotificationService notificationService)
{
_readModelUpdater = readModelUpdater;
_notificationService = notificationService;
}
public async Task HandleAsync(OrderCreatedEvent @event)
{
// 1. 更新读模型
await _readModelUpdater.UpdateReadModelAsync(@event);
// 2. 发送通知
await _notificationService.SendOrderCreatedNotificationAsync(
@event.OrderId,
@event.CustomerName);
// 3. 记录日志
Console.WriteLine($"Order created: {@event.OrderId}");
}
}
// Infrastructure/Messaging/EventBus.cs
public class EventBus : IEventBus
{
private readonly IServiceProvider _serviceProvider;
public EventBus(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
var handlers = _serviceProvider.GetServices<IEventHandler<TEvent>>();
foreach (var handler in handlers)
{
await handler.HandleAsync(@event);
}
}
}
4. CQRS与事件溯源结合
4.1 事件溯源基础
事件溯源将系统状态的变化记录为一系列不可变的事件。每个事件代表一个状态变化,系统状态通过重放所有事件来重建。
// Domain/Events/IEvent.cs
public interface IEvent
{
Guid AggregateId { get; }
DateTime Timestamp { get; }
int Version { get; }
}
// Domain/Events/EventStore.cs
public class EventStore : IEventStore
{
private readonly List<IEvent> _events = new List<IEvent>();
public async Task SaveAsync(IEvent @event)
{
_events.Add(@event);
// 实际应用中,这里会持久化到数据库
}
public async Task<List<IEvent>> GetEventsAsync(Guid aggregateId)
{
return _events
.Where(e => e.AggregateId == aggregateId)
.OrderBy(e => e.Version)
.ToList();
}
public async Task<List<IEvent>> GetAllEventsAsync()
{
return _events
.OrderBy(e => e.Timestamp)
.ToList();
}
}
4.2 聚合根与事件重放
// Domain/Aggregates/OrderAggregate.cs
public class OrderAggregate
{
private readonly List<IEvent> _uncommittedEvents = new List<IEvent>();
private int _version = 0;
public Guid Id { get; private set; }
public string CustomerName { get; private set; }
public List<OrderItem> Items { get; private set; }
public decimal TotalAmount { get; private set; }
public OrderStatus Status { get; private set; }
// 构造函数(从事件重建状态)
public OrderAggregate(IEnumerable<IEvent> events)
{
foreach (var @event in events)
{
ApplyEvent(@event, false);
}
}
// 业务操作
public void CreateOrder(Guid id, string customerName, List<OrderItem> items, decimal totalAmount)
{
var @event = new OrderCreatedEvent
{
AggregateId = id,
Version = _version + 1,
Timestamp = DateTime.UtcNow,
CustomerName = customerName,
Items = items,
TotalAmount = totalAmount
};
ApplyEvent(@event, true);
}
public void UpdateOrder(string customerName, List<OrderItem> items, decimal totalAmount)
{
var @event = new OrderUpdatedEvent
{
AggregateId = Id,
Version = _version + 1,
Timestamp = DateTime.UtcNow,
CustomerName = customerName,
Items = items,
TotalAmount = totalAmount
};
ApplyEvent(@event, true);
}
// 应用事件
private void ApplyEvent(IEvent @event, bool isNew)
{
switch (@event)
{
case OrderCreatedEvent createdEvent:
Id = createdEvent.AggregateId;
CustomerName = createdEvent.CustomerName;
Items = createdEvent.Items;
TotalAmount = createdEvent.TotalAmount;
Status = OrderStatus.Pending;
break;
case OrderUpdatedEvent updatedEvent:
CustomerName = updatedEvent.CustomerName;
Items = updatedEvent.Items;
TotalAmount = updatedEvent.TotalAmount;
break;
case OrderCancelledEvent cancelledEvent:
Status = OrderStatus.Cancelled;
break;
}
_version = @event.Version;
if (isNew)
{
_uncommittedEvents.Add(@event);
}
}
// 获取未提交的事件
public IEnumerable<IEvent> GetUncommittedEvents()
{
return _uncommittedEvents;
}
// 清除未提交的事件
public void ClearUncommittedEvents()
{
_uncommittedEvents.Clear();
}
}
4.3 事件溯源仓储
// Infrastructure/Persistence/EventSourcingRepository.cs
public class EventSourcingRepository : IOrderRepository
{
private readonly IEventStore _eventStore;
public EventSourcingRepository(IEventStore eventStore)
{
_eventStore = eventStore;
}
public async Task AddAsync(Order order)
{
// 这里实际上应该处理领域事件,而不是直接保存实体
// 为了简化示例,我们假设order已经包含了事件
var events = order.GetUncommittedEvents();
foreach (var @event in events)
{
await _eventStore.SaveAsync(@event);
}
}
public async Task<Order> GetByIdAsync(Guid id)
{
var events = await _eventStore.GetEventsAsync(id);
if (events == null || !events.Any())
return null;
// 重放事件重建聚合状态
var orderAggregate = new OrderAggregate(events);
// 将聚合转换为领域实体(实际应用中可能需要映射)
return new Order(
orderAggregate.Id,
orderAggregate.CustomerName,
orderAggregate.Items,
orderAggregate.TotalAmount);
}
public async Task UpdateAsync(Order order)
{
var events = order.GetUncommittedEvents();
foreach (var @event in events)
{
await _eventStore.SaveAsync(@event);
}
}
public async Task DeleteAsync(Guid id)
{
// 在事件溯源中,删除通常通过发布一个"Deleted"事件来实现
var deleteEvent = new OrderDeletedEvent
{
AggregateId = id,
Version = (await GetNextVersionAsync(id)),
Timestamp = DateTime.UtcNow
};
await _eventStore.SaveAsync(deleteEvent);
}
private async Task<int> GetNextVersionAsync(Guid aggregateId)
{
var events = await _eventStore.GetEventsAsync(aggregateId);
return events?.LastOrDefault()?.Version + 1 ?? 1;
}
}
5. CQRS的高级实现
5.1 使用消息队列实现异步处理
// Infrastructure/Messaging/RabbitMQ/RabbitMQEventBus.cs
public class RabbitMQEventBus : IEventBus
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly IServiceProvider _serviceProvider;
public RabbitMQEventBus(
IConnection connection,
IModel channel,
IServiceProvider serviceProvider)
{
_connection = connection;
_channel = channel;
_serviceProvider = serviceProvider;
}
public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
var eventName = @event.GetType().Name;
var message = JsonSerializer.Serialize(@event);
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
properties.MessageId = Guid.NewGuid().ToString();
_channel.BasicPublish(
exchange: "events",
routingKey: eventName,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(message));
await Task.CompletedTask;
}
public void Subscribe<TEvent, TEventHandler>()
where TEvent : IEvent
where TEventHandler : IEventHandler<TEvent>
{
var eventName = typeof(TEvent).Name;
_channel.QueueDeclare(
queue: eventName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
_channel.QueueBind(
queue: eventName,
exchange: "events",
routingKey: eventName);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var @event = JsonSerializer.Deserialize<TEvent>(message);
using (var scope = _serviceProvider.CreateScope())
{
var handler = scope.ServiceProvider.GetRequiredService<TEventHandler>();
await handler.HandleAsync(@event);
}
_channel.BasicAck(ea.DeliveryTag, false);
};
_channel.BasicConsume(
queue: eventName,
autoAck: false,
consumer: consumer);
}
}
5.2 使用CQRS与DDD(领域驱动设计)结合
// Domain/Services/OrderDomainService.cs
public class OrderDomainService
{
private readonly IOrderRepository _orderRepository;
private readonly IProductRepository _productRepository;
public OrderDomainService(
IOrderRepository orderRepository,
IProductRepository productRepository)
{
_orderRepository = orderRepository;
_productRepository = productRepository;
}
public async Task<Order> CreateOrderWithValidation(
Guid customerId,
List<OrderItemRequest> items)
{
// 1. 验证产品库存
foreach (var item in items)
{
var product = await _productRepository.GetByIdAsync(item.ProductId);
if (product == null)
throw new KeyNotFoundException($"Product {item.ProductId} not found");
if (product.Stock < item.Quantity)
throw new InvalidOperationException($"Insufficient stock for product {item.ProductId}");
}
// 2. 创建订单
var orderItems = items.Select(i => new OrderItem
{
ProductId = i.ProductId,
ProductName = i.ProductName,
Quantity = i.Quantity,
UnitPrice = i.UnitPrice
}).ToList();
var totalAmount = orderItems.Sum(i => i.Subtotal);
var order = new Order(
Guid.NewGuid(),
customerId.ToString(),
orderItems,
totalAmount);
// 3. 扣减库存
foreach (var item in items)
{
await _productRepository.DecreaseStockAsync(item.ProductId, item.Quantity);
}
return order;
}
}
5.3 使用CQRS与微服务架构
// OrderService/Controllers/OrderController.cs
[ApiController]
[Route("api/[controller]")]
public class OrderController : ControllerBase
{
private readonly IMediator _mediator;
public OrderController(IMediator mediator)
{
_mediator = mediator;
}
[HttpPost]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
{
var command = new CreateOrderCommand
{
OrderId = Guid.NewGuid(),
CustomerName = request.CustomerName,
Items = request.Items,
TotalAmount = request.TotalAmount
};
await _mediator.Send(command);
return CreatedAtAction(nameof(GetOrder), new { id = command.OrderId }, null);
}
[HttpGet("{id}")]
public async Task<IActionResult> GetOrder(Guid id)
{
var query = new GetOrderByIdQuery { OrderId = id };
var order = await _mediator.Send(query);
return Ok(order);
}
[HttpGet("customer/{customerName}")]
public async Task<IActionResult> GetOrdersByCustomer(string customerName, [FromQuery] int page = 1, [FromQuery] int pageSize = 10)
{
var query = new GetOrdersByCustomerQuery
{
CustomerName = customerName,
Page = page,
PageSize = pageSize
};
var orders = await _mediator.Send(query);
return Ok(orders);
}
}
6. CQRS的性能优化
6.1 读模型缓存策略
// Infrastructure/Caching/OrderReadModelCache.cs
public class OrderReadModelCache : IOrderReadRepository
{
private readonly IOrderReadRepository _decoratedRepository;
private readonly IMemoryCache _cache;
private readonly TimeSpan _cacheDuration = TimeSpan.FromMinutes(5);
public OrderReadModelCache(
IOrderReadRepository decoratedRepository,
IMemoryCache cache)
{
_decoratedRepository = decoratedRepository;
_cache = cache;
}
public async Task<OrderReadModel> GetByIdAsync(Guid id)
{
var cacheKey = $"order:{id}";
if (_cache.TryGetValue(cacheKey, out OrderReadModel cachedOrder))
{
return cachedOrder;
}
var order = await _decoratedRepository.GetByIdAsync(id);
if (order != null)
{
_cache.Set(cacheKey, order, _cacheDuration);
}
return order;
}
public async Task<List<OrderReadModel>> GetByCustomerAsync(string customerName, int page, int pageSize)
{
var cacheKey = $"orders:customer:{customerName}:page:{page}:size:{pageSize}";
if (_cache.TryGetValue(cacheKey, out List<OrderReadModel> cachedOrders))
{
return cachedOrders;
}
var orders = await _decoratedRepository.GetByCustomerAsync(customerName, page, pageSize);
if (orders != null)
{
_cache.Set(cacheKey, orders, _cacheDuration);
}
return orders;
}
public async Task<OrderStatisticsDto> GetStatisticsAsync(DateTime fromDate, DateTime toDate)
{
var cacheKey = $"statistics:{fromDate:yyyyMMdd}:{toDate:yyyyMMdd}";
if (_cache.TryGetValue(cacheKey, out OrderStatisticsDto cachedStatistics))
{
return cachedStatistics;
}
var statistics = await _decoratedRepository.GetStatisticsAsync(fromDate, toDate);
if (statistics != null)
{
_cache.Set(cacheKey, statistics, TimeSpan.FromMinutes(1));
}
return statistics;
}
}
6.2 读模型预计算
// Infrastructure/Persistence/PrecomputedOrderReadRepository.cs
public class PrecomputedOrderReadRepository : IOrderReadRepository
{
private readonly ReadDbContext _readDbContext;
private readonly IEventBus _eventBus;
public PrecomputedOrderReadRepository(
ReadDbContext readDbContext,
IEventBus eventBus)
{
_readDbContext = readDbContext;
_eventBus = eventBus;
}
public async Task<OrderReadModel> GetByIdAsync(Guid id)
{
// 直接查询预计算的读模型
return await _readDbContext.OrderReadModels
.AsNoTracking()
.FirstOrDefaultAsync(o => o.Id == id);
}
public async Task<List<OrderReadModel>> GetByCustomerAsync(string customerName, int page, int pageSize)
{
// 使用预计算的索引
return await _readDbContext.OrderReadModels
.AsNoTracking()
.Where(o => o.CustomerName == customerName)
.OrderByDescending(o => o.CreatedAt)
.Skip((page - 1) * pageSize)
.Take(pageSize)
.ToListAsync();
}
public async Task<OrderStatisticsDto> GetStatisticsAsync(DateTime fromDate, DateTime toDate)
{
// 使用预计算的统计表
var statistics = await _readDbContext.OrderStatistics
.AsNoTracking()
.Where(s => s.Date >= fromDate.Date && s.Date <= toDate.Date)
.GroupBy(s => 1)
.Select(g => new OrderStatisticsDto
{
TotalOrders = g.Sum(s => s.TotalOrders),
TotalRevenue = g.Sum(s => s.TotalRevenue),
AverageOrderValue = g.Average(s => s.AverageOrderValue)
})
.FirstOrDefaultAsync();
return statistics ?? new OrderStatisticsDto();
}
}
7. CQRS的测试策略
7.1 命令处理器测试
// Tests/Application/Handlers/CreateOrderCommandHandlerTests.cs
public class CreateOrderCommandHandlerTests
{
[Fact]
public async Task HandleAsync_WithValidCommand_ShouldCreateOrder()
{
// Arrange
var mockRepository = new Mock<IOrderRepository>();
var mockEventBus = new Mock<IEventBus>();
var handler = new CreateOrderCommandHandler(
mockRepository.Object,
mockEventBus.Object);
var command = new CreateOrderCommand
{
OrderId = Guid.NewGuid(),
CustomerName = "John Doe",
Items = new List<OrderItem>
{
new OrderItem
{
ProductId = Guid.NewGuid(),
ProductName = "Product 1",
Quantity = 2,
UnitPrice = 10.00m
}
},
TotalAmount = 20.00m
};
// Act
await handler.HandleAsync(command);
// Assert
mockRepository.Verify(
r => r.AddAsync(It.Is<Order>(o =>
o.Id == command.OrderId &&
o.CustomerName == command.CustomerName &&
o.TotalAmount == command.TotalAmount)),
Times.Once);
mockEventBus.Verify(
e => e.PublishAsync(It.IsAny<OrderCreatedEvent>()),
Times.Once);
}
[Fact]
public async Task HandleAsync_WithInvalidCustomerName_ShouldThrowException()
{
// Arrange
var mockRepository = new Mock<IOrderRepository>();
var mockEventBus = new Mock<IEventBus>();
var handler = new CreateOrderCommandHandler(
mockRepository.Object,
mockEventBus.Object);
var command = new CreateOrderCommand
{
OrderId = Guid.NewGuid(),
CustomerName = "", // 无效的客户名
Items = new List<OrderItem>
{
new OrderItem
{
ProductId = Guid.NewGuid(),
ProductName = "Product 1",
Quantity = 2,
UnitPrice = 10.00m
}
},
TotalAmount = 20.00m
};
// Act & Assert
await Assert.ThrowsAsync<ArgumentException>(() => handler.HandleAsync(command));
}
}
7.2 查询处理器测试
// Tests/Application/Handlers/GetOrderByIdQueryHandlerTests.cs
public class GetOrderByIdQueryHandlerTests
{
[Fact]
public async Task HandleAsync_WithValidId_ShouldReturnOrder()
{
// Arrange
var mockRepository = new Mock<IOrderReadRepository>();
var expectedOrder = new OrderReadModel
{
Id = Guid.NewGuid(),
CustomerName = "John Doe",
Items = new List<OrderItemDto>
{
new OrderItemDto
{
ProductId = Guid.NewGuid(),
ProductName = "Product 1",
Quantity = 2,
UnitPrice = 10.00m,
Subtotal = 20.00m
}
},
TotalAmount = 20.00m,
CreatedAt = DateTime.UtcNow
};
mockRepository
.Setup(r => r.GetByIdAsync(expectedOrder.Id))
.ReturnsAsync(expectedOrder);
var handler = new GetOrderByIdQueryHandler(mockRepository.Object);
var query = new GetOrderByIdQuery { OrderId = expectedOrder.Id };
// Act
var result = await handler.HandleAsync(query);
// Assert
Assert.NotNull(result);
Assert.Equal(expectedOrder.Id, result.Id);
Assert.Equal(expectedOrder.CustomerName, result.CustomerName);
Assert.Equal(expectedOrder.TotalAmount, result.TotalAmount);
}
[Fact]
public async Task HandleAsync_WithInvalidId_ShouldThrowException()
{
// Arrange
var mockRepository = new Mock<IOrderReadRepository>();
mockRepository
.Setup(r => r.GetByIdAsync(It.IsAny<Guid>()))
.ReturnsAsync((OrderReadModel)null);
var handler = new GetOrderByIdQueryHandler(mockRepository.Object);
var query = new GetOrderByIdQuery { OrderId = Guid.NewGuid() };
// Act & Assert
await Assert.ThrowsAsync<KeyNotFoundException>(() => handler.HandleAsync(query));
}
}
8. CQRS的部署与运维
8.1 Docker部署配置
# Dockerfile
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["CQRS-Example/CQRS-Example.csproj", "CQRS-Example/"]
RUN dotnet restore "CQRS-Example/CQRS-Example.csproj"
COPY . .
WORKDIR "/src/CQRS-Example"
RUN dotnet build "CQRS-Example.csproj" -c Release -o /app/build
FROM build AS publish
RUN dotnet publish "CQRS-Example.csproj" -c Release -o /app/publish
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "CQRS-Example.dll"]
8.2 Docker Compose配置
# docker-compose.yml
version: '3.8'
services:
cqrs-api:
build: .
ports:
- "5000:80"
environment:
- ASPNETCORE_ENVIRONMENT=Production
- ConnectionStrings__WriteDb=Server=write-db;Database=CQRS_Write;User=sa;Password=YourPassword123!;
- ConnectionStrings__ReadDb=Server=read-db;Database=CQRS_Read;User=sa;Password=YourPassword123!;
- RabbitMQ__Host=rabbitmq
- RabbitMQ__Port=5672
depends_on:
- write-db
- read-db
- rabbitmq
networks:
- cqrs-network
write-db:
image: mcr.microsoft.com/mssql/server:2022-latest
environment:
- ACCEPT_EULA=Y
- SA_PASSWORD=YourPassword123!
ports:
- "1433:1433"
volumes:
- write-db-data:/var/opt/mssql
networks:
- cqrs-network
read-db:
image: postgres:15
environment:
- POSTGRES_PASSWORD=YourPassword123!
- POSTGRES_DB=CQRS_Read
ports:
- "5432:5432"
volumes:
- read-db-data:/var/lib/postgresql/data
networks:
- cqrs-network
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=YourPassword123!
volumes:
- rabbitmq-data:/var/lib/rabbitmq
networks:
- cqrs-network
volumes:
write-db-data:
read-db-data:
rabbitmq-data:
networks:
cqrs-network:
driver: bridge
8.3 监控与日志
// Infrastructure/Monitoring/OrderMonitoringService.cs
public class OrderMonitoringService
{
private readonly ILogger<OrderMonitoringService> _logger;
private readonly IMetrics _metrics;
public OrderMonitoringService(
ILogger<OrderMonitoringService> logger,
IMetrics metrics)
{
_logger = logger;
_metrics = metrics;
}
public void TrackCommandExecution(string commandName, long durationMs, bool success)
{
_logger.LogInformation(
"Command {CommandName} executed in {Duration}ms with result {Success}",
commandName, durationMs, success);
_metrics.IncrementCounter($"cqrs.command.{commandName}.total");
_metrics.RecordHistogram($"cqrs.command.{commandName}.duration", durationMs);
if (!success)
{
_metrics.IncrementCounter($"cqrs.command.{commandName}.errors");
}
}
public void TrackQueryExecution(string queryName, long durationMs, bool success)
{
_logger.LogInformation(
"Query {QueryName} executed in {Duration}ms with result {Success}",
queryName, durationMs, success);
_metrics.IncrementCounter($"cqrs.query.{queryName}.total");
_metrics.RecordHistogram($"cqrs.query.{queryName}.duration", durationMs);
if (!success)
{
_metrics.IncrementCounter($"cqrs.query.{queryName}.errors");
}
}
public void TrackEventProcessing(string eventName, long durationMs, bool success)
{
_logger.LogInformation(
"Event {EventName} processed in {Duration}ms with result {Success}",
eventName, durationMs, success);
_metrics.IncrementCounter($"cqrs.event.{eventName}.total");
_metrics.RecordHistogram($"cqrs.event.{eventName}.duration", durationMs);
if (!success)
{
_metrics.IncrementCounter($"cqrs.event.{eventName}.errors");
}
}
}
9. CQRS的常见问题与解决方案
9.1 数据一致性问题
问题:CQRS通常引入最终一致性,可能导致读写模型数据不一致。
解决方案:
- 版本控制:在读模型中包含版本号,客户端可以检查版本
- 补偿机制:当检测到不一致时,触发补偿操作
- 事件重放:定期重放事件以修复不一致的读模型
// Infrastructure/Persistence/ConsistencyChecker.cs
public class ConsistencyChecker
{
private readonly IEventStore _eventStore;
private readonly IReadModelUpdater _readModelUpdater;
public ConsistencyChecker(
IEventStore eventStore,
IReadModelUpdater readModelUpdater)
{
_eventStore = eventStore;
_readModelUpdater = readModelUpdater;
}
public async Task CheckAndFixInconsistencies()
{
var allEvents = await _eventStore.GetAllEventsAsync();
// 按聚合ID分组
var eventsByAggregate = allEvents
.GroupBy(e => e.AggregateId)
.ToDictionary(g => g.Key, g => g.OrderBy(e => e.Version).ToList());
foreach (var aggregateId in eventsByAggregate.Keys)
{
var events = eventsByAggregate[aggregateId];
// 重建读模型
var reconstructedModel = await ReconstructReadModelAsync(events);
// 获取当前读模型
var currentModel = await _readModelUpdater.GetCurrentModelAsync(aggregateId);
// 比较并修复差异
if (!AreModelsEqual(reconstructedModel, currentModel))
{
await _readModelUpdater.UpdateReadModelAsync(aggregateId, reconstructedModel);
}
}
}
private async Task<object> ReconstructReadModelAsync(List<IEvent> events)
{
// 重放事件重建读模型
// 实现细节取决于具体业务逻辑
return new object(); // 简化示例
}
private bool AreModelsEqual(object model1, object model2)
{
// 实现模型比较逻辑
return model1?.Equals(model2) ?? false;
}
}
9.2 复杂性管理
问题:CQRS增加了架构复杂性,可能难以维护。
解决方案:
- 分层架构:清晰分离应用层、领域层、基础设施层
- 框架支持:使用成熟的CQRS框架(如Axon Framework、EventFlow)
- 代码生成:使用代码生成工具减少样板代码
9.3 学习曲线
问题:团队需要时间学习CQRS和事件溯源概念。
解决方案:
- 渐进式采用:从简单场景开始,逐步扩展
- 培训与文档:提供详细的内部文档和培训
- 代码审查:通过代码审查确保正确实现
10. CQRS的最佳实践
10.1 设计原则
- 单一职责:每个组件只负责一个明确的职责
- 开闭原则:对扩展开放,对修改关闭
- 依赖倒置:依赖抽象而非具体实现
- 领域驱动设计:使用DDD概念建模业务逻辑
10.2 代码组织
- 清晰的命名:使用描述性的名称,如
CreateOrderCommand、GetOrderByIdQuery - 一致的模式:在整个项目中保持一致的代码模式
- 适当的抽象:避免过度抽象,保持代码可读性
10.3 性能优化
- 读写分离:使用不同的数据库实例
- 缓存策略:合理使用缓存,避免缓存穿透
- 异步处理:使用消息队列处理耗时操作
- 索引优化:为读模型查询创建适当的索引
10.4 错误处理
- 事务管理:确保命令处理的原子性
- 重试机制:对临时性故障实现重试逻辑
- 死信队列:处理无法处理的消息
- 监控告警:及时发现和处理异常
11. 总结
CQRS是一种强大的架构模式,特别适合复杂业务场景。通过分离读写职责,CQRS能够提供更好的性能、可扩展性和维护性。然而,它也引入了额外的复杂性,需要仔细权衡。
11.1 关键要点
- 理解核心概念:命令、查询、处理器、读写模型
- 选择合适的场景:CQRS不是银弹,只适用于复杂场景
- 渐进式采用:从简单实现开始,逐步演进
- 关注一致性:处理好读写模型的一致性问题
- 持续优化:根据实际需求调整架构
11.2 进一步学习资源
书籍:
- 《领域驱动设计:软件核心复杂性应对之道》
- 《实现领域驱动设计》
- 《CQRS与事件溯源》
框架:
- Axon Framework(Java)
- EventFlow(.NET)
- Lagom(Scala)
在线资源:
- Martin Fowler的CQRS文章
- Microsoft的CQRS示例项目
- GitHub上的开源CQRS项目
通过本文的完整指南,您应该能够从理论到实践全面理解CQRS,并在实际项目中成功应用。记住,架构模式的选择应该基于具体业务需求,而不是盲目追求新技术。
