65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 EntityFramework 5 进行异步数据库轮询

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2012年10月30日

CPOL

2分钟阅读

viewsIcon

35673

downloadIcon

458

如何使用 EntityFramework 5 异步轮询数据库以获取实体。

介绍 

我给自己布置了一个任务,开发一个基于 Entity Framework 的通用、异步轮询器,并满足以下要求:

  • 必须轮询 DbContext 中配置的特定实体类型
  • 必须接受 lambda 表达式作为轮询查询
  • 必须为连接失败实现指数退避重试模式
  • 必须异步运行且基于事件

背景  

我们中的许多人都不得不在代码中使用某种轮询机制,因为它通常需要监听更改/新条目,并在它们到达时执行某些操作。轮询器通常范围从带有 Thread.Sleep 的基本 while() 循环到复杂的异步观察者模式和 SqlDependency 代码...

我最近一直在大量使用 Entity Framework,并且我有一个要求,即轮询数据库中具有特定 StatusId 的表(实体)中的记录。 我决定编写一个通用轮询器,它非常易于使用,并允许开发人员指定一个 lambda 表达式来异步查询数据库:

使用代码 

要使用附加的 EntityPoller<T>,只需编写以下代码:

var poller = new EntityPoller<Notification>(context, q => q.StatusId == (int) Status.New);

poller.ActiveCycle = 200; //milliseconds
poller.IdleCycle = 10000; //10 seconds
poller.RetryAttempts = 5;
poller.RetryInitialInterval = 10000; //10 seconds

poller.EntityReceived += OnNotification;
poller.Error += OnError;

poller.Start();

代码分解

轮询

首先,您可以在下面看到构造函数接受两个参数,第一个是您的 DbContext,它必须包含类型为 T 的实体集 (DbSet<>),第二个是返回 bool 的 lambda 表达式。

public sealed class EntityPoller<T>
        where T : class
{
    private readonly DbContext _context;
    private readonly Func<T, bool> _query;
    private readonly object _sync = new object();
    private bool _polling;

    public EntityPoller(DbContext context, Func<T, bool> query)
    {
        _context = context;
        _query = query;

        IdleCycle = 10000;
        ActiveCycle = 1000;
        RetryAttempts = 5;
        RetryInitialInterval = 10000;
    }

    public int IdleCycle { get; set; }
    public int ActiveCycle { get; set; }
    public int RetryAttempts { get; set; }
    public int RetryInitialInterval { get; set; }   

轮询包含一个主循环,该循环将持续运行,直到返回一个实体。对于循环的每个周期,都有一个暂停 (Thread.Sleep),暂停的持续时间取决于定义的查询结果。如果查询返回一个实体,则暂停使用 ActiveCycle,否则它将使用 IdleCycle。 这是通过设计来实现的,以允许以受控方式从对象中发出实体通知事件。 如果您希望尽快发出新实体的事件,请将 ActiveCycle 设置为 0。

private T Poll()
{
    var set = _context.Set<T>();
    T entity = null;

    try
    {

        while (_polling)
        {
            entity = Retry(() => set.FirstOrDefault(_query),RetryAttempts,RetryInitialInterval);
            if (entity != null) break;

            Thread.Sleep(IdleCycle);
        }

        Thread.Sleep(ActiveCycle);

    }
    catch (Exception ex)
    {
        Stop();

        if (Error != null)
            Error.Invoke(ex);
    }

    return entity;
}

轮询方法通过使用 AsyncWorker 委托和 AsyncOperation 来调用。 我不会在本文中介绍异步代码,但所有代码都包含在源中!

重试模式

您可能已经在上面的代码片段中注意到,查询 Func<t,bool> 包装在一个 Retry 方法中。 Retry 模式将运行查询 RetryAttempts 次,从 RetryInitialInterval 毫秒开始,并以 5 的幂指数增加它。这只是我用作默认值,但您可能希望更改此逻辑以满足您的需求。 Retry 函数如下

private static T Retry(Func<T> action, int attempts = 5, int initialInterval = 10000)
{
    if (action == null)
        throw new ArgumentNullException("action");


    for (int i = 1; i <= attempts; i++)
    {
        try
        {
            T result = action.Invoke();
            return result;
        }
        catch (Exception)
        {
            if (i >= attempts) throw;
            Thread.Sleep(initialInterval);
        }

        initialInterval *= 5;
    }

    return null;
}

就是这样! 它对我很有效。 如果有人使用它并想发表评论或提出问题/建议,请随时提出。

感谢阅读。

© . All rights reserved.