使用 EntityFramework 5 进行异步数据库轮询





5.00/5 (4投票s)
如何使用 EntityFramework 5 异步轮询数据库以获取实体。
介绍
我给自己布置了一个任务,开发一个基于 Entity Framework 的通用、异步轮询器,并满足以下要求:
- 必须轮询 DbContext 中配置的特定实体类型
- 必须接受 lambda 表达式作为轮询查询
- 必须为连接失败实现指数退避重试模式
- 必须异步运行且基于事件
背景
我们中的许多人都不得不在代码中使用某种轮询机制,因为它通常需要监听更改/新条目,并在它们到达时执行某些操作。轮询器通常范围从带有 Thread.Sleep 的基本 while() 循环到复杂的异步观察者模式和 SqlDependency 代码...
我最近一直在大量使用 Entity Framework,并且我有一个要求,即轮询数据库中具有特定 StatusId 的表(实体)中的记录。 我决定编写一个通用轮询器,它非常易于使用,并允许开发人员指定一个 lambda 表达式来异步查询数据库:
使用代码
要使用附加的 EntityPoller<T>
,只需编写以下代码:
var poller = new EntityPoller<Notification>(context, q => q.StatusId == (int) Status.New);
poller.ActiveCycle = 200; //milliseconds
poller.IdleCycle = 10000; //10 seconds
poller.RetryAttempts = 5;
poller.RetryInitialInterval = 10000; //10 seconds
poller.EntityReceived += OnNotification;
poller.Error += OnError;
poller.Start();
代码分解
轮询
首先,您可以在下面看到构造函数接受两个参数,第一个是您的 DbContext
,它必须包含类型为 T
的实体集 (DbSet<>
),第二个是返回 bool 的 lambda 表达式。
public sealed class EntityPoller<T>
where T : class
{
private readonly DbContext _context;
private readonly Func<T, bool> _query;
private readonly object _sync = new object();
private bool _polling;
public EntityPoller(DbContext context, Func<T, bool> query)
{
_context = context;
_query = query;
IdleCycle = 10000;
ActiveCycle = 1000;
RetryAttempts = 5;
RetryInitialInterval = 10000;
}
public int IdleCycle { get; set; }
public int ActiveCycle { get; set; }
public int RetryAttempts { get; set; }
public int RetryInitialInterval { get; set; }
轮询包含一个主循环,该循环将持续运行,直到返回一个实体。对于循环的每个周期,都有一个暂停 (Thread.Sleep
),暂停的持续时间取决于定义的查询结果。如果查询返回一个实体,则暂停使用 ActiveCycle
,否则它将使用 IdleCycle
。 这是通过设计来实现的,以允许以受控方式从对象中发出实体通知事件。 如果您希望尽快发出新实体的事件,请将 ActiveCycle
设置为 0。
private T Poll()
{
var set = _context.Set<T>();
T entity = null;
try
{
while (_polling)
{
entity = Retry(() => set.FirstOrDefault(_query),RetryAttempts,RetryInitialInterval);
if (entity != null) break;
Thread.Sleep(IdleCycle);
}
Thread.Sleep(ActiveCycle);
}
catch (Exception ex)
{
Stop();
if (Error != null)
Error.Invoke(ex);
}
return entity;
}
轮询方法通过使用 AsyncWorker
委托和 AsyncOperation
来调用。 我不会在本文中介绍异步代码,但所有代码都包含在源中!
重试模式
您可能已经在上面的代码片段中注意到,查询 Func<t,bool>
包装在一个 Retry 方法中。 Retry 模式将运行查询 RetryAttempts
次,从 RetryInitialInterval
毫秒开始,并以 5 的幂指数增加它。这只是我用作默认值,但您可能希望更改此逻辑以满足您的需求。 Retry 函数如下
private static T Retry(Func<T> action, int attempts = 5, int initialInterval = 10000)
{
if (action == null)
throw new ArgumentNullException("action");
for (int i = 1; i <= attempts; i++)
{
try
{
T result = action.Invoke();
return result;
}
catch (Exception)
{
if (i >= attempts) throw;
Thread.Sleep(initialInterval);
}
initialInterval *= 5;
}
return null;
}
就是这样! 它对我很有效。 如果有人使用它并想发表评论或提出问题/建议,请随时提出。
感谢阅读。