引言

CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种架构模式,它将应用程序的读取(查询)和写入(命令)操作分离到不同的模型和数据存储中。这种模式特别适用于复杂业务场景,能够显著提升系统的可扩展性、性能和维护性。本文将从理论基础出发,逐步深入到实际代码实现,提供一个完整的CQRS实践指南。

1. CQRS理论基础

1.1 什么是CQRS?

CQRS的核心思想是将应用程序的读取(Query)和写入(Command)操作分离。在传统的CRUD(Create, Read, Update, Delete)架构中,同一个数据模型通常同时处理读写操作,这可能导致以下问题:

  • 性能瓶颈:读写操作共享同一数据模型,难以针对不同操作进行优化
  • 复杂性增加:业务逻辑在读写操作中混杂,难以维护
  • 扩展性受限:无法独立扩展读写操作

CQRS通过分离读写职责来解决这些问题:

graph LR
    A[客户端] --> B[Command Handler]
    A --> C[Query Handler]
    B --> D[Write Model]
    C --> E[Read Model]
    D --> F[Event Store]
    E --> F

1.2 CQRS的核心组件

  1. Command(命令):表示一个意图改变系统状态的操作,通常是动词短语(如”CreateOrder”、”UpdateUser”)
  2. Query(查询):表示一个只读操作,不改变系统状态(如”GetOrderById”、”GetUserList”)
  3. Command Handler(命令处理器):处理命令,执行业务逻辑,更新写模型
  4. Query Handler(查询处理器):处理查询,从读模型中获取数据
  5. Write Model(写模型):用于写入操作的数据模型,通常包含完整的业务逻辑和验证
  6. Read Model(读模型):用于读取操作的数据模型,通常经过优化以满足特定查询需求

1.3 CQRS与事件溯源(Event Sourcing)的关系

CQRS经常与事件溯源结合使用,形成更强大的架构模式:

  • 事件溯源:将系统状态的变化记录为一系列不可变的事件,而不是直接修改当前状态
  • CQRS + 事件溯源:写模型通过事件溯源记录所有状态变化,读模型通过重放事件或订阅事件流来构建

2. CQRS的适用场景

2.1 适合使用CQRS的场景

  1. 复杂业务逻辑:业务规则复杂,读写操作差异大
  2. 高并发读取:系统需要处理大量读取请求,需要优化读取性能
  3. 多数据源:需要从多个数据源聚合数据
  4. 审计需求:需要完整的操作历史记录
  5. 微服务架构:不同服务可能需要不同的数据模型

2.2 不适合使用CQRS的场景

  1. 简单CRUD应用:业务逻辑简单,读写操作差异不大
  2. 小型项目:增加架构复杂度可能得不偿失
  3. 实时一致性要求极高:CQRS通常引入最终一致性

3. CQRS代码实践:基础实现

3.1 项目结构设计

一个典型的CQRS项目结构如下:

CQRS-Example/
├── src/
│   ├── Application/
│   │   ├── Commands/          # 命令定义
│   │   ├── Queries/           # 查询定义
│   │   ├── Handlers/          # 处理器
│   │   └── Models/            # 应用层模型
│   ├── Domain/
│   │   ├── Entities/          # 领域实体
│   │   ├── Events/            # 领域事件
│   │   ├── Repositories/      # 仓储接口
│   │   └── Services/          # 领域服务
│   ├── Infrastructure/
│   │   ├── Persistence/       # 持久化实现
│   │   ├── Messaging/         # 消息处理
│   │   └── Caching/           # 缓存实现
│   └── Web/
│       ├── Controllers/       # API控制器
│       └── Models/            # Web模型
├── tests/
└── README.md

3.2 命令和查询的定义

3.2.1 命令定义

// Application/Commands/CreateOrderCommand.cs
public class CreateOrderCommand : ICommand
{
    public Guid OrderId { get; set; }
    public string CustomerName { get; set; }
    public List<OrderItem> Items { get; set; }
    public decimal TotalAmount { get; set; }
}

// Application/Commands/UpdateOrderCommand.cs
public class UpdateOrderCommand : ICommand
{
    public Guid OrderId { get; set; }
    public string CustomerName { get; set; }
    public List<OrderItem> Items { get; set; }
    public decimal TotalAmount { get; set; }
}

// Application/Commands/DeleteOrderCommand.cs
public class DeleteOrderCommand : ICommand
{
    public Guid OrderId { get; set; }
}

3.2.2 查询定义

// Application/Queries/GetOrderByIdQuery.cs
public class GetOrderByIdQuery : IQuery<OrderDto>
{
    public Guid OrderId { get; set; }
}

// Application/Queries/GetOrdersByCustomerQuery.cs
public class GetOrdersByCustomerQuery : IQuery<List<OrderDto>>
{
    public string CustomerName { get; set; }
    public int Page { get; set; } = 1;
    public int PageSize { get; set; } = 10;
}

// Application/Queries/GetOrderStatisticsQuery.cs
public class GetOrderStatisticsQuery : IQuery<OrderStatisticsDto>
{
    public DateTime FromDate { get; set; }
    public DateTime ToDate { get; set; }
}

3.3 处理器实现

3.3.1 命令处理器

// Application/Handlers/Commands/CreateOrderCommandHandler.cs
public class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
{
    private readonly IOrderRepository _orderRepository;
    private readonly IEventBus _eventBus;

    public CreateOrderCommandHandler(
        IOrderRepository orderRepository,
        IEventBus eventBus)
    {
        _orderRepository = orderRepository;
        _eventBus = eventBus;
    }

    public async Task HandleAsync(CreateOrderCommand command)
    {
        // 1. 验证命令
        if (string.IsNullOrWhiteSpace(command.CustomerName))
            throw new ArgumentException("Customer name is required");
        
        if (command.Items == null || command.Items.Count == 0)
            throw new ArgumentException("Order must have at least one item");

        // 2. 创建领域实体
        var order = new Order(
            command.OrderId,
            command.CustomerName,
            command.Items,
            command.TotalAmount);

        // 3. 执行业务规则验证
        if (!order.IsValid())
            throw new InvalidOperationException("Order validation failed");

        // 4. 保存到写模型
        await _orderRepository.AddAsync(order);

        // 5. 发布领域事件
        var orderCreatedEvent = new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerName = order.CustomerName,
            TotalAmount = order.TotalAmount,
            CreatedAt = DateTime.UtcNow
        };
        
        await _eventBus.PublishAsync(orderCreatedEvent);

        // 6. 返回结果(可选)
        return;
    }
}

3.3.2 查询处理器

// Application/Handlers/Queries/GetOrderByIdQueryHandler.cs
public class GetOrderByIdQueryHandler : IQueryHandler<GetOrderByIdQuery, OrderDto>
{
    private readonly IOrderReadRepository _orderReadRepository;

    public GetOrderByIdQueryHandler(IOrderReadRepository orderReadRepository)
    {
        _orderReadRepository = orderReadRepository;
    }

    public async Task<OrderDto> HandleAsync(GetOrderByIdQuery query)
    {
        // 1. 从读模型获取数据
        var order = await _orderReadRepository.GetByIdAsync(query.OrderId);
        
        if (order == null)
            throw new KeyNotFoundException($"Order with ID {query.OrderId} not found");

        // 2. 转换为DTO
        var orderDto = new OrderDto
        {
            Id = order.Id,
            CustomerName = order.CustomerName,
            Items = order.Items,
            TotalAmount = order.TotalAmount,
            CreatedAt = order.CreatedAt
        };

        return orderDto;
    }
}

3.4 仓储接口与实现

3.4.1 写模型仓储

// Domain/Repositories/IOrderRepository.cs
public interface IOrderRepository
{
    Task AddAsync(Order order);
    Task UpdateAsync(Order order);
    Task DeleteAsync(Guid id);
    Task<Order> GetByIdAsync(Guid id);
}

// Infrastructure/Persistence/OrderRepository.cs
public class OrderRepository : IOrderRepository
{
    private readonly WriteDbContext _writeDbContext;

    public OrderRepository(WriteDbContext writeDbContext)
    {
        _writeDbContext = writeDbContext;
    }

    public async Task AddAsync(Order order)
    {
        _writeDbContext.Orders.Add(order);
        await _writeDbContext.SaveChangesAsync();
    }

    public async Task UpdateAsync(Order order)
    {
        _writeDbContext.Orders.Update(order);
        await _writeDbContext.SaveChangesAsync();
    }

    public async Task DeleteAsync(Guid id)
    {
        var order = await _writeDbContext.Orders.FindAsync(id);
        if (order != null)
        {
            _writeDbContext.Orders.Remove(order);
            await _writeDbContext.SaveChangesAsync();
        }
    }

    public async Task<Order> GetByIdAsync(Guid id)
    {
        return await _writeDbContext.Orders.FindAsync(id);
    }
}

3.4.2 读模型仓储

// Domain/Repositories/IOrderReadRepository.cs
public interface IOrderReadRepository
{
    Task<OrderReadModel> GetByIdAsync(Guid id);
    Task<List<OrderReadModel>> GetByCustomerAsync(string customerName, int page, int pageSize);
    Task<OrderStatisticsDto> GetStatisticsAsync(DateTime fromDate, DateTime toDate);
}

// Infrastructure/Persistence/OrderReadRepository.cs
public class OrderReadRepository : IOrderReadRepository
{
    private readonly ReadDbContext _readDbContext;

    public OrderReadRepository(ReadDbContext readDbContext)
    {
        _readDbContext = readDbContext;
    }

    public async Task<OrderReadModel> GetByIdAsync(Guid id)
    {
        return await _readDbContext.OrderReadModels
            .AsNoTracking()
            .FirstOrDefaultAsync(o => o.Id == id);
    }

    public async Task<List<OrderReadModel>> GetByCustomerAsync(string customerName, int page, int pageSize)
    {
        return await _readDbContext.OrderReadModels
            .AsNoTracking()
            .Where(o => o.CustomerName == customerName)
            .OrderByDescending(o => o.CreatedAt)
            .Skip((page - 1) * pageSize)
            .Take(pageSize)
            .ToListAsync();
    }

    public async Task<OrderStatisticsDto> GetStatisticsAsync(DateTime fromDate, DateTime toDate)
    {
        var statistics = await _readDbContext.OrderReadModels
            .AsNoTracking()
            .Where(o => o.CreatedAt >= fromDate && o.CreatedAt <= toDate)
            .GroupBy(o => 1)
            .Select(g => new OrderStatisticsDto
            {
                TotalOrders = g.Count(),
                TotalRevenue = g.Sum(o => o.TotalAmount),
                AverageOrderValue = g.Average(o => o.TotalAmount)
            })
            .FirstOrDefaultAsync();

        return statistics ?? new OrderStatisticsDto();
    }
}

3.5 读写模型定义

3.5.1 写模型(领域实体)

// Domain/Entities/Order.cs
public class Order
{
    public Guid Id { get; private set; }
    public string CustomerName { get; private set; }
    public List<OrderItem> Items { get; private set; }
    public decimal TotalAmount { get; private set; }
    public DateTime CreatedAt { get; private set; }
    public DateTime? UpdatedAt { get; private set; }
    public OrderStatus Status { get; private set; }

    // 构造函数
    public Order(Guid id, string customerName, List<OrderItem> items, decimal totalAmount)
    {
        Id = id;
        CustomerName = customerName;
        Items = items;
        TotalAmount = totalAmount;
        CreatedAt = DateTime.UtcNow;
        Status = OrderStatus.Pending;
    }

    // 业务方法
    public void Update(string customerName, List<OrderItem> items, decimal totalAmount)
    {
        CustomerName = customerName;
        Items = items;
        TotalAmount = totalAmount;
        UpdatedAt = DateTime.UtcNow;
    }

    public void Confirm()
    {
        if (Status != OrderStatus.Pending)
            throw new InvalidOperationException("Only pending orders can be confirmed");
        
        Status = OrderStatus.Confirmed;
        UpdatedAt = DateTime.UtcNow;
    }

    public void Cancel()
    {
        if (Status == OrderStatus.Completed)
            throw new InvalidOperationException("Completed orders cannot be cancelled");
        
        Status = OrderStatus.Cancelled;
        UpdatedAt = DateTime.UtcNow;
    }

    // 验证方法
    public bool IsValid()
    {
        if (string.IsNullOrWhiteSpace(CustomerName))
            return false;
        
        if (Items == null || Items.Count == 0)
            return false;
        
        if (TotalAmount <= 0)
            return false;
        
        return true;
    }
}

// Domain/Entities/OrderItem.cs
public class OrderItem
{
    public Guid ProductId { get; set; }
    public string ProductName { get; set; }
    public int Quantity { get; set; }
    public decimal UnitPrice { get; set; }
    public decimal Subtotal => Quantity * UnitPrice;
}

// Domain/Entities/OrderStatus.cs
public enum OrderStatus
{
    Pending,
    Confirmed,
    Processing,
    Shipped,
    Completed,
    Cancelled
}

3.5.2 读模型(DTO/ViewModel)

// Application/Models/OrderDto.cs
public class OrderDto
{
    public Guid Id { get; set; }
    public string CustomerName { get; set; }
    public List<OrderItemDto> Items { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; }
    public DateTime? UpdatedAt { get; set; }
    public string Status { get; set; }
}

// Application/Models/OrderItemDto.cs
public class OrderItemDto
{
    public Guid ProductId { get; set; }
    public string ProductName { get; set; }
    public int Quantity { get; set; }
    public decimal UnitPrice { get; set; }
    public decimal Subtotal { get; set; }
}

// Application/Models/OrderStatisticsDto.cs
public class OrderStatisticsDto
{
    public int TotalOrders { get; set; }
    public decimal TotalRevenue { get; set; }
    public decimal AverageOrderValue { get; set; }
}

3.6 事件处理

3.6.1 领域事件定义

// Domain/Events/OrderCreatedEvent.cs
public class OrderCreatedEvent : IEvent
{
    public Guid OrderId { get; set; }
    public string CustomerName { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime CreatedAt { get; set; }
}

// Domain/Events/OrderUpdatedEvent.cs
public class OrderUpdatedEvent : IEvent
{
    public Guid OrderId { get; set; }
    public string CustomerName { get; set; }
    public decimal TotalAmount { get; set; }
    public DateTime UpdatedAt { get; set; }
}

// Domain/Events/OrderCancelledEvent.cs
public class OrderCancelledEvent : IEvent
{
    public Guid OrderId { get; set; }
    public DateTime CancelledAt { get; set; }
}

3.6.2 事件处理器

// Infrastructure/Messaging/EventHandlers/OrderCreatedEventHandler.cs
public class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
{
    private readonly IReadModelUpdater _readModelUpdater;
    private readonly INotificationService _notificationService;

    public OrderCreatedEventHandler(
        IReadModelUpdater readModelUpdater,
        INotificationService notificationService)
    {
        _readModelUpdater = readModelUpdater;
        _notificationService = notificationService;
    }

    public async Task HandleAsync(OrderCreatedEvent @event)
    {
        // 1. 更新读模型
        await _readModelUpdater.UpdateReadModelAsync(@event);

        // 2. 发送通知
        await _notificationService.SendOrderCreatedNotificationAsync(
            @event.OrderId, 
            @event.CustomerName);

        // 3. 记录日志
        Console.WriteLine($"Order created: {@event.OrderId}");
    }
}

// Infrastructure/Messaging/EventBus.cs
public class EventBus : IEventBus
{
    private readonly IServiceProvider _serviceProvider;

    public EventBus(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        var handlers = _serviceProvider.GetServices<IEventHandler<TEvent>>();
        
        foreach (var handler in handlers)
        {
            await handler.HandleAsync(@event);
        }
    }
}

4. CQRS与事件溯源结合

4.1 事件溯源基础

事件溯源将系统状态的变化记录为一系列不可变的事件。每个事件代表一个状态变化,系统状态通过重放所有事件来重建。

// Domain/Events/IEvent.cs
public interface IEvent
{
    Guid AggregateId { get; }
    DateTime Timestamp { get; }
    int Version { get; }
}

// Domain/Events/EventStore.cs
public class EventStore : IEventStore
{
    private readonly List<IEvent> _events = new List<IEvent>();

    public async Task SaveAsync(IEvent @event)
    {
        _events.Add(@event);
        // 实际应用中,这里会持久化到数据库
    }

    public async Task<List<IEvent>> GetEventsAsync(Guid aggregateId)
    {
        return _events
            .Where(e => e.AggregateId == aggregateId)
            .OrderBy(e => e.Version)
            .ToList();
    }

    public async Task<List<IEvent>> GetAllEventsAsync()
    {
        return _events
            .OrderBy(e => e.Timestamp)
            .ToList();
    }
}

4.2 聚合根与事件重放

// Domain/Aggregates/OrderAggregate.cs
public class OrderAggregate
{
    private readonly List<IEvent> _uncommittedEvents = new List<IEvent>();
    private int _version = 0;

    public Guid Id { get; private set; }
    public string CustomerName { get; private set; }
    public List<OrderItem> Items { get; private set; }
    public decimal TotalAmount { get; private set; }
    public OrderStatus Status { get; private set; }

    // 构造函数(从事件重建状态)
    public OrderAggregate(IEnumerable<IEvent> events)
    {
        foreach (var @event in events)
        {
            ApplyEvent(@event, false);
        }
    }

    // 业务操作
    public void CreateOrder(Guid id, string customerName, List<OrderItem> items, decimal totalAmount)
    {
        var @event = new OrderCreatedEvent
        {
            AggregateId = id,
            Version = _version + 1,
            Timestamp = DateTime.UtcNow,
            CustomerName = customerName,
            Items = items,
            TotalAmount = totalAmount
        };

        ApplyEvent(@event, true);
    }

    public void UpdateOrder(string customerName, List<OrderItem> items, decimal totalAmount)
    {
        var @event = new OrderUpdatedEvent
        {
            AggregateId = Id,
            Version = _version + 1,
            Timestamp = DateTime.UtcNow,
            CustomerName = customerName,
            Items = items,
            TotalAmount = totalAmount
        };

        ApplyEvent(@event, true);
    }

    // 应用事件
    private void ApplyEvent(IEvent @event, bool isNew)
    {
        switch (@event)
        {
            case OrderCreatedEvent createdEvent:
                Id = createdEvent.AggregateId;
                CustomerName = createdEvent.CustomerName;
                Items = createdEvent.Items;
                TotalAmount = createdEvent.TotalAmount;
                Status = OrderStatus.Pending;
                break;

            case OrderUpdatedEvent updatedEvent:
                CustomerName = updatedEvent.CustomerName;
                Items = updatedEvent.Items;
                TotalAmount = updatedEvent.TotalAmount;
                break;

            case OrderCancelledEvent cancelledEvent:
                Status = OrderStatus.Cancelled;
                break;
        }

        _version = @event.Version;

        if (isNew)
        {
            _uncommittedEvents.Add(@event);
        }
    }

    // 获取未提交的事件
    public IEnumerable<IEvent> GetUncommittedEvents()
    {
        return _uncommittedEvents;
    }

    // 清除未提交的事件
    public void ClearUncommittedEvents()
    {
        _uncommittedEvents.Clear();
    }
}

4.3 事件溯源仓储

// Infrastructure/Persistence/EventSourcingRepository.cs
public class EventSourcingRepository : IOrderRepository
{
    private readonly IEventStore _eventStore;

    public EventSourcingRepository(IEventStore eventStore)
    {
        _eventStore = eventStore;
    }

    public async Task AddAsync(Order order)
    {
        // 这里实际上应该处理领域事件,而不是直接保存实体
        // 为了简化示例,我们假设order已经包含了事件
        var events = order.GetUncommittedEvents();
        
        foreach (var @event in events)
        {
            await _eventStore.SaveAsync(@event);
        }
    }

    public async Task<Order> GetByIdAsync(Guid id)
    {
        var events = await _eventStore.GetEventsAsync(id);
        
        if (events == null || !events.Any())
            return null;

        // 重放事件重建聚合状态
        var orderAggregate = new OrderAggregate(events);
        
        // 将聚合转换为领域实体(实际应用中可能需要映射)
        return new Order(
            orderAggregate.Id,
            orderAggregate.CustomerName,
            orderAggregate.Items,
            orderAggregate.TotalAmount);
    }

    public async Task UpdateAsync(Order order)
    {
        var events = order.GetUncommittedEvents();
        
        foreach (var @event in events)
        {
            await _eventStore.SaveAsync(@event);
        }
    }

    public async Task DeleteAsync(Guid id)
    {
        // 在事件溯源中,删除通常通过发布一个"Deleted"事件来实现
        var deleteEvent = new OrderDeletedEvent
        {
            AggregateId = id,
            Version = (await GetNextVersionAsync(id)),
            Timestamp = DateTime.UtcNow
        };

        await _eventStore.SaveAsync(deleteEvent);
    }

    private async Task<int> GetNextVersionAsync(Guid aggregateId)
    {
        var events = await _eventStore.GetEventsAsync(aggregateId);
        return events?.LastOrDefault()?.Version + 1 ?? 1;
    }
}

5. CQRS的高级实现

5.1 使用消息队列实现异步处理

// Infrastructure/Messaging/RabbitMQ/RabbitMQEventBus.cs
public class RabbitMQEventBus : IEventBus
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    private readonly IServiceProvider _serviceProvider;

    public RabbitMQEventBus(
        IConnection connection,
        IModel channel,
        IServiceProvider serviceProvider)
    {
        _connection = connection;
        _channel = channel;
        _serviceProvider = serviceProvider;
    }

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        var eventName = @event.GetType().Name;
        var message = JsonSerializer.Serialize(@event);

        var properties = _channel.CreateBasicProperties();
        properties.Persistent = true;
        properties.MessageId = Guid.NewGuid().ToString();

        _channel.BasicPublish(
            exchange: "events",
            routingKey: eventName,
            basicProperties: properties,
            body: Encoding.UTF8.GetBytes(message));

        await Task.CompletedTask;
    }

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        var eventName = typeof(TEvent).Name;
        
        _channel.QueueDeclare(
            queue: eventName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        _channel.QueueBind(
            queue: eventName,
            exchange: "events",
            routingKey: eventName);

        var consumer = new EventingBasicConsumer(_channel);
        
        consumer.Received += async (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            var @event = JsonSerializer.Deserialize<TEvent>(message);

            using (var scope = _serviceProvider.CreateScope())
            {
                var handler = scope.ServiceProvider.GetRequiredService<TEventHandler>();
                await handler.HandleAsync(@event);
            }

            _channel.BasicAck(ea.DeliveryTag, false);
        };

        _channel.BasicConsume(
            queue: eventName,
            autoAck: false,
            consumer: consumer);
    }
}

5.2 使用CQRS与DDD(领域驱动设计)结合

// Domain/Services/OrderDomainService.cs
public class OrderDomainService
{
    private readonly IOrderRepository _orderRepository;
    private readonly IProductRepository _productRepository;

    public OrderDomainService(
        IOrderRepository orderRepository,
        IProductRepository productRepository)
    {
        _orderRepository = orderRepository;
        _productRepository = productRepository;
    }

    public async Task<Order> CreateOrderWithValidation(
        Guid customerId,
        List<OrderItemRequest> items)
    {
        // 1. 验证产品库存
        foreach (var item in items)
        {
            var product = await _productRepository.GetByIdAsync(item.ProductId);
            if (product == null)
                throw new KeyNotFoundException($"Product {item.ProductId} not found");
            
            if (product.Stock < item.Quantity)
                throw new InvalidOperationException($"Insufficient stock for product {item.ProductId}");
        }

        // 2. 创建订单
        var orderItems = items.Select(i => new OrderItem
        {
            ProductId = i.ProductId,
            ProductName = i.ProductName,
            Quantity = i.Quantity,
            UnitPrice = i.UnitPrice
        }).ToList();

        var totalAmount = orderItems.Sum(i => i.Subtotal);

        var order = new Order(
            Guid.NewGuid(),
            customerId.ToString(),
            orderItems,
            totalAmount);

        // 3. 扣减库存
        foreach (var item in items)
        {
            await _productRepository.DecreaseStockAsync(item.ProductId, item.Quantity);
        }

        return order;
    }
}

5.3 使用CQRS与微服务架构

// OrderService/Controllers/OrderController.cs
[ApiController]
[Route("api/[controller]")]
public class OrderController : ControllerBase
{
    private readonly IMediator _mediator;

    public OrderController(IMediator mediator)
    {
        _mediator = mediator;
    }

    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        var command = new CreateOrderCommand
        {
            OrderId = Guid.NewGuid(),
            CustomerName = request.CustomerName,
            Items = request.Items,
            TotalAmount = request.TotalAmount
        };

        await _mediator.Send(command);
        
        return CreatedAtAction(nameof(GetOrder), new { id = command.OrderId }, null);
    }

    [HttpGet("{id}")]
    public async Task<IActionResult> GetOrder(Guid id)
    {
        var query = new GetOrderByIdQuery { OrderId = id };
        var order = await _mediator.Send(query);
        
        return Ok(order);
    }

    [HttpGet("customer/{customerName}")]
    public async Task<IActionResult> GetOrdersByCustomer(string customerName, [FromQuery] int page = 1, [FromQuery] int pageSize = 10)
    {
        var query = new GetOrdersByCustomerQuery
        {
            CustomerName = customerName,
            Page = page,
            PageSize = pageSize
        };
        
        var orders = await _mediator.Send(query);
        
        return Ok(orders);
    }
}

6. CQRS的性能优化

6.1 读模型缓存策略

// Infrastructure/Caching/OrderReadModelCache.cs
public class OrderReadModelCache : IOrderReadRepository
{
    private readonly IOrderReadRepository _decoratedRepository;
    private readonly IMemoryCache _cache;
    private readonly TimeSpan _cacheDuration = TimeSpan.FromMinutes(5);

    public OrderReadModelCache(
        IOrderReadRepository decoratedRepository,
        IMemoryCache cache)
    {
        _decoratedRepository = decoratedRepository;
        _cache = cache;
    }

    public async Task<OrderReadModel> GetByIdAsync(Guid id)
    {
        var cacheKey = $"order:{id}";
        
        if (_cache.TryGetValue(cacheKey, out OrderReadModel cachedOrder))
        {
            return cachedOrder;
        }

        var order = await _decoratedRepository.GetByIdAsync(id);
        
        if (order != null)
        {
            _cache.Set(cacheKey, order, _cacheDuration);
        }

        return order;
    }

    public async Task<List<OrderReadModel>> GetByCustomerAsync(string customerName, int page, int pageSize)
    {
        var cacheKey = $"orders:customer:{customerName}:page:{page}:size:{pageSize}";
        
        if (_cache.TryGetValue(cacheKey, out List<OrderReadModel> cachedOrders))
        {
            return cachedOrders;
        }

        var orders = await _decoratedRepository.GetByCustomerAsync(customerName, page, pageSize);
        
        if (orders != null)
        {
            _cache.Set(cacheKey, orders, _cacheDuration);
        }

        return orders;
    }

    public async Task<OrderStatisticsDto> GetStatisticsAsync(DateTime fromDate, DateTime toDate)
    {
        var cacheKey = $"statistics:{fromDate:yyyyMMdd}:{toDate:yyyyMMdd}";
        
        if (_cache.TryGetValue(cacheKey, out OrderStatisticsDto cachedStatistics))
        {
            return cachedStatistics;
        }

        var statistics = await _decoratedRepository.GetStatisticsAsync(fromDate, toDate);
        
        if (statistics != null)
        {
            _cache.Set(cacheKey, statistics, TimeSpan.FromMinutes(1));
        }

        return statistics;
    }
}

6.2 读模型预计算

// Infrastructure/Persistence/PrecomputedOrderReadRepository.cs
public class PrecomputedOrderReadRepository : IOrderReadRepository
{
    private readonly ReadDbContext _readDbContext;
    private readonly IEventBus _eventBus;

    public PrecomputedOrderReadRepository(
        ReadDbContext readDbContext,
        IEventBus eventBus)
    {
        _readDbContext = readDbContext;
        _eventBus = eventBus;
    }

    public async Task<OrderReadModel> GetByIdAsync(Guid id)
    {
        // 直接查询预计算的读模型
        return await _readDbContext.OrderReadModels
            .AsNoTracking()
            .FirstOrDefaultAsync(o => o.Id == id);
    }

    public async Task<List<OrderReadModel>> GetByCustomerAsync(string customerName, int page, int pageSize)
    {
        // 使用预计算的索引
        return await _readDbContext.OrderReadModels
            .AsNoTracking()
            .Where(o => o.CustomerName == customerName)
            .OrderByDescending(o => o.CreatedAt)
            .Skip((page - 1) * pageSize)
            .Take(pageSize)
            .ToListAsync();
    }

    public async Task<OrderStatisticsDto> GetStatisticsAsync(DateTime fromDate, DateTime toDate)
    {
        // 使用预计算的统计表
        var statistics = await _readDbContext.OrderStatistics
            .AsNoTracking()
            .Where(s => s.Date >= fromDate.Date && s.Date <= toDate.Date)
            .GroupBy(s => 1)
            .Select(g => new OrderStatisticsDto
            {
                TotalOrders = g.Sum(s => s.TotalOrders),
                TotalRevenue = g.Sum(s => s.TotalRevenue),
                AverageOrderValue = g.Average(s => s.AverageOrderValue)
            })
            .FirstOrDefaultAsync();

        return statistics ?? new OrderStatisticsDto();
    }
}

7. CQRS的测试策略

7.1 命令处理器测试

// Tests/Application/Handlers/CreateOrderCommandHandlerTests.cs
public class CreateOrderCommandHandlerTests
{
    [Fact]
    public async Task HandleAsync_WithValidCommand_ShouldCreateOrder()
    {
        // Arrange
        var mockRepository = new Mock<IOrderRepository>();
        var mockEventBus = new Mock<IEventBus>();
        
        var handler = new CreateOrderCommandHandler(
            mockRepository.Object,
            mockEventBus.Object);

        var command = new CreateOrderCommand
        {
            OrderId = Guid.NewGuid(),
            CustomerName = "John Doe",
            Items = new List<OrderItem>
            {
                new OrderItem
                {
                    ProductId = Guid.NewGuid(),
                    ProductName = "Product 1",
                    Quantity = 2,
                    UnitPrice = 10.00m
                }
            },
            TotalAmount = 20.00m
        };

        // Act
        await handler.HandleAsync(command);

        // Assert
        mockRepository.Verify(
            r => r.AddAsync(It.Is<Order>(o => 
                o.Id == command.OrderId &&
                o.CustomerName == command.CustomerName &&
                o.TotalAmount == command.TotalAmount)),
            Times.Once);

        mockEventBus.Verify(
            e => e.PublishAsync(It.IsAny<OrderCreatedEvent>()),
            Times.Once);
    }

    [Fact]
    public async Task HandleAsync_WithInvalidCustomerName_ShouldThrowException()
    {
        // Arrange
        var mockRepository = new Mock<IOrderRepository>();
        var mockEventBus = new Mock<IEventBus>();
        
        var handler = new CreateOrderCommandHandler(
            mockRepository.Object,
            mockEventBus.Object);

        var command = new CreateOrderCommand
        {
            OrderId = Guid.NewGuid(),
            CustomerName = "", // 无效的客户名
            Items = new List<OrderItem>
            {
                new OrderItem
                {
                    ProductId = Guid.NewGuid(),
                    ProductName = "Product 1",
                    Quantity = 2,
                    UnitPrice = 10.00m
                }
            },
            TotalAmount = 20.00m
        };

        // Act & Assert
        await Assert.ThrowsAsync<ArgumentException>(() => handler.HandleAsync(command));
    }
}

7.2 查询处理器测试

// Tests/Application/Handlers/GetOrderByIdQueryHandlerTests.cs
public class GetOrderByIdQueryHandlerTests
{
    [Fact]
    public async Task HandleAsync_WithValidId_ShouldReturnOrder()
    {
        // Arrange
        var mockRepository = new Mock<IOrderReadRepository>();
        
        var expectedOrder = new OrderReadModel
        {
            Id = Guid.NewGuid(),
            CustomerName = "John Doe",
            Items = new List<OrderItemDto>
            {
                new OrderItemDto
                {
                    ProductId = Guid.NewGuid(),
                    ProductName = "Product 1",
                    Quantity = 2,
                    UnitPrice = 10.00m,
                    Subtotal = 20.00m
                }
            },
            TotalAmount = 20.00m,
            CreatedAt = DateTime.UtcNow
        };

        mockRepository
            .Setup(r => r.GetByIdAsync(expectedOrder.Id))
            .ReturnsAsync(expectedOrder);

        var handler = new GetOrderByIdQueryHandler(mockRepository.Object);
        var query = new GetOrderByIdQuery { OrderId = expectedOrder.Id };

        // Act
        var result = await handler.HandleAsync(query);

        // Assert
        Assert.NotNull(result);
        Assert.Equal(expectedOrder.Id, result.Id);
        Assert.Equal(expectedOrder.CustomerName, result.CustomerName);
        Assert.Equal(expectedOrder.TotalAmount, result.TotalAmount);
    }

    [Fact]
    public async Task HandleAsync_WithInvalidId_ShouldThrowException()
    {
        // Arrange
        var mockRepository = new Mock<IOrderReadRepository>();
        
        mockRepository
            .Setup(r => r.GetByIdAsync(It.IsAny<Guid>()))
            .ReturnsAsync((OrderReadModel)null);

        var handler = new GetOrderByIdQueryHandler(mockRepository.Object);
        var query = new GetOrderByIdQuery { OrderId = Guid.NewGuid() };

        // Act & Assert
        await Assert.ThrowsAsync<KeyNotFoundException>(() => handler.HandleAsync(query));
    }
}

8. CQRS的部署与运维

8.1 Docker部署配置

# Dockerfile
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443

FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["CQRS-Example/CQRS-Example.csproj", "CQRS-Example/"]
RUN dotnet restore "CQRS-Example/CQRS-Example.csproj"
COPY . .
WORKDIR "/src/CQRS-Example"
RUN dotnet build "CQRS-Example.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "CQRS-Example.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "CQRS-Example.dll"]

8.2 Docker Compose配置

# docker-compose.yml
version: '3.8'

services:
  cqrs-api:
    build: .
    ports:
      - "5000:80"
    environment:
      - ASPNETCORE_ENVIRONMENT=Production
      - ConnectionStrings__WriteDb=Server=write-db;Database=CQRS_Write;User=sa;Password=YourPassword123!;
      - ConnectionStrings__ReadDb=Server=read-db;Database=CQRS_Read;User=sa;Password=YourPassword123!;
      - RabbitMQ__Host=rabbitmq
      - RabbitMQ__Port=5672
    depends_on:
      - write-db
      - read-db
      - rabbitmq
    networks:
      - cqrs-network

  write-db:
    image: mcr.microsoft.com/mssql/server:2022-latest
    environment:
      - ACCEPT_EULA=Y
      - SA_PASSWORD=YourPassword123!
    ports:
      - "1433:1433"
    volumes:
      - write-db-data:/var/opt/mssql
    networks:
      - cqrs-network

  read-db:
    image: postgres:15
    environment:
      - POSTGRES_PASSWORD=YourPassword123!
      - POSTGRES_DB=CQRS_Read
    ports:
      - "5432:5432"
    volumes:
      - read-db-data:/var/lib/postgresql/data
    networks:
      - cqrs-network

  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=YourPassword123!
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq
    networks:
      - cqrs-network

volumes:
  write-db-data:
  read-db-data:
  rabbitmq-data:

networks:
  cqrs-network:
    driver: bridge

8.3 监控与日志

// Infrastructure/Monitoring/OrderMonitoringService.cs
public class OrderMonitoringService
{
    private readonly ILogger<OrderMonitoringService> _logger;
    private readonly IMetrics _metrics;

    public OrderMonitoringService(
        ILogger<OrderMonitoringService> logger,
        IMetrics metrics)
    {
        _logger = logger;
        _metrics = metrics;
    }

    public void TrackCommandExecution(string commandName, long durationMs, bool success)
    {
        _logger.LogInformation(
            "Command {CommandName} executed in {Duration}ms with result {Success}",
            commandName, durationMs, success);

        _metrics.IncrementCounter($"cqrs.command.{commandName}.total");
        _metrics.RecordHistogram($"cqrs.command.{commandName}.duration", durationMs);
        
        if (!success)
        {
            _metrics.IncrementCounter($"cqrs.command.{commandName}.errors");
        }
    }

    public void TrackQueryExecution(string queryName, long durationMs, bool success)
    {
        _logger.LogInformation(
            "Query {QueryName} executed in {Duration}ms with result {Success}",
            queryName, durationMs, success);

        _metrics.IncrementCounter($"cqrs.query.{queryName}.total");
        _metrics.RecordHistogram($"cqrs.query.{queryName}.duration", durationMs);
        
        if (!success)
        {
            _metrics.IncrementCounter($"cqrs.query.{queryName}.errors");
        }
    }

    public void TrackEventProcessing(string eventName, long durationMs, bool success)
    {
        _logger.LogInformation(
            "Event {EventName} processed in {Duration}ms with result {Success}",
            eventName, durationMs, success);

        _metrics.IncrementCounter($"cqrs.event.{eventName}.total");
        _metrics.RecordHistogram($"cqrs.event.{eventName}.duration", durationMs);
        
        if (!success)
        {
            _metrics.IncrementCounter($"cqrs.event.{eventName}.errors");
        }
    }
}

9. CQRS的常见问题与解决方案

9.1 数据一致性问题

问题:CQRS通常引入最终一致性,可能导致读写模型数据不一致。

解决方案

  1. 版本控制:在读模型中包含版本号,客户端可以检查版本
  2. 补偿机制:当检测到不一致时,触发补偿操作
  3. 事件重放:定期重放事件以修复不一致的读模型
// Infrastructure/Persistence/ConsistencyChecker.cs
public class ConsistencyChecker
{
    private readonly IEventStore _eventStore;
    private readonly IReadModelUpdater _readModelUpdater;

    public ConsistencyChecker(
        IEventStore eventStore,
        IReadModelUpdater readModelUpdater)
    {
        _eventStore = eventStore;
        _readModelUpdater = readModelUpdater;
    }

    public async Task CheckAndFixInconsistencies()
    {
        var allEvents = await _eventStore.GetAllEventsAsync();
        
        // 按聚合ID分组
        var eventsByAggregate = allEvents
            .GroupBy(e => e.AggregateId)
            .ToDictionary(g => g.Key, g => g.OrderBy(e => e.Version).ToList());

        foreach (var aggregateId in eventsByAggregate.Keys)
        {
            var events = eventsByAggregate[aggregateId];
            
            // 重建读模型
            var reconstructedModel = await ReconstructReadModelAsync(events);
            
            // 获取当前读模型
            var currentModel = await _readModelUpdater.GetCurrentModelAsync(aggregateId);
            
            // 比较并修复差异
            if (!AreModelsEqual(reconstructedModel, currentModel))
            {
                await _readModelUpdater.UpdateReadModelAsync(aggregateId, reconstructedModel);
            }
        }
    }

    private async Task<object> ReconstructReadModelAsync(List<IEvent> events)
    {
        // 重放事件重建读模型
        // 实现细节取决于具体业务逻辑
        return new object(); // 简化示例
    }

    private bool AreModelsEqual(object model1, object model2)
    {
        // 实现模型比较逻辑
        return model1?.Equals(model2) ?? false;
    }
}

9.2 复杂性管理

问题:CQRS增加了架构复杂性,可能难以维护。

解决方案

  1. 分层架构:清晰分离应用层、领域层、基础设施层
  2. 框架支持:使用成熟的CQRS框架(如Axon Framework、EventFlow)
  3. 代码生成:使用代码生成工具减少样板代码

9.3 学习曲线

问题:团队需要时间学习CQRS和事件溯源概念。

解决方案

  1. 渐进式采用:从简单场景开始,逐步扩展
  2. 培训与文档:提供详细的内部文档和培训
  3. 代码审查:通过代码审查确保正确实现

10. CQRS的最佳实践

10.1 设计原则

  1. 单一职责:每个组件只负责一个明确的职责
  2. 开闭原则:对扩展开放,对修改关闭
  3. 依赖倒置:依赖抽象而非具体实现
  4. 领域驱动设计:使用DDD概念建模业务逻辑

10.2 代码组织

  1. 清晰的命名:使用描述性的名称,如CreateOrderCommandGetOrderByIdQuery
  2. 一致的模式:在整个项目中保持一致的代码模式
  3. 适当的抽象:避免过度抽象,保持代码可读性

10.3 性能优化

  1. 读写分离:使用不同的数据库实例
  2. 缓存策略:合理使用缓存,避免缓存穿透
  3. 异步处理:使用消息队列处理耗时操作
  4. 索引优化:为读模型查询创建适当的索引

10.4 错误处理

  1. 事务管理:确保命令处理的原子性
  2. 重试机制:对临时性故障实现重试逻辑
  3. 死信队列:处理无法处理的消息
  4. 监控告警:及时发现和处理异常

11. 总结

CQRS是一种强大的架构模式,特别适合复杂业务场景。通过分离读写职责,CQRS能够提供更好的性能、可扩展性和维护性。然而,它也引入了额外的复杂性,需要仔细权衡。

11.1 关键要点

  1. 理解核心概念:命令、查询、处理器、读写模型
  2. 选择合适的场景:CQRS不是银弹,只适用于复杂场景
  3. 渐进式采用:从简单实现开始,逐步演进
  4. 关注一致性:处理好读写模型的一致性问题
  5. 持续优化:根据实际需求调整架构

11.2 进一步学习资源

  1. 书籍

    • 《领域驱动设计:软件核心复杂性应对之道》
    • 《实现领域驱动设计》
    • 《CQRS与事件溯源》
  2. 框架

    • Axon Framework(Java)
    • EventFlow(.NET)
    • Lagom(Scala)
  3. 在线资源

    • Martin Fowler的CQRS文章
    • Microsoft的CQRS示例项目
    • GitHub上的开源CQRS项目

通过本文的完整指南,您应该能够从理论到实践全面理解CQRS,并在实际项目中成功应用。记住,架构模式的选择应该基于具体业务需求,而不是盲目追求新技术。