65.9K
CodeProject 正在变化。 阅读更多。
Home

构建一个简洁的 DotNetCore CQS 数据管道

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.23/5 (4投票s)

2022年8月22日

CPOL

8分钟阅读

viewsIcon

6924

如何构建 CQS 数据管道

测试数据

附录提供了数据类和测试数据提供程序的摘要。存储库的文档部分有完整的描述。

存储库

本文相关的数据存储库在此:Blazr.Demo.DataPipeline。存储库中还有一些关于设计、数据类、测试数据提供程序和数据库上下文的详细文档。

引言

CQS - 不要与 CQRS 混淆 - 本质上是一种编程风格。每个操作都是:

  1. 一个 Command(命令)- 请求进行数据更改
  2. 一个 Query(查询)- 请求获取一些数据

一个 Command 返回状态信息或不返回任何内容。命令从不返回数据。

一个 Query 返回一个数据集。查询从不更改数据状态。进行查询没有副作用

这是一个非常好的模式,可以普遍应用于您的代码。

每个操作都有一个 Command/Query 类来定义操作,以及一个 Handler(处理程序)类来执行定义的操作。通常,是一对一的关系:每个请求都有一个唯一的处理程序。

本质上

  • 一个 Request 对象定义了 Handler 需要执行请求的信息以及它期望的返回结果 - Result

  • 一个 Handler 对象执行必要的代码,并使用 Request 提供的数据返回定义的 Result

概念上,它非常简单,实现起来也相对容易。问题在于每个数据库操作都需要一个请求和一个处理程序对象。大量类定义和重复相同的旧代码。

解决方案布局和设计

该解决方案包含一组遵循干净设计原则的库。它旨在适用于任何 DotNetCore 环境。Blazr.CoreBlazr.Data 是可用于任何实现的两个基础库。Blazr.Demo.CoreBlazr.Demo.Data 是两个应用程序特定的库。

前端应用程序是一个 XUnit 测试项目,用于演示和测试代码。

我在 Blazor 项目中使用它。

接口和基类

基本方法可以通过两个泛型接口来定义。

ICQSRequest 定义任何请求

  1. 它表明请求产生一个由 TResult 定义的输出。
  2. 它有一个唯一的 TransactionId 来跟踪事务(如果需要且已实现)。
public interface ICQSRequest<out TResult>
{
    public Guid TransactionId { get;}
}

ICQSHandler 定义任何执行 ICQSRequest 实例的处理程序

  1. 处理程序接收一个实现了 ICQSRequest 接口的 TRequest
  2. 处理程序输出 ICQSRequest 接口中定义的 TResult
  3. 它有一个单一的 ExecuteAsync 方法,该方法接受一个 TRequest 并返回 TResult
public interface ICQSHandler<in TRequest, out TResult>
    where TRequest : ICQSRequest<TResult>
{
    TResult ExecuteAsync(TRequest request);
}

要构建更简洁的实现

  • 我们必须接受 80/20 法则。并非所有请求都能通过我们的标准实现来满足,但节省 80% 的工作量和类也是很有意义的。
  • 我们需要一种方法来处理那 20%。
  • 我们需要一个“兼容”的基于泛型的 ORM 来与我们的数据存储交互。此实现使用了Entity Framework,它提供了此功能。
  • 在基类中编写一些非常复杂的泛型代码,将功能抽象到样板代码中。

结果

解决方案定义了一组要返回的标准结果:请求的 TResult。它们被定义为带有静态构造函数的 record,并包含状态信息,如果是查询,则包含数据。它们必须是可序列化的,以便在 API 中使用。每个如下所示

public record ListProviderResult<TRecord>
{
    public IEnumerable<TRecord> Items { get; init; }
    public int TotalItemCount { get; init; }
    public bool Success { get; init; }
    public string? Message { get; init; }
    //....Constructors
}
public record RecordProviderResult<TRecord>
{
    public TRecord? Record { get; init; }
    public bool Success { get; init; }
    public string? Message { get; init; }
    //....Constructors
}
public record CommandResult
{
    public Guid NewId { get; init; }
    public bool Success { get; init; }
    public string Message { get; init; }
    //....Constructors
}
public record FKListProviderResult
{
    public IEnumerable<IFkListItem> Items { get; init; }
    public bool Success { get; init; }
    public string? Message { get; init; }
    //....Constructors
}

所有都实现了static构造函数,以严格控制内容。

基类

TRecord 代表使用 ORM 从数据存储中检索的数据类。它被限定为实现空构造函数 new()class

Request 接口/类结构如下所示

Handler 接口/类结构如下所示

Commands

所有命令

  1. 接收一个我们定义为 TRecord 的记录。
  2. TResult 固定为异步 ValueTask<CommandResult>

一个实现 ICQSRequest 和此功能的接口。

public interface IRecordCommand<TRecord> 
    : ICQSRequest<ValueTask<CommandResult>>
{
    public TRecord Record { get;}
}

以及一个抽象实现。

public abstract class RecordCommandBase<TRecord>
     : IRecordCommand<TRecord>
{
    public Guid TransactionId { get; } = Guid.NewGuid(); 
    public TRecord Record { get; protected set; } = default!;

    protected RecordCommandBase() { }
}

现在我们可以定义 Add/Delete/Update 特定命令。所有命令都使用静态构造函数来控制和验证内容。必须存在一对一的关系(请求 -> 处理程序),因此我们为每种类型的命令定义一个处理程序。

public class AddRecordCommand<TRecord>
     : RecordCommandBase<TRecord>
{
    private AddRecordCommand() { }

    public static AddRecordCommand<TRecord> GetCommand(TRecord record)
        => new AddRecordCommand<TRecord> { Record = record };
}
public class DeleteRecordCommand<TRecord>
     : RecordCommandBase<TRecord>
{
    private DeleteRecordCommand() { }

    public static DeleteRecordCommand<TRecord> GetCommand(TRecord record)
        => new DeleteRecordCommand<TRecord> { Record = record };
}
public class UpdateRecordCommand<TRecord>
     : RecordCommandBase<TRecord>
{
    private UpdateRecordCommand() { }

    public static UpdateRecordCommand<TRecord> GetCommand(TRecord record)
        => new UpdateRecordCommand<TRecord> { Record = record };
}

命令处理器

为处理程序创建接口或基类没有好处,因此我们实现 Create/Update/Delete 命令作为三个单独的类。TRecord 定义记录类,TDbContext 定义 DI DbContextFactory 中使用的DbContext

我们使用 DbContext 内置的泛型方法,因此不需要特定的 DbContextSet<TRecord> 方法查找 TRecordDbSet 实例,而 Update<TRecord>Add<TRecord>Delete<TRecord> 方法与 SaveChangesAsync 一起实现命令。

所有处理程序都遵循相同的模式。

  1. 构造函数传入 DbContext 工厂和要执行的命令请求。
  2. Execute:
    1. 获取一个 DbContext
    2. 在上下文中调用泛型 Add/Update/Delete,传入记录。内部,EF 查找记录集和特定记录,并用提供的记录替换它。
    3. 调用 DbContext 上的 SaveChanges,将更改提交到数据存储。
    4. 检查我们是否有一个更改,并返回一个 CommandResult

这是 Add Handler(添加处理程序)

public class AddRecordCommandHandler<TRecord, TDbContext>
    : ICQSHandler<AddRecordCommand<TRecord>, ValueTask<CommandResult>>
    where TDbContext : DbContext
    where TRecord : class, new()
{
    protected IDbContextFactory<TDbContext> factory;

    public AddRecordCommandHandler(IDbContextFactory<TDbContext> factory)
        =>  this.factory = factory;

    public async ValueTask<CommandResult> ExecuteAsync(AddRecordCommand<TRecord> command)
    {
        using var dbContext = factory.CreateDbContext();
        dbContext.Add<TRecord>(command.Record);
        return await dbContext.SaveChangesAsync() == 1
            ? CommandResult.Successful("Record Saved")
            : CommandResult.Failure("Error saving Record");
    }
}

查询

查询并不那么统一。

  1. 存在各种类型的 TResult
  2. 它们具有 WhereOrderBy 等特定操作。

为了处理这些需求,我们定义了三个查询请求

RecordQuery(记录查询)

这会返回一个 RecordProviderResult,其中包含一个基于提供的 Uid 的单个记录。

public record RecordQuery<TRecord>
    : ICQSRequest<ValueTask<RecordProviderResult<TRecord>>>
{
    public Guid TransactionId { get; } = Guid.NewGuid();
    public Guid GuidId { get; init; }

    protected RecordQuery() { }

    public static RecordQuery<TRecord> GetQuery(Guid recordId)
        => new RecordQuery<TRecord> { GuidId = recordId };
}

ListQuery(列表查询)

这会返回一个 ListProviderResult,其中包含一个分页IEnumerable TRecord

我们定义了一个接口

public interface IListQuery<TRecord>
    : ICQSRequest<ValueTask<ListProviderResult<TRecord>>>
    where TRecord : class, new()
{
    public int StartIndex { get; }
    public int PageSize { get; }
    public string? SortExpressionString { get; }
    public string? FilterExpressionString { get; }
}

一个抽象的基类实现

public abstract record ListQueryBase<TRecord>
    :IListQuery<TRecord>
    where TRecord : class, new()
{
    public int StartIndex { get; init; }
    public int PageSize { get; init; }
    public string? SortExpressionString { get; init; }
    public string? FilterExpressionString { get; init; }
    public Guid TransactionId { get; init; } = Guid.NewGuid();

    protected ListQueryBase() { }
}

最后是一个泛型查询

public record ListQuery<TRecord>
    :ListQueryBase<TRecord>
    where TRecord : class, new()
{
    public static ListQuery<TRecord> GetQuery(ListProviderRequest<TRecord> request)
        => new ListQuery<TRecord>
        {
            StartIndex = request.StartIndex,
            PageSize = request.PageSize,
            SortExpressionString = request.SortExpressionString,
            FilterExpressionString = request.FilterExpressionString
        };
}

我们将代码分成接口/抽象基类模式,以便可以实现自定义列表查询。如果它们继承自 ListQuery,我们会遇到工厂和模式匹配方法的问题。使用基类来实现样板代码可以解决此问题。

FKListQuery(外键列表查询)

这会返回一个 FkListProviderResult,其中包含一个 IEnumerable IFkListItemFkListItem 是一个简单的对象,包含一个 Guid/Name 对。其主要用途是在 UI 中的外键 Select 控件中。

public record FKListQuery<TRecord>
    : ICQSRequest<ValueTask<FKListProviderResult>>
{
    public Guid TransactionId { get; } = Guid.NewGuid();
}

Query Handlers(查询处理程序)

相应的查询处理程序是

RecordQueryHandler(记录查询处理程序)

创建“通用”版本可能具有挑战性,具体取决于 ORM。

需要注意的关键概念是

  1. 使用 IDbContextFactory工作单元 DbContexts
  2. _dbContext.Set<TRecord>() 获取 TRecordDbSet
  3. 使用两种方法来应用查询。
public class RecordQueryHandler<TRecord, TDbContext>
    : ICQSHandler<RecordQuery<TRecord>, 
    ValueTask<RecordProviderResult<TRecord>>>
        where TRecord : class, new()
        where TDbContext : DbContext
{
    private IDbContextFactory<TDbContext> _factory;

    public RecordQueryHandler(IDbContextFactory<TDbContext> factory)
        =>  _factory = factory;

    public async ValueTask<RecordProviderResult<TRecord>> 
                              ExecuteAsync(RecordQuery<TRecord> query)
    {
        var dbContext = _factory.CreateDbContext();
        dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;

        TRecord? record = null;

        // first check if the record implements IRecord. 
        // If so we can do a cast and then do the query via the Uid property directly 
        if ((new TRecord()) is IRecord)
            record = await dbContext.Set<TRecord>().SingleOrDefaultAsync
                                     (item => ((IRecord)item).Uid == query.GuidId);

        // Try and use the EF FindAsync implementation
        if (record is null)
                record = await dbContext.FindAsync<TRecord>(query.GuidId);

        if (record is null)
            return RecordProviderResult<TRecord>.Failure
            (No record retrieved");

        return RecordProviderResult<TRecord>.Successful(record);
    }
}

ListQueryHandler(列表查询处理程序)

此处需要注意的关键概念是

  1. 使用 IDbContextFactory工作单元 DbContexts
  2. _dbContext.Set<TRecord>() 获取 TRecordDbSet
  3. 使用 IQueryable 构建查询。
  4. 需要两个查询。一个用于获取“分页”recordset,一个用于获取总记录数。
  5. 使用 System.Linq.Dynamic 进行排序和过滤。稍后将对此进行讨论。
public class ListQueryHandler<TRecord, TDbContext>
    : IListQueryHandler<TRecord>
        where TDbContext : DbContext
        where TRecord : class, new()
{
    protected IEnumerable<TRecord> items = Enumerable.Empty<TRecord>();
    protected int count = 0;

    protected IDbContextFactory<TDbContext> factory;
    protected IListQuery<TRecord> listQuery = default!;

    public ListQueryHandler(IDbContextFactory<TDbContext> factory)
        => this.factory = factory;

    public async ValueTask<ListProviderResult<TRecord>> 
                 ExecuteAsync(IListQuery<TRecord> query)
    {
        if (query is null)
            return ListProviderResult<TRecord>.Failure
            ("No Query Defined");

        listQuery = query;

        if (await this.GetItemsAsync())
            await this.GetCountAsync();

        return ListProviderResult<TRecord>.Successful(this.items, this.count);
    }

    protected virtual async ValueTask<bool> GetItemsAsync()
    {
        var dbContext = this.factory.CreateDbContext();
        dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;

        IQueryable<TRecord> query = dbContext.Set<TRecord>();

        if (listQuery.FilterExpressionString is not null)
            query = query
                .Where(listQuery.FilterExpressionString)
                .AsQueryable();

        if (listQuery.SortExpressionString is not null)
            query = query.OrderBy(listQuery.SortExpressionString);

        if (listQuery.PageSize > 0)
            query = query
                .Skip(listQuery.StartIndex)
                .Take(listQuery.PageSize);

        if (query is IAsyncEnumerable<TRecord>)
            this.items = await query.ToListAsync();
        else
            this.items = query.ToList();

        return true;
    }

    protected virtual async ValueTask<bool> GetCountAsync()
    {
        var dbContext = this.factory.CreateDbContext();
        dbContext.ChangeTracker.QueryTrackingBehavior = 
                                QueryTrackingBehavior.NoTracking;

        IQueryable<TRecord> query = dbContext.Set<TRecord>();

        if (listQuery.FilterExpressionString is not null)
            query = query
                .Where(listQuery.FilterExpressionString)
                .AsQueryable();

        if (query is IAsyncEnumerable<TRecord>)
            count = await query.CountAsync();
        else
            count = query.Count();

        return true;
    }
}

FKListQueryHandler(外键列表查询处理程序)

public class FKListQueryHandler<TRecord, TDbContext>
    : ICQSHandler<FKListQuery<TRecord>, ValueTask<FKListProviderResult>>
        where TDbContext : DbContext
        where TRecord : class, IFkListItem, new()
{
    protected IEnumerable<TRecord> items = Enumerable.Empty<TRecord>();
    protected IDbContextFactory<TDbContext> factory;

    public FKListQueryHandler(IDbContextFactory<TDbContext> factory)
        => this.factory = factory;

    public async ValueTask<FKListProviderResult> 
                 ExecuteAsync(FKListQuery<TRecord> listQuery)
    {
        var dbContext = this.factory.CreateDbContext();
        dbContext.ChangeTracker.QueryTrackingBehavior = 
                                QueryTrackingBehavior.NoTracking;

        if (listQuery is null)
            return FKListProviderResult.Failure
            ("No Query defined");

        IEnumerable<TRecord> dbSet = await dbContext.Set<TRecord>().ToListAsync();

        return FKListProviderResult.Successful(dbSet);
    }
}

实现处理程序

处理程序设计为两种方式使用

  1. 作为单独的依赖注入服务
  2. 通过依赖注入的工厂

我们将在测试中看到两者的用法。

通用工厂代理

代理使用单个方法 ExecuteAsync(Request),并为每个请求提供实现,该实现将正确的处理程序映射,执行请求并提供预期的结果。

var TResult = await DataBrokerInstance.ExecuteAsync<TRecord>(TRequest);

用于在 DI 中定义服务的接口

public interface ICQSDataBroker
{    
    public ValueTask<CommandResult> ExecuteAsync<TRecord>
    (AddRecordCommand<TRecord> command) where TRecord : class, new();
    public ValueTask<CommandResult> ExecuteAsync<TRecord>
    (UpdateRecordCommand<TRecord> command) where TRecord : class, new();
    public ValueTask<CommandResult> ExecuteAsync<TRecord>
    (DeleteRecordCommand<TRecord> command) where TRecord : class, new();
    public ValueTask<ListProviderResult<TRecord>> ExecuteAsync<TRecord>
    (ListQuery<TRecord> query) where TRecord : class, new();
    public ValueTask<RecordProviderResult<TRecord>> ExecuteAsync<TRecord>
    (RecordQuery<TRecord> query) where TRecord : class, new();
    public ValueTask<FKListProviderResult> ExecuteAsync<TRecord>
    (FKListQuery<TRecord> query) where TRecord : class, IFkListItem, new();
}

以及一个服务器代理实现

public class CQSDataBroker<TDbContext>
    :ICQSDataBroker
    where TDbContext : DbContext
{
    private readonly IDbContextFactory<TDbContext> _factory;
    private readonly IServiceProvider _serviceProvider;

    public CQSDataBroker(IDbContextFactory<TDbContext> factory, 
                         IServiceProvider serviceProvider)
    { 
        _factory = factory;
        _serviceProvider = serviceProvider;
    }

    public async ValueTask<CommandResult> ExecuteAsync<TRecord>
    (AddRecordCommand<TRecord> command) where TRecord : class, new()
    {
        var handler = new AddRecordCommandHandler<TRecord, TDbContext>
                      (_factory, command);
        return await handler.ExecuteAsync();
    }

    //.... Update and Delete ExecuteAsyncs

    public async ValueTask<ListProviderResult<TRecord>> ExecuteAsync<TRecord>
    (ListQuery<TRecord> query) where TRecord : class, new()
    {
        var handler = new ListQueryHandler<TRecord, TDbContext>(_factory, query);
        return await handler.ExecuteAsync();
    }

    public async ValueTask<RecordProviderResult<TRecord>> 
    ExecuteAsync<TRecord>(RecordQuery<TRecord> query) where TRecord : class, new()
    {
        var handler = new RecordQueryHandler<TRecord, TDbContext>(_factory, query);
        return await handler.ExecuteAsync();
    }

    public async ValueTask<FKListProviderResult> ExecuteAsync<TRecord>
    (FKListQuery<TRecord> query) where TRecord : class, IFkListItem, new()
    {
        var handler = new FKListQueryHandler<TRecord, TDbContext>(_factory, query);
        return await handler.ExecuteAsync();
    }

    public ValueTask<object> ExecuteAsync<TRecord>(object query)
        => throw new NotImplementedException();
}

注意捕获所有异常的方法。

测试代理

SetUp

这是代理演示测试的设置。它设置了一个 DI 服务容器并将实例传递给测试。

public CQSBrokerTests()
    // Creates an instance of the Test Data provider
    => _weatherTestDataProvider = WeatherTestDataProvider.Instance();

private ServiceProvider GetServiceProvider()
{
    // Creates a Service Collection
    var services = new ServiceCollection();

    // Adds the application services to the collection
    Action<DbContextOptionsBuilder> dbOptions = options => options.UseInMemoryDatabase
    ($"WeatherDatabase-{Guid.NewGuid().ToString()}");
    services.AddDbContextFactory<TDbContext>(options);
    services.AddSingleton<ICQSDataBroker, CQSDataBroker<InMemoryWeatherDbContext>>();

    // Creates a Service Provider from the Services collection
    // This is our DI container
    var provider = services.BuildServiceProvider();

    // Adds the test data to the in memory database
    var factory = provider.GetService<IDbContextFactory<InMemoryWeatherDbContext>>();
    WeatherTestDataProvider.Instance().LoadDbContext<InMemoryWeatherDbContext>(factory);

    return provider!;
}

测试

获取天气地点列表的典型测试

[Fact]
public async void TestWeatherLocationListCQSDataBroker()
{
    // Build our DI container
    var provider = GetServiceProvider();
    //Get the Data Broker
    var broker = provider.GetService<ICQSDataBroker>()!;

    // Get the control record count from the Test Data Provider
    var testRecordCount = _weatherTestDataProvider.WeatherLocations.Count();
    int pageSize = 10;
    // Get the expected recordset count.
    // It should be either the page size 
    //or the total record count if that's smaller
    var testCount = testRecordCount > pageSize ? pageSize : testRecordCount ;

    // Create a list request
    var listRequest = new ListProviderRequest<DboWeatherLocation>(0, pageSize);

    // Create a ListQuery and execute the query 
    //on the Data Broker against the DboWeatherLocation recordset
    var query = new ListQuery<DboWeatherLocation>(listRequest);
    var result = await broker.ExecuteAsync<DboWeatherLocation>(query);

    // Check we have success
    Assert.True(result.Success);
    // Check the recordset count
    Assert.Equal(testCount, result.Items.Count());
    // Check the total count os correct against the test provider
    Assert.True(result.TotalItemCount == testRecordCount);
}

以及一个 Add 命令测试

[Fact]
public async void TestAddCQSDataBroker()
{
    var provider = GetServiceProvider();
    var broker = provider.GetService<ICQSDataBroker>()!;
    var newRecord = _weatherTestDataProvider.GetForecast();
    var id = newRecord!.Uid;

    var command = new AddRecordCommand<DboWeatherForecast>(newRecord);
    var result = await broker.ExecuteAsync(command);

    var query = new RecordQuery<DboWeatherForecast>(id);
    var checkResult = await broker.ExecuteAsync(query);

    Assert.True(result.Success);
    Assert.Equal(newRecord, checkResult.Record);
}

自定义请求

过滤列表

这可能是最常见的自定义请求。标准 ListQuery 使用 Dynamic Linq,因此您可以将查询构建为字符串以传递到查询中。但是,Dynamic Linq 效率不高,因此我更喜欢在大量使用它们的地方定义自定义查询。

所有此类查询都可以使用自定义的 BaseListQuery

我们的示例自定义查询根据 Location 过滤 WeatherForecast

查询

  1. 继承自 ListQueryBase,将 TRecord 固定为 DvoWeatherForecast
  2. 定义一个 WeatherLocationId 属性
  3. 定义一个 static creator 方法
public record WeatherForecastListQuery
    : ListQueryBase<DvoWeatherForecast>
{
    public Guid WeatherLocationId { get; init; }

    private WeatherForecastListQuery() { }

    public static WeatherForecastListQuery GetQuery
    (Guid weatherLocationId, ListProviderRequest<DvoWeatherForecast> request)
        => new WeatherForecastListQuery
        {
            StartIndex = request.StartIndex,
            PageSize = request.PageSize,
            SortExpressionString = request.SortExpressionString,
            FilterExpressionString = request.FilterExpressionString,
            WeatherLocationId = weatherLocationId,
        };
}

Handler

这是基于与通用处理程序相同的模式构建的。

public class WeatherForecastListQueryHandler<TDbContext>
    : IListQueryHandler<DvoWeatherForecast>
        where TDbContext : DbContext
{
    protected IEnumerable<DvoWeatherForecast> items = 
                          Enumerable.Empty<DvoWeatherForecast>();
    protected int count = 0;

    protected IDbContextFactory<TDbContext> factory;
    protected WeatherForecastListQuery listQuery = default!;

    public WeatherForecastListQueryHandler(IDbContextFactory<TDbContext> factory)
    {
        this.factory = factory;
    }

    public async ValueTask<ListProviderResult<DvoWeatherForecast>> 
    ExecuteAsync(IListQuery<DvoWeatherForecast> query)
    {
        if (query is null || query is not WeatherForecastListQuery)
            return new ListProviderResult<DvoWeatherForecast>
            (new List<DvoWeatherForecast>(), 0, false, "No Query Defined");

        listQuery = (WeatherForecastListQuery)query;

        if (await this.GetItemsAsync())
            await this.GetCountAsync();

        return new ListProviderResult<DvoWeatherForecast>(this.items, this.count);
    }

    protected virtual async ValueTask<bool> GetItemsAsync()
    {
        var dbContext = this.factory.CreateDbContext();
        dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;

        IQueryable<DvoWeatherForecast> query = dbContext.Set<DvoWeatherForecast>();

        if (listQuery.WeatherLocationId != Guid.Empty)
            query = query
                .Where(item => item.WeatherLocationId == listQuery.WeatherLocationId)
                .AsQueryable();

        if (listQuery.SortExpressionString is not null)
            query = query.OrderBy(listQuery.SortExpressionString);

        if (listQuery.PageSize > 0)
            query = query
                .Skip(listQuery.StartIndex)
                .Take(listQuery.PageSize);

        if (query is IAsyncEnumerable<DvoWeatherForecast>)
            this.items = await query.ToListAsync();
        else
            this.items = query.ToList();

        return true;
    }

    protected virtual async ValueTask<bool> GetCountAsync()
    {
        var dbContext = this.factory.CreateDbContext();
        dbContext.ChangeTracker.QueryTrackingBehavior = 
                                QueryTrackingBehavior.NoTracking;

        IQueryable<DvoWeatherForecast> query = dbContext.Set<DvoWeatherForecast>();

        if (listQuery.WeatherLocationId != Guid.Empty)
            query = query
                .Where(item => item.WeatherLocationId == listQuery.WeatherLocationId)
                .AsQueryable();

        if (query is IAsyncEnumerable<DvoWeatherForecast>)
            count = await query.CountAsync();
        else
            count = query.Count();

        return true;
    }
}

可以在 DI 中定义处理程序

services.AddScoped<IListQueryHandler<DvoWeatherForecast>, 
WeatherForecastListQueryHandler<InMemoryWeatherDbContext>>();

代理

我们可以将方法添加到标准代理中以处理 IListQueryHandler<TRecord>。请注意,使用此方法,我们每个数据类只能定义一个 IListQueryHandler

ICQSDataBroker 定义

public interface ICQSDataBroker
{
    public ValueTask<ListProviderResult<TRecord>> 
    ExecuteAsync<TRecord>(IListQuery<TRecord> query) where TRecord : class, new();
}

以及 CQSDataBroker 中的实现

public async ValueTask<ListProviderResult<TRecord>> 
ExecuteAsync<TRecord>(IListQuery<TRecord> query) where TRecord : class, new()
{
    var queryType = query.GetType();
    var handler = _serviceProvider.GetService<IListQueryHandler<TRecord>>();
    if (handler == null)
        throw new NullReferenceException
              ("No Handler service registered for the List Query");

    return await handler.ExecuteAsync(query);
}

测试

更新 CQSBrokerTests,添加自定义处理程序

private ServiceProvider GetServiceProvider()
{
    // Creates a Service Collection
    var services = new ServiceCollection();
    // Adds the application services to the collection
    Action<DbContextOptionsBuilder> dbOptions = options => options.UseInMemoryDatabase
    ($"WeatherDatabase-{Guid.NewGuid().ToString()}");
    services.AddWeatherAppServerDataServices<InMemoryWeatherDbContext>(dbOptions);
    services.AddSingleton<ICQSDataBroker, CQSDataBroker<InMemoryWeatherDbContext>>();
    services.AddScoped<IListQueryHandler<DvoWeatherForecast>, 
    WeatherForecastListQueryHandler<InMemoryWeatherDbContext>>();
    // Creates a Service Provider from the Services collection
    // This is our DI container
    var provider = services.BuildServiceProvider();

    // Adds the test data to the in memory database
    var factory = provider.GetService<IDbContextFactory<InMemoryWeatherDbContext>>();
    WeatherTestDataProvider.Instance().LoadDbContext<InMemoryWeatherDbContext>(factory);

    return provider!;
}

并添加一个测试

[Fact]
public async void TestCustomDvoWeatherForecastListCQSDataBroker()
{
    var provider = GetServiceProvider();
    var broker = provider.GetService<ICQSDataBroker>()!;
    var locationId = _weatherTestDataProvider.WeatherLocations.First().Uid;

    var testRecordCount = _weatherTestDataProvider.WeatherForecasts.Where
    (item => item.WeatherLocationId == locationId).Count();
    int pageSize = 10;
    var testCount = testRecordCount > pageSize ? pageSize : testRecordCount;

    var listRequest = new ListProviderRequest<DvoWeatherForecast>(0, pageSize);

    var query = WeatherForecastListQuery.GetQuery(locationId, listRequest);
    var result = await broker.ExecuteAsync<DvoWeatherForecast>(query);

    Assert.True(result.Success);
    Assert.Equal(testCount, result.Items.Count());
    Assert.True(result.TotalItemCount == testRecordCount);
}

身份提供程序

这演示了一个完整的自定义实现。它从数据库身份表中获取包含 ClaimsIdentity(身份验证系统的一部分)的结果。

供参考,数据库记录是

public record DboIdentity
{
    [Key] public Guid Id { get; init; } = Guid.Empty;
    public string Name { get; init; } = String.Empty;
    public string Role { get; init; } = String.Empty;
}

结果

public class IdentityRequestResult
{
    public ClaimsIdentity? Identity { get; init; } = null;
    public bool Success { get; init; } = false;
    public string Message { get; init; } = string.Empty;

    public static IdentityRequestResult Failure(string message)
        => new IdentityRequestResult {Message = message };

    public static IdentityRequestResult Successful
           (ClaimsIdentity identity, string? message = null)
        => new IdentityRequestResult 
        {Identity = identity, Success=true, Message = message ?? string.Empty };
}

查询请求

public record IdentityQuery
    : ICQSRequest<ValueTask<IdentityRequestResult>>
{
    public Guid TransactionId { get; } = Guid.NewGuid();
    public Guid IdentityId { get; init; } = Guid.Empty;

    public static IdentityQuery Query(Guid Uid)
        => new IdentityQuery { IdentityId = Uid };
}

一个处理程序接口:我们可能需要服务器和 API 版本。

public interface IIdentityQueryHandler
    : ICQSHandler<IdentityQuery, ValueTask<IdentityRequestResult>>
{}

以及处理程序

public class IdentityQueryHandler<TDbContext>
    : ICQSHandler<IdentityQuery, ValueTask<IdentityRequestResult>>
        where TDbContext : DbContext
{
    protected IDbContextFactory<TDbContext> factory;

    public IdentityQueryHandler(IDbContextFactory<TDbContext> factory)
        => this.factory = factory;

    public async ValueTask<IdentityRequestResult> ExecuteAsync(IdentityQuery query)
    {
        var dbContext = this.factory.CreateDbContext();
        IQueryable<DboIdentity> queryable = dbContext.Set<DboIdentity>();
        if (queryable is not null)
        {
            var record = await queryable.SingleOrDefaultAsync
                         (item => item.Id == query.IdentityId);
            if (record is not null)
            {
                var identity = new ClaimsIdentity(new[]
                {
                    new Claim(ClaimTypes.Sid, record.Id.ToString()),
                    new Claim(ClaimTypes.Name, record.Name),
                    new Claim(ClaimTypes.Role, record.Role)
                });
                return IdentityRequestResult.Successful(identity);
            }
            return IdentityRequestResult.Failure("No Identity exists.");
        }
        return IdentityRequestResult.Failure("No Identity Records Found.");
    }
}

以及演示测试

public class CQSCustomTests
{
    private WeatherTestDataProvider _weatherTestDataProvider;

    public CQSCustomTests()
        // Creates an instance of the Test Data provider
        => _weatherTestDataProvider = WeatherTestDataProvider.Instance();

    private ServiceProvider GetServiceProvider()
    {
        // Creates a Service Collection
        var services = new ServiceCollection();
        // Adds the application services to the collection
        Action<DbContextOptionsBuilder> dbOptions = options => 
        options.UseInMemoryDatabase($"WeatherDatabase-{Guid.NewGuid().ToString()}");
        services.AddWeatherAppServerDataServices<InMemoryWeatherDbContext>(dbOptions);
        services.AddScoped<IIdentityQueryHandler, 
        IdentityQueryHandler<InMemoryWeatherDbContext>>();
        // Creates a Service Provider from the Services collection
        // This is our DI container
        var provider = services.BuildServiceProvider();

        // Adds the test data to the in memory database
        var factory = provider.GetService<IDbContextFactory<InMemoryWeatherDbContext>>();
        WeatherTestDataProvider.Instance().LoadDbContext
                                <InMemoryWeatherDbContext>(factory);

        return provider!;
    }

    [Fact]
    public async void TestIdentityCQSDataBroker()
    {
        var provider = GetServiceProvider();
        var broker = provider.GetService<IIdentityQueryHandler>()!;

        var testRecord = _weatherTestDataProvider.Identities.Skip(1).First();

        var query = IdentityQuery.GetQuery(testRecord.Id);
        var result = await broker.ExecuteAsync(query);

        Assert.True(result.Success);
        Assert.NotNull(result.Identity);
        Assert.Equal(testRecord.Name, result.Identity.Name);
    }
}

摘要

希望我演示了一种不同、更简洁的实现 CQS 模式的方法。我现在是一个皈依者。

我故意没有实现事务日志记录或集中式异常处理。

附录

数据存储

本文和存储库的后端数据库是一个内存中的 Entity Framework 数据库。与模拟数据存储的其他方法相比,它的主要优点是它可以与 DbContext 工厂一起使用并支持多个上下文。我使用内存查询来模拟视图。

TestDataProvider 有一个方法可以将数据填充到 DbContext 中。

完整的 DbContext 如下所示

public class InMemoryWeatherDbContext
    : DbContext
{
    public DbSet<DboWeatherForecast> DboWeatherForecast { get; set; } = default!;
    public DbSet<DvoWeatherForecast> DvoWeatherForecast { get; set; } = default!;
    public DbSet<DboWeatherSummary> DboWeatherSummary { get; set; } = default!;
    public DbSet<DboWeatherLocation> DboWeatherLocation { get; set; } = default!;
    public DbSet<FkWeatherSummary> FkWeatherSummary { get; set; } = default!;
    public DbSet<FkWeatherLocation> FkWeatherLocation { get; set; } = default!;
    public DbSet<DboIdentity> DboIdentity { get; set; } = default!;

    public InMemoryWeatherDbContext
    (DbContextOptions<InMemoryWeatherDbContext> options) : base(options) { }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<DboWeatherForecast>().ToTable("WeatherForecast");
        modelBuilder.Entity<DboWeatherSummary>().ToTable("WeatherSummary");
        modelBuilder.Entity<DboWeatherLocation>().ToTable("WeatherLocation");
        modelBuilder.Entity<DboIdentity>().ToTable("Identity");

        modelBuilder.Entity<DvoWeatherForecast>()
            .ToInMemoryQuery(()
            => from f in this.DboWeatherForecast
               join s in this.DboWeatherSummary! on 
                      f.WeatherSummaryId equals s.Uid into fs
               from fsjoin in fs
               join l in this.DboWeatherLocation! on 
                      f.WeatherLocationId equals l.Uid into fl
               from fljoin in fl
               select new DvoWeatherForecast
               {
                   Uid = f.Uid,
                   WeatherSummaryId = f.WeatherSummaryId,
                   WeatherLocationId = f.WeatherLocationId,
                   Date = f.Date,
                   Summary = fsjoin.Summary,
                   Location = fljoin.Location,
                   TemperatureC = f.TemperatureC,
               })
            .HasKey(x => x.Uid);

        modelBuilder.Entity<FkWeatherSummary>()
            .ToInMemoryQuery(()
            => from s in this.DboWeatherSummary!
               select new FkWeatherSummary
               {
                   Id =s.Uid,
                   Name = s.Summary
               })
            .HasKey(x => x.Id);

        modelBuilder.Entity<FkWeatherLocation>()
            .ToInMemoryQuery(()
            => from l in this.DboWeatherLocation!
               select new FkWeatherLocation
               {
                   Id = l.Uid,
                   Name = l.Location
               })
            .HasKey(x => x.Id);
    }
}

存储库中有一个自述文件,其中提供了测试数据设置的完整描述。

历史

  • 2022 年 8 月 22 日:初始版本
© . All rights reserved.