构建一个简洁的 DotNetCore CQS 数据管道






4.23/5 (4投票s)
如何构建 CQS 数据管道
测试数据
附录提供了数据类和测试数据提供程序的摘要。存储库的文档部分有完整的描述。
存储库
本文相关的数据存储库在此:Blazr.Demo.DataPipeline。存储库中还有一些关于设计、数据类、测试数据提供程序和数据库上下文的详细文档。
引言
CQS - 不要与 CQRS 混淆 - 本质上是一种编程风格。每个操作都是:
- 一个
Command
(命令)- 请求进行数据更改 - 一个
Query
(查询)- 请求获取一些数据
一个 Command
返回状态信息或不返回任何内容。命令从不返回数据。
一个 Query
返回一个数据集。查询从不更改数据状态。进行查询没有副作用。
这是一个非常好的模式,可以普遍应用于您的代码。
每个操作都有一个 Command
/Query
类来定义操作,以及一个 Handler
(处理程序)类来执行定义的操作。通常,是一对一的关系:每个请求都有一个唯一的处理程序。
本质上
-
一个
Request
对象定义了Handler
需要执行请求的信息以及它期望的返回结果 -Result
。 -
一个
Handler
对象执行必要的代码,并使用Request
提供的数据返回定义的Result
。
概念上,它非常简单,实现起来也相对容易。问题在于每个数据库操作都需要一个请求和一个处理程序对象。大量类定义和重复相同的旧代码。
解决方案布局和设计
该解决方案包含一组遵循干净设计原则的库。它旨在适用于任何 DotNetCore 环境。Blazr.Core
和 Blazr.Data
是可用于任何实现的两个基础库。Blazr.Demo.Core
和 Blazr.Demo.Data
是两个应用程序特定的库。
前端应用程序是一个 XUnit 测试项目,用于演示和测试代码。
我在 Blazor 项目中使用它。
接口和基类
基本方法可以通过两个泛型接口来定义。
ICQSRequest
定义任何请求
- 它表明请求产生一个由
TResult
定义的输出。 - 它有一个唯一的
TransactionId
来跟踪事务(如果需要且已实现)。
public interface ICQSRequest<out TResult>
{
public Guid TransactionId { get;}
}
ICQSHandler
定义任何执行 ICQSRequest
实例的处理程序
- 处理程序接收一个实现了
ICQSRequest
接口的TRequest
。 - 处理程序输出
ICQSRequest
接口中定义的TResult
。 - 它有一个单一的
ExecuteAsync
方法,该方法接受一个TRequest
并返回TResult
。
public interface ICQSHandler<in TRequest, out TResult>
where TRequest : ICQSRequest<TResult>
{
TResult ExecuteAsync(TRequest request);
}
要构建更简洁的实现
- 我们必须接受 80/20 法则。并非所有请求都能通过我们的标准实现来满足,但节省 80% 的工作量和类也是很有意义的。
- 我们需要一种方法来处理那 20%。
- 我们需要一个“兼容”的基于泛型的 ORM 来与我们的数据存储交互。此实现使用了Entity Framework,它提供了此功能。
- 在基类中编写一些非常复杂的泛型代码,将功能抽象到样板代码中。
结果
解决方案定义了一组要返回的标准结果:请求的 TResult
。它们被定义为带有静态构造函数的 record
,并包含状态信息,如果是查询,则包含数据。它们必须是可序列化的,以便在 API 中使用。每个如下所示
public record ListProviderResult<TRecord>
{
public IEnumerable<TRecord> Items { get; init; }
public int TotalItemCount { get; init; }
public bool Success { get; init; }
public string? Message { get; init; }
//....Constructors
}
public record RecordProviderResult<TRecord>
{
public TRecord? Record { get; init; }
public bool Success { get; init; }
public string? Message { get; init; }
//....Constructors
}
public record CommandResult
{
public Guid NewId { get; init; }
public bool Success { get; init; }
public string Message { get; init; }
//....Constructors
}
public record FKListProviderResult
{
public IEnumerable<IFkListItem> Items { get; init; }
public bool Success { get; init; }
public string? Message { get; init; }
//....Constructors
}
所有都实现了static
构造函数,以严格控制内容。
基类
TRecord
代表使用 ORM 从数据存储中检索的数据类。它被限定为实现空构造函数 new()
的class
。
Request
接口/类结构如下所示
Handler
接口/类结构如下所示
Commands
所有命令
- 接收一个我们定义为
TRecord
的记录。 - 将
TResult
固定为异步ValueTask<CommandResult>
。
一个实现 ICQSRequest
和此功能的接口。
public interface IRecordCommand<TRecord>
: ICQSRequest<ValueTask<CommandResult>>
{
public TRecord Record { get;}
}
以及一个抽象实现。
public abstract class RecordCommandBase<TRecord>
: IRecordCommand<TRecord>
{
public Guid TransactionId { get; } = Guid.NewGuid();
public TRecord Record { get; protected set; } = default!;
protected RecordCommandBase() { }
}
现在我们可以定义 Add
/Delete
/Update
特定命令。所有命令都使用静态构造函数来控制和验证内容。必须存在一对一的关系(请求 -> 处理程序),因此我们为每种类型的命令定义一个处理程序。
public class AddRecordCommand<TRecord>
: RecordCommandBase<TRecord>
{
private AddRecordCommand() { }
public static AddRecordCommand<TRecord> GetCommand(TRecord record)
=> new AddRecordCommand<TRecord> { Record = record };
}
public class DeleteRecordCommand<TRecord>
: RecordCommandBase<TRecord>
{
private DeleteRecordCommand() { }
public static DeleteRecordCommand<TRecord> GetCommand(TRecord record)
=> new DeleteRecordCommand<TRecord> { Record = record };
}
public class UpdateRecordCommand<TRecord>
: RecordCommandBase<TRecord>
{
private UpdateRecordCommand() { }
public static UpdateRecordCommand<TRecord> GetCommand(TRecord record)
=> new UpdateRecordCommand<TRecord> { Record = record };
}
命令处理器
为处理程序创建接口或基类没有好处,因此我们实现 Create
/Update
/Delete
命令作为三个单独的类。TRecord
定义记录类,TDbContext
定义 DI DbContextFactory
中使用的DbContext
。
我们使用 DbContext
内置的泛型方法,因此不需要特定的 DbContext
。Set<TRecord>
方法查找 TRecord
的 DbSet
实例,而 Update<TRecord>
、Add<TRecord>
和 Delete<TRecord>
方法与 SaveChangesAsync
一起实现命令。
所有处理程序都遵循相同的模式。
- 构造函数传入
DbContext
工厂和要执行的命令请求。 Execute
:- 获取一个
DbContext
。 - 在上下文中调用泛型
Add/Update/Delete
,传入记录。内部,EF 查找记录集和特定记录,并用提供的记录替换它。 - 调用
DbContext
上的SaveChanges
,将更改提交到数据存储。 - 检查我们是否有一个更改,并返回一个
CommandResult
。
- 获取一个
这是 Add Handler(添加处理程序)
public class AddRecordCommandHandler<TRecord, TDbContext>
: ICQSHandler<AddRecordCommand<TRecord>, ValueTask<CommandResult>>
where TDbContext : DbContext
where TRecord : class, new()
{
protected IDbContextFactory<TDbContext> factory;
public AddRecordCommandHandler(IDbContextFactory<TDbContext> factory)
=> this.factory = factory;
public async ValueTask<CommandResult> ExecuteAsync(AddRecordCommand<TRecord> command)
{
using var dbContext = factory.CreateDbContext();
dbContext.Add<TRecord>(command.Record);
return await dbContext.SaveChangesAsync() == 1
? CommandResult.Successful("Record Saved")
: CommandResult.Failure("Error saving Record");
}
}
查询
查询并不那么统一。
- 存在各种类型的
TResult
。 - 它们具有
Where
和OrderBy
等特定操作。
为了处理这些需求,我们定义了三个查询请求
RecordQuery(记录查询)
这会返回一个 RecordProviderResult
,其中包含一个基于提供的 Uid
的单个记录。
public record RecordQuery<TRecord>
: ICQSRequest<ValueTask<RecordProviderResult<TRecord>>>
{
public Guid TransactionId { get; } = Guid.NewGuid();
public Guid GuidId { get; init; }
protected RecordQuery() { }
public static RecordQuery<TRecord> GetQuery(Guid recordId)
=> new RecordQuery<TRecord> { GuidId = recordId };
}
ListQuery(列表查询)
这会返回一个 ListProviderResult
,其中包含一个分页的 IEnumerable
TRecord
。
我们定义了一个接口
public interface IListQuery<TRecord>
: ICQSRequest<ValueTask<ListProviderResult<TRecord>>>
where TRecord : class, new()
{
public int StartIndex { get; }
public int PageSize { get; }
public string? SortExpressionString { get; }
public string? FilterExpressionString { get; }
}
一个抽象的基类实现
public abstract record ListQueryBase<TRecord>
:IListQuery<TRecord>
where TRecord : class, new()
{
public int StartIndex { get; init; }
public int PageSize { get; init; }
public string? SortExpressionString { get; init; }
public string? FilterExpressionString { get; init; }
public Guid TransactionId { get; init; } = Guid.NewGuid();
protected ListQueryBase() { }
}
最后是一个泛型查询
public record ListQuery<TRecord>
:ListQueryBase<TRecord>
where TRecord : class, new()
{
public static ListQuery<TRecord> GetQuery(ListProviderRequest<TRecord> request)
=> new ListQuery<TRecord>
{
StartIndex = request.StartIndex,
PageSize = request.PageSize,
SortExpressionString = request.SortExpressionString,
FilterExpressionString = request.FilterExpressionString
};
}
我们将代码分成接口/抽象基类模式,以便可以实现自定义列表查询。如果它们继承自 ListQuery
,我们会遇到工厂和模式匹配方法的问题。使用基类来实现样板代码可以解决此问题。
FKListQuery(外键列表查询)
这会返回一个 FkListProviderResult
,其中包含一个 IEnumerable
IFkListItem
。FkListItem
是一个简单的对象,包含一个 Guid
/Name
对。其主要用途是在 UI 中的外键 Select
控件中。
public record FKListQuery<TRecord>
: ICQSRequest<ValueTask<FKListProviderResult>>
{
public Guid TransactionId { get; } = Guid.NewGuid();
}
Query Handlers(查询处理程序)
相应的查询处理程序是
RecordQueryHandler(记录查询处理程序)
创建“通用”版本可能具有挑战性,具体取决于 ORM。
需要注意的关键概念是
- 使用
IDbContextFactory
的工作单元DbContexts
。 _dbContext.Set<TRecord>()
获取TRecord
的DbSet
。- 使用两种方法来应用查询。
public class RecordQueryHandler<TRecord, TDbContext>
: ICQSHandler<RecordQuery<TRecord>,
ValueTask<RecordProviderResult<TRecord>>>
where TRecord : class, new()
where TDbContext : DbContext
{
private IDbContextFactory<TDbContext> _factory;
public RecordQueryHandler(IDbContextFactory<TDbContext> factory)
=> _factory = factory;
public async ValueTask<RecordProviderResult<TRecord>>
ExecuteAsync(RecordQuery<TRecord> query)
{
var dbContext = _factory.CreateDbContext();
dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
TRecord? record = null;
// first check if the record implements IRecord.
// If so we can do a cast and then do the query via the Uid property directly
if ((new TRecord()) is IRecord)
record = await dbContext.Set<TRecord>().SingleOrDefaultAsync
(item => ((IRecord)item).Uid == query.GuidId);
// Try and use the EF FindAsync implementation
if (record is null)
record = await dbContext.FindAsync<TRecord>(query.GuidId);
if (record is null)
return RecordProviderResult<TRecord>.Failure
(No record retrieved");
return RecordProviderResult<TRecord>.Successful(record);
}
}
ListQueryHandler(列表查询处理程序)
此处需要注意的关键概念是
- 使用
IDbContextFactory
的工作单元DbContexts
。 _dbContext.Set<TRecord>()
获取TRecord
的DbSet
。- 使用
IQueryable
构建查询。 - 需要两个查询。一个用于获取“分页”
recordset
,一个用于获取总记录数。 - 使用
System.Linq.Dynamic
进行排序和过滤。稍后将对此进行讨论。
public class ListQueryHandler<TRecord, TDbContext>
: IListQueryHandler<TRecord>
where TDbContext : DbContext
where TRecord : class, new()
{
protected IEnumerable<TRecord> items = Enumerable.Empty<TRecord>();
protected int count = 0;
protected IDbContextFactory<TDbContext> factory;
protected IListQuery<TRecord> listQuery = default!;
public ListQueryHandler(IDbContextFactory<TDbContext> factory)
=> this.factory = factory;
public async ValueTask<ListProviderResult<TRecord>>
ExecuteAsync(IListQuery<TRecord> query)
{
if (query is null)
return ListProviderResult<TRecord>.Failure
("No Query Defined");
listQuery = query;
if (await this.GetItemsAsync())
await this.GetCountAsync();
return ListProviderResult<TRecord>.Successful(this.items, this.count);
}
protected virtual async ValueTask<bool> GetItemsAsync()
{
var dbContext = this.factory.CreateDbContext();
dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
IQueryable<TRecord> query = dbContext.Set<TRecord>();
if (listQuery.FilterExpressionString is not null)
query = query
.Where(listQuery.FilterExpressionString)
.AsQueryable();
if (listQuery.SortExpressionString is not null)
query = query.OrderBy(listQuery.SortExpressionString);
if (listQuery.PageSize > 0)
query = query
.Skip(listQuery.StartIndex)
.Take(listQuery.PageSize);
if (query is IAsyncEnumerable<TRecord>)
this.items = await query.ToListAsync();
else
this.items = query.ToList();
return true;
}
protected virtual async ValueTask<bool> GetCountAsync()
{
var dbContext = this.factory.CreateDbContext();
dbContext.ChangeTracker.QueryTrackingBehavior =
QueryTrackingBehavior.NoTracking;
IQueryable<TRecord> query = dbContext.Set<TRecord>();
if (listQuery.FilterExpressionString is not null)
query = query
.Where(listQuery.FilterExpressionString)
.AsQueryable();
if (query is IAsyncEnumerable<TRecord>)
count = await query.CountAsync();
else
count = query.Count();
return true;
}
}
FKListQueryHandler(外键列表查询处理程序)
public class FKListQueryHandler<TRecord, TDbContext>
: ICQSHandler<FKListQuery<TRecord>, ValueTask<FKListProviderResult>>
where TDbContext : DbContext
where TRecord : class, IFkListItem, new()
{
protected IEnumerable<TRecord> items = Enumerable.Empty<TRecord>();
protected IDbContextFactory<TDbContext> factory;
public FKListQueryHandler(IDbContextFactory<TDbContext> factory)
=> this.factory = factory;
public async ValueTask<FKListProviderResult>
ExecuteAsync(FKListQuery<TRecord> listQuery)
{
var dbContext = this.factory.CreateDbContext();
dbContext.ChangeTracker.QueryTrackingBehavior =
QueryTrackingBehavior.NoTracking;
if (listQuery is null)
return FKListProviderResult.Failure
("No Query defined");
IEnumerable<TRecord> dbSet = await dbContext.Set<TRecord>().ToListAsync();
return FKListProviderResult.Successful(dbSet);
}
}
实现处理程序
处理程序设计为两种方式使用
- 作为单独的依赖注入服务
- 通过依赖注入的工厂
我们将在测试中看到两者的用法。
通用工厂代理
代理使用单个方法 ExecuteAsync(Request)
,并为每个请求提供实现,该实现将正确的处理程序映射,执行请求并提供预期的结果。
var TResult = await DataBrokerInstance.ExecuteAsync<TRecord>(TRequest);
用于在 DI 中定义服务的接口
public interface ICQSDataBroker
{
public ValueTask<CommandResult> ExecuteAsync<TRecord>
(AddRecordCommand<TRecord> command) where TRecord : class, new();
public ValueTask<CommandResult> ExecuteAsync<TRecord>
(UpdateRecordCommand<TRecord> command) where TRecord : class, new();
public ValueTask<CommandResult> ExecuteAsync<TRecord>
(DeleteRecordCommand<TRecord> command) where TRecord : class, new();
public ValueTask<ListProviderResult<TRecord>> ExecuteAsync<TRecord>
(ListQuery<TRecord> query) where TRecord : class, new();
public ValueTask<RecordProviderResult<TRecord>> ExecuteAsync<TRecord>
(RecordQuery<TRecord> query) where TRecord : class, new();
public ValueTask<FKListProviderResult> ExecuteAsync<TRecord>
(FKListQuery<TRecord> query) where TRecord : class, IFkListItem, new();
}
以及一个服务器代理实现
public class CQSDataBroker<TDbContext>
:ICQSDataBroker
where TDbContext : DbContext
{
private readonly IDbContextFactory<TDbContext> _factory;
private readonly IServiceProvider _serviceProvider;
public CQSDataBroker(IDbContextFactory<TDbContext> factory,
IServiceProvider serviceProvider)
{
_factory = factory;
_serviceProvider = serviceProvider;
}
public async ValueTask<CommandResult> ExecuteAsync<TRecord>
(AddRecordCommand<TRecord> command) where TRecord : class, new()
{
var handler = new AddRecordCommandHandler<TRecord, TDbContext>
(_factory, command);
return await handler.ExecuteAsync();
}
//.... Update and Delete ExecuteAsyncs
public async ValueTask<ListProviderResult<TRecord>> ExecuteAsync<TRecord>
(ListQuery<TRecord> query) where TRecord : class, new()
{
var handler = new ListQueryHandler<TRecord, TDbContext>(_factory, query);
return await handler.ExecuteAsync();
}
public async ValueTask<RecordProviderResult<TRecord>>
ExecuteAsync<TRecord>(RecordQuery<TRecord> query) where TRecord : class, new()
{
var handler = new RecordQueryHandler<TRecord, TDbContext>(_factory, query);
return await handler.ExecuteAsync();
}
public async ValueTask<FKListProviderResult> ExecuteAsync<TRecord>
(FKListQuery<TRecord> query) where TRecord : class, IFkListItem, new()
{
var handler = new FKListQueryHandler<TRecord, TDbContext>(_factory, query);
return await handler.ExecuteAsync();
}
public ValueTask<object> ExecuteAsync<TRecord>(object query)
=> throw new NotImplementedException();
}
注意捕获所有异常的方法。
测试代理
SetUp
这是代理演示测试的设置。它设置了一个 DI 服务容器并将实例传递给测试。
public CQSBrokerTests()
// Creates an instance of the Test Data provider
=> _weatherTestDataProvider = WeatherTestDataProvider.Instance();
private ServiceProvider GetServiceProvider()
{
// Creates a Service Collection
var services = new ServiceCollection();
// Adds the application services to the collection
Action<DbContextOptionsBuilder> dbOptions = options => options.UseInMemoryDatabase
($"WeatherDatabase-{Guid.NewGuid().ToString()}");
services.AddDbContextFactory<TDbContext>(options);
services.AddSingleton<ICQSDataBroker, CQSDataBroker<InMemoryWeatherDbContext>>();
// Creates a Service Provider from the Services collection
// This is our DI container
var provider = services.BuildServiceProvider();
// Adds the test data to the in memory database
var factory = provider.GetService<IDbContextFactory<InMemoryWeatherDbContext>>();
WeatherTestDataProvider.Instance().LoadDbContext<InMemoryWeatherDbContext>(factory);
return provider!;
}
测试
获取天气地点列表的典型测试
[Fact]
public async void TestWeatherLocationListCQSDataBroker()
{
// Build our DI container
var provider = GetServiceProvider();
//Get the Data Broker
var broker = provider.GetService<ICQSDataBroker>()!;
// Get the control record count from the Test Data Provider
var testRecordCount = _weatherTestDataProvider.WeatherLocations.Count();
int pageSize = 10;
// Get the expected recordset count.
// It should be either the page size
//or the total record count if that's smaller
var testCount = testRecordCount > pageSize ? pageSize : testRecordCount ;
// Create a list request
var listRequest = new ListProviderRequest<DboWeatherLocation>(0, pageSize);
// Create a ListQuery and execute the query
//on the Data Broker against the DboWeatherLocation recordset
var query = new ListQuery<DboWeatherLocation>(listRequest);
var result = await broker.ExecuteAsync<DboWeatherLocation>(query);
// Check we have success
Assert.True(result.Success);
// Check the recordset count
Assert.Equal(testCount, result.Items.Count());
// Check the total count os correct against the test provider
Assert.True(result.TotalItemCount == testRecordCount);
}
以及一个 Add 命令测试
[Fact]
public async void TestAddCQSDataBroker()
{
var provider = GetServiceProvider();
var broker = provider.GetService<ICQSDataBroker>()!;
var newRecord = _weatherTestDataProvider.GetForecast();
var id = newRecord!.Uid;
var command = new AddRecordCommand<DboWeatherForecast>(newRecord);
var result = await broker.ExecuteAsync(command);
var query = new RecordQuery<DboWeatherForecast>(id);
var checkResult = await broker.ExecuteAsync(query);
Assert.True(result.Success);
Assert.Equal(newRecord, checkResult.Record);
}
自定义请求
过滤列表
这可能是最常见的自定义请求。标准 ListQuery
使用 Dynamic Linq,因此您可以将查询构建为字符串以传递到查询中。但是,Dynamic Linq 效率不高,因此我更喜欢在大量使用它们的地方定义自定义查询。
所有此类查询都可以使用自定义的 BaseListQuery
。
我们的示例自定义查询根据 Location
过滤 WeatherForecast
。
查询
- 继承自
ListQueryBase
,将TRecord
固定为DvoWeatherForecast
- 定义一个
WeatherLocationId
属性 - 定义一个
static creator
方法
public record WeatherForecastListQuery
: ListQueryBase<DvoWeatherForecast>
{
public Guid WeatherLocationId { get; init; }
private WeatherForecastListQuery() { }
public static WeatherForecastListQuery GetQuery
(Guid weatherLocationId, ListProviderRequest<DvoWeatherForecast> request)
=> new WeatherForecastListQuery
{
StartIndex = request.StartIndex,
PageSize = request.PageSize,
SortExpressionString = request.SortExpressionString,
FilterExpressionString = request.FilterExpressionString,
WeatherLocationId = weatherLocationId,
};
}
Handler
这是基于与通用处理程序相同的模式构建的。
public class WeatherForecastListQueryHandler<TDbContext>
: IListQueryHandler<DvoWeatherForecast>
where TDbContext : DbContext
{
protected IEnumerable<DvoWeatherForecast> items =
Enumerable.Empty<DvoWeatherForecast>();
protected int count = 0;
protected IDbContextFactory<TDbContext> factory;
protected WeatherForecastListQuery listQuery = default!;
public WeatherForecastListQueryHandler(IDbContextFactory<TDbContext> factory)
{
this.factory = factory;
}
public async ValueTask<ListProviderResult<DvoWeatherForecast>>
ExecuteAsync(IListQuery<DvoWeatherForecast> query)
{
if (query is null || query is not WeatherForecastListQuery)
return new ListProviderResult<DvoWeatherForecast>
(new List<DvoWeatherForecast>(), 0, false, "No Query Defined");
listQuery = (WeatherForecastListQuery)query;
if (await this.GetItemsAsync())
await this.GetCountAsync();
return new ListProviderResult<DvoWeatherForecast>(this.items, this.count);
}
protected virtual async ValueTask<bool> GetItemsAsync()
{
var dbContext = this.factory.CreateDbContext();
dbContext.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
IQueryable<DvoWeatherForecast> query = dbContext.Set<DvoWeatherForecast>();
if (listQuery.WeatherLocationId != Guid.Empty)
query = query
.Where(item => item.WeatherLocationId == listQuery.WeatherLocationId)
.AsQueryable();
if (listQuery.SortExpressionString is not null)
query = query.OrderBy(listQuery.SortExpressionString);
if (listQuery.PageSize > 0)
query = query
.Skip(listQuery.StartIndex)
.Take(listQuery.PageSize);
if (query is IAsyncEnumerable<DvoWeatherForecast>)
this.items = await query.ToListAsync();
else
this.items = query.ToList();
return true;
}
protected virtual async ValueTask<bool> GetCountAsync()
{
var dbContext = this.factory.CreateDbContext();
dbContext.ChangeTracker.QueryTrackingBehavior =
QueryTrackingBehavior.NoTracking;
IQueryable<DvoWeatherForecast> query = dbContext.Set<DvoWeatherForecast>();
if (listQuery.WeatherLocationId != Guid.Empty)
query = query
.Where(item => item.WeatherLocationId == listQuery.WeatherLocationId)
.AsQueryable();
if (query is IAsyncEnumerable<DvoWeatherForecast>)
count = await query.CountAsync();
else
count = query.Count();
return true;
}
}
可以在 DI 中定义处理程序
services.AddScoped<IListQueryHandler<DvoWeatherForecast>,
WeatherForecastListQueryHandler<InMemoryWeatherDbContext>>();
代理
我们可以将方法添加到标准代理中以处理 IListQueryHandler<TRecord>
。请注意,使用此方法,我们每个数据类只能定义一个 IListQueryHandler
。
ICQSDataBroker
定义
public interface ICQSDataBroker
{
public ValueTask<ListProviderResult<TRecord>>
ExecuteAsync<TRecord>(IListQuery<TRecord> query) where TRecord : class, new();
}
以及 CQSDataBroker
中的实现
public async ValueTask<ListProviderResult<TRecord>>
ExecuteAsync<TRecord>(IListQuery<TRecord> query) where TRecord : class, new()
{
var queryType = query.GetType();
var handler = _serviceProvider.GetService<IListQueryHandler<TRecord>>();
if (handler == null)
throw new NullReferenceException
("No Handler service registered for the List Query");
return await handler.ExecuteAsync(query);
}
测试
更新 CQSBrokerTests
,添加自定义处理程序
private ServiceProvider GetServiceProvider()
{
// Creates a Service Collection
var services = new ServiceCollection();
// Adds the application services to the collection
Action<DbContextOptionsBuilder> dbOptions = options => options.UseInMemoryDatabase
($"WeatherDatabase-{Guid.NewGuid().ToString()}");
services.AddWeatherAppServerDataServices<InMemoryWeatherDbContext>(dbOptions);
services.AddSingleton<ICQSDataBroker, CQSDataBroker<InMemoryWeatherDbContext>>();
services.AddScoped<IListQueryHandler<DvoWeatherForecast>,
WeatherForecastListQueryHandler<InMemoryWeatherDbContext>>();
// Creates a Service Provider from the Services collection
// This is our DI container
var provider = services.BuildServiceProvider();
// Adds the test data to the in memory database
var factory = provider.GetService<IDbContextFactory<InMemoryWeatherDbContext>>();
WeatherTestDataProvider.Instance().LoadDbContext<InMemoryWeatherDbContext>(factory);
return provider!;
}
并添加一个测试
[Fact]
public async void TestCustomDvoWeatherForecastListCQSDataBroker()
{
var provider = GetServiceProvider();
var broker = provider.GetService<ICQSDataBroker>()!;
var locationId = _weatherTestDataProvider.WeatherLocations.First().Uid;
var testRecordCount = _weatherTestDataProvider.WeatherForecasts.Where
(item => item.WeatherLocationId == locationId).Count();
int pageSize = 10;
var testCount = testRecordCount > pageSize ? pageSize : testRecordCount;
var listRequest = new ListProviderRequest<DvoWeatherForecast>(0, pageSize);
var query = WeatherForecastListQuery.GetQuery(locationId, listRequest);
var result = await broker.ExecuteAsync<DvoWeatherForecast>(query);
Assert.True(result.Success);
Assert.Equal(testCount, result.Items.Count());
Assert.True(result.TotalItemCount == testRecordCount);
}
身份提供程序
这演示了一个完整的自定义实现。它从数据库身份表中获取包含 ClaimsIdentity
(身份验证系统的一部分)的结果。
供参考,数据库记录是
public record DboIdentity
{
[Key] public Guid Id { get; init; } = Guid.Empty;
public string Name { get; init; } = String.Empty;
public string Role { get; init; } = String.Empty;
}
结果
public class IdentityRequestResult
{
public ClaimsIdentity? Identity { get; init; } = null;
public bool Success { get; init; } = false;
public string Message { get; init; } = string.Empty;
public static IdentityRequestResult Failure(string message)
=> new IdentityRequestResult {Message = message };
public static IdentityRequestResult Successful
(ClaimsIdentity identity, string? message = null)
=> new IdentityRequestResult
{Identity = identity, Success=true, Message = message ?? string.Empty };
}
查询请求
public record IdentityQuery
: ICQSRequest<ValueTask<IdentityRequestResult>>
{
public Guid TransactionId { get; } = Guid.NewGuid();
public Guid IdentityId { get; init; } = Guid.Empty;
public static IdentityQuery Query(Guid Uid)
=> new IdentityQuery { IdentityId = Uid };
}
一个处理程序接口:我们可能需要服务器和 API 版本。
public interface IIdentityQueryHandler
: ICQSHandler<IdentityQuery, ValueTask<IdentityRequestResult>>
{}
以及处理程序
public class IdentityQueryHandler<TDbContext>
: ICQSHandler<IdentityQuery, ValueTask<IdentityRequestResult>>
where TDbContext : DbContext
{
protected IDbContextFactory<TDbContext> factory;
public IdentityQueryHandler(IDbContextFactory<TDbContext> factory)
=> this.factory = factory;
public async ValueTask<IdentityRequestResult> ExecuteAsync(IdentityQuery query)
{
var dbContext = this.factory.CreateDbContext();
IQueryable<DboIdentity> queryable = dbContext.Set<DboIdentity>();
if (queryable is not null)
{
var record = await queryable.SingleOrDefaultAsync
(item => item.Id == query.IdentityId);
if (record is not null)
{
var identity = new ClaimsIdentity(new[]
{
new Claim(ClaimTypes.Sid, record.Id.ToString()),
new Claim(ClaimTypes.Name, record.Name),
new Claim(ClaimTypes.Role, record.Role)
});
return IdentityRequestResult.Successful(identity);
}
return IdentityRequestResult.Failure("No Identity exists.");
}
return IdentityRequestResult.Failure("No Identity Records Found.");
}
}
以及演示测试
public class CQSCustomTests
{
private WeatherTestDataProvider _weatherTestDataProvider;
public CQSCustomTests()
// Creates an instance of the Test Data provider
=> _weatherTestDataProvider = WeatherTestDataProvider.Instance();
private ServiceProvider GetServiceProvider()
{
// Creates a Service Collection
var services = new ServiceCollection();
// Adds the application services to the collection
Action<DbContextOptionsBuilder> dbOptions = options =>
options.UseInMemoryDatabase($"WeatherDatabase-{Guid.NewGuid().ToString()}");
services.AddWeatherAppServerDataServices<InMemoryWeatherDbContext>(dbOptions);
services.AddScoped<IIdentityQueryHandler,
IdentityQueryHandler<InMemoryWeatherDbContext>>();
// Creates a Service Provider from the Services collection
// This is our DI container
var provider = services.BuildServiceProvider();
// Adds the test data to the in memory database
var factory = provider.GetService<IDbContextFactory<InMemoryWeatherDbContext>>();
WeatherTestDataProvider.Instance().LoadDbContext
<InMemoryWeatherDbContext>(factory);
return provider!;
}
[Fact]
public async void TestIdentityCQSDataBroker()
{
var provider = GetServiceProvider();
var broker = provider.GetService<IIdentityQueryHandler>()!;
var testRecord = _weatherTestDataProvider.Identities.Skip(1).First();
var query = IdentityQuery.GetQuery(testRecord.Id);
var result = await broker.ExecuteAsync(query);
Assert.True(result.Success);
Assert.NotNull(result.Identity);
Assert.Equal(testRecord.Name, result.Identity.Name);
}
}
摘要
希望我演示了一种不同、更简洁的实现 CQS 模式的方法。我现在是一个皈依者。
我故意没有实现事务日志记录或集中式异常处理。
附录
数据存储
本文和存储库的后端数据库是一个内存中的 Entity Framework 数据库。与模拟数据存储的其他方法相比,它的主要优点是它可以与 DbContext
工厂一起使用并支持多个上下文。我使用内存查询来模拟视图。
TestDataProvider
有一个方法可以将数据填充到 DbContext
中。
完整的 DbContext
如下所示
public class InMemoryWeatherDbContext
: DbContext
{
public DbSet<DboWeatherForecast> DboWeatherForecast { get; set; } = default!;
public DbSet<DvoWeatherForecast> DvoWeatherForecast { get; set; } = default!;
public DbSet<DboWeatherSummary> DboWeatherSummary { get; set; } = default!;
public DbSet<DboWeatherLocation> DboWeatherLocation { get; set; } = default!;
public DbSet<FkWeatherSummary> FkWeatherSummary { get; set; } = default!;
public DbSet<FkWeatherLocation> FkWeatherLocation { get; set; } = default!;
public DbSet<DboIdentity> DboIdentity { get; set; } = default!;
public InMemoryWeatherDbContext
(DbContextOptions<InMemoryWeatherDbContext> options) : base(options) { }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<DboWeatherForecast>().ToTable("WeatherForecast");
modelBuilder.Entity<DboWeatherSummary>().ToTable("WeatherSummary");
modelBuilder.Entity<DboWeatherLocation>().ToTable("WeatherLocation");
modelBuilder.Entity<DboIdentity>().ToTable("Identity");
modelBuilder.Entity<DvoWeatherForecast>()
.ToInMemoryQuery(()
=> from f in this.DboWeatherForecast
join s in this.DboWeatherSummary! on
f.WeatherSummaryId equals s.Uid into fs
from fsjoin in fs
join l in this.DboWeatherLocation! on
f.WeatherLocationId equals l.Uid into fl
from fljoin in fl
select new DvoWeatherForecast
{
Uid = f.Uid,
WeatherSummaryId = f.WeatherSummaryId,
WeatherLocationId = f.WeatherLocationId,
Date = f.Date,
Summary = fsjoin.Summary,
Location = fljoin.Location,
TemperatureC = f.TemperatureC,
})
.HasKey(x => x.Uid);
modelBuilder.Entity<FkWeatherSummary>()
.ToInMemoryQuery(()
=> from s in this.DboWeatherSummary!
select new FkWeatherSummary
{
Id =s.Uid,
Name = s.Summary
})
.HasKey(x => x.Id);
modelBuilder.Entity<FkWeatherLocation>()
.ToInMemoryQuery(()
=> from l in this.DboWeatherLocation!
select new FkWeatherLocation
{
Id = l.Uid,
Name = l.Location
})
.HasKey(x => x.Id);
}
}
存储库中有一个自述文件,其中提供了测试数据设置的完整描述。
历史
- 2022 年 8 月 22 日:初始版本