65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Taskling 进行批处理模式

starIconstarIconstarIconstarIconstarIcon

5.00/5 (3投票s)

2016年11月11日

MIT

10分钟阅读

viewsIcon

34541

downloadIcon

1

在本文中,我们将探讨批量处理的常见模式,以及 Taskling 库如何为您的 C# 批处理作业提供一种简单可靠的使用这些模式的方法。

引言

构建批处理流程有一些常见模式

  • 实例并发控制 - 例如,单例进程
  • 将数据分区为更小的批次
  • 通过从失败的实例中断处继续处理来从故障中恢复
  • 维护/记录实例的状态和处理数据的状态
  • 出现问题时发出警报

Taskling 是一组两个库,它们提供了这些模式,并将 SQL Server 用作后端存储。它与主机无关,可用于 Web 应用程序、Azure 作业、控制台应用程序等。

我们将通过 Taskling 的一些示例代码来探讨每种模式。

背景

为了说明如何使用本文所述的模式与 Taskling 一起使用,我们需要首先讨论配置以及如何实例化 ITasklingClient

许多行为由配置控制,使用 Taskling,您必须创建一个实现 IConfigurationReader 的类,该类仅返回一个具有 KEY[value] 格式的键值对字符串。我们将在下面的模式中讨论每个设置。

public class MyConfigReader : IConfigurationReader
{
    public string GetTaskConfigurationString(string applicationName, string taskName)
    {
        var key = applicationName + "::" + taskName;
        return ConfigurationManager.AppSettings.Get(key);
    }
}

所有任务都由应用程序名称和任务名称唯一标识。在上面的示例中,它从应用程序配置中获取配置字符串。

<appSettings>
  <add key="MyApplication::MyTask" value="DB[Server=(local);Database=MyAppDb;Trusted_Connection=True;] TO[120] E[true] CON[-1] KPLT[2] KPDT[40] MCI[1] KA[true] KAINT[1] KADT[10] TPDT[0] RPC_FAIL[true] RPC_FAIL_MTS[600] RPC_FAIL_RTYL[3] RPC_DEAD[true] RPC_DEAD_MTS[600] RPC_DEAD_RTYL[3] MXBL[20]"/>
</appSettings>

我们将查看与模式无关的设置

  • DB 是连接字符串
  • TO 是命令超时(秒)
  • E 是启用 1,禁用 0

我们将在下面逐一介绍其余设置的含义。

通过将您的配置读取器实现传递给其构造函数来实例化 ITasklingClient

ITasklingClient tasklingClient = new TasklingClient(new MyConfigReader());

或通过依赖注入(AutoFac 示例)

builder.Register<TasklingClient>(x => new TasklingClient(new MyConfigReader())).As<ITasklingClient>();

使用代码

在您的批处理流程所在的主类中,我们需要实例化一个新的 ITaskExecutionContext,它将负责进行所有状态管理、日志记录以及创建子上下文以将数据分区为块。

using (var executionContext = _tasklingClient.CreateTaskExecutionContext("MyApplication", "MyTask"))
{
    if(executionContext.TryStart())
    {
        // batch processing logic
    }
}

我们现在已准备好开始研究这些模式。

模式 #1 - 实例并发限制,例如单例

有些批处理任务需要是单例的。如果您每小时运行一次任务,但该任务的运行时间可能超过一小时,那么您可能会遇到两个执行实例同时运行的情况。

除了单例之外,限制并发执行的数量以防止其他组件过载可能很有用。也许您有大量数据需要处理,因此您每分钟运行一次任务,每次执行需要十分钟,您将有十个并发执行。当您的架构中的某个组件(Web 服务、数据库等)无法处理十个并发执行的负载时,您可以将并发限制设置为 5。当组件显示过载迹象时,您可以实时简单地减少并发限制,然后在稍后再次增加它。

Taskling 的并发限制跨服务器工作。设置键 CON 设置限制。-1 表示无限制,任何高于该数字的值都将是限制。因此,对于单例,请设置 CON[1]。

我们实例化 ITaskExecutionContext 并调用其 TryStart。如果限制已达到,TryStart() 将返回 false。对该上下文的任何进一步调用都将失败,因此我们将调用包装在 if 语句中。

if(executionContext.TryStart()) 
{ 
    // batch processing logic 
}

Taskling 如何保证并发控制本身就是一个有趣的话题。Taskling 利用 SQL Server 的行锁定和阻塞,在多线程甚至多服务器环境中创建单线程场景。

模式 #2 - 将数据分区为更小的批次

Taskling 可以将数据分区为四种类型的块

  • 范围块
    • 日期范围块
    • 数字范围块
  • 列表块
  • 对象块

所有这些的共同点是我们需要在块之间进行隔离,也就是说,数据不重叠。Taskling 保证块隔离,如果

  • 您将块创建逻辑包装在临界区中
  • 您不将重复数据传递给 Taskling

您可以通过将代码包装如下来创建保证单线程(甚至跨服务器)的代码段

using (var cs = taskExecutionContext.CreateCriticalSection())
{
    if(cs.TryStart())
    {
        // code that needs to be single-threaded (even acorss servers)
    }
}

Taskling 临界区使用与主任务并发控制相同的方法进行并发控制。它将重试 20 秒,重试 2 次,如果仍然无法进入,则 TryStart 将返回 false;此等待时间可以在其中一个重载中更改。

带日期范围块的范围块示例

范围块不存储任何数据,只存储日期或数字范围。然后可以使用此范围检索和处理数据。

有些进程会持续处理日期之间的数据。每次批处理运行时,它可能会检查已处理到哪个日期,然后使用该日期作为开始日期,并将当前时间作为结束日期。

  • 使用 Taskling,您可以获取最后一个块的结束日期
  • 使用临界区来保证并发限制大于 1 的任务的块隔离
private void RunTask(ITaskExecutionContext taskExecutionContext)
{
    try
    {
        var dateRangeBlocks = GetDateRangeBlocks(taskExecutionContext);
        foreach (var block in dateRangeBlocks)
            ProcessBlock(block);

        taskExecutionContext.Complete();
    }
    catch(Exception ex)
    {
        taskExecutionContext.Error(ex.ToString(), true);
    }
}
// -- Critical Section --
// This method uses a critical section to protect the data identification phase of the task. If two tasks execute
// at the same time then the code inside the critical section can only be executed by one task at a time and
// there is no chance for identifying the same data in both tasks
private IList<IDateRangeBlockContext> GetDateRangeBlocks(ITaskExecutionContext taskExecutionContext)
{
    using (var cs = taskExecutionContext.CreateCriticalSection())
    {
        if(cs.TryStart())
        {
            var startDate = GetDateRangeStartDate(taskExecutionContext);
            var endDate = DateTime.UtcNow;

            return taskExecutionContext.GetDateRangeBlocks(x => x.WithRange(startDate, endDate, TimeSpan.FromMinutes(30)));
        }
        throw new Exception("Could not acquire a critical section, aborted task");
    }
}

// -- Last Block --
// The previous block is used to identify the datetime of the start of the next range of data
// The first time this task is ever run there is no previous block, so we use a configured timespan from the
// current datetime instead
private DateTime GetDateRangeStartDate(ITaskExecutionContext taskExecutionContext)
{
    var lastBlock = taskExecutionContext.GetLastDateRangeBlock(LastBlockOrder.LastCreated);
    if (lastBlock == null)
        return DateTime.UtcNow.Add(_configuration.FirstRunTimeSpan);
    else
        return lastBlock.EndDate;
}

在上面的示例中,Taskling 接受一个日期范围和一个 TimeSpan 作为最大块大小,并返回一个日期范围块列表(IDateRangeBlockContext)。因此,如果日期范围涵盖 24 小时,我们将最大块大小指定为 TimeSpan.FromHours(1),我们将得到 24 个块。

一旦我们有了块,我们就处理每一个。在此示例中,我们检索每个块日期之间的行程,计算旅行洞察并将它们持久化。

  • 处理开始前会调用 Start()
  • 处理完成后会调用 Complete()。您可以选择将处理的记录数作为参数传递
  • 块的开始和结束日期用于从数据库检索行程
  • 所有代码都包装在 try catch 块中,在 catch 块中会调用 Failed(string errorMessage),并且不会再次抛出异常,以允许后续块执行
private void ProcessBlock(IDateRangeBlockContext blockContext)
{
    try
    {
        blockContext.Start();

        var travelDataItems = _travelDataService.GetJourneys(blockContext.DateRangeBlock.StartDate, blockContext.DateRangeBlock.EndDate);
        var travelInsights = new List<TravelInsight>();
        // my insights processing logic
        //...
        //...
        _travelInsightsService.Add(travelInsights);

        int itemCountProcessed = travelInsights.Count;
        blockContext.Complete(itemCountProcessed);
    }
    catch(Exception ex)
    {
        blockContext.Failed(ex.ToString());
    }
}

数字块基本上是相同的。您将开始数字、结束数字和最大块大小传递给 Taskling。

int maxBlockSize = 500;
return taskExecutionContext.GetNumericRangeBlocks(x => x.WithRange(startNumber, endNumber, maxBlockSize));

因此,如果我们传递数字 1 和 1000,最大块大小为 100,则将返回 10 个 INumericBlockContexts,您可以使用它们来处理这些范围之间的数据。

您可以在以下链接中找到数字块的示例代码

https://github.com/Vanlightly/Taskling.NET/wiki/Task-with-Numeric-Range-Blocks-Example-Code

列表块

列表块实际上将数据以 JSON 格式存储在 SQL Server 中。有两种列表块上下文

  • IListBlockContext<TItem>
  • IListBlockContext<TItem,THeader>

TItem 是列表项类型的泛型类型。THeader 是可以存储与块相关的数据的泛型类型。

让我们来看一个生成 IListBlockContext<TItem,THeader> 块的示例。在此示例中,我们按日期范围检索数据,但将要处理的数据存储在列表块中。

// Code snippet: Unformatted as this coce snippet does not format correctly as a code block for some reason

public class BatchDatesHeader
{
    public DateTime FromDate { get; set; }
    public DateTime ToDate { get; set; }
}

public class Journey
{
    public long JourneyId { get; set; }
    public string DepartureStation { get; set; }
    public string ArrivalStation { get; set; }
    public DateTime TravelDate { get; set; }
    public string PassengerName { get; set; }
}

// end of code snippet

BatchDatesHeader 将是我们的头部类,Journey 将是我们的列表项类。

请注意,ItemStatus.Pending, ItemStatus.Failed 与从故障中恢复和重新处理先前失败的块有关。我们将在下一个模式中更详细地介绍这一点。

在此示例中,我们检索自上次作业运行以来所有行程,将它们分区到列表块中,然后处理每个列表块。对于每个列表块,我们将单独处理每个行程,提取旅行洞察并通知用户有关该洞察。

private void RunTask(ITaskExecutionContext taskExecutionContext)
{
    try
    {
        var listBlocks = GetListBlocks(taskExecutionContext);
        foreach (var block in listBlocks)
            ProcessBlock(block);

        taskExecutionContext.Complete();
    }
    catch (Exception ex)
    {
        taskExecutionContext.Error(ex.ToString(), true);
    }
}

private IList<IListBlockContext<Journey, BatchDatesHeader>> GetListBlocks(ITaskExecutionContext taskExecutionContext)
{
    using (var cs = taskExecutionContext.CreateCriticalSection())
    {
        if (cs.TryStart())
        {
            var startDate = GetDateRangeStartDate(taskExecutionContext);
            var endDate = DateTime.UtcNow;

            var journeys = _travelDataService.GetJourneys(startDate, endDate).ToList();
            var batchHeader = new BatchDatesHeader()
            {
                FromDate = startDate,
                ToDate = endDate
            };

            short blockSize = 500;
            return taskExecutionContext.GetListBlocks<Journey, BatchDatesHeader>(x => x.WithPeriodicCommit(journeys, batchHeader, blockSize, BatchSize.Fifty));
        }
        throw new Exception("Could not acquire a critical section, aborted task");
    }
}

// Header used to identify the date range to be processed
// This is just an example, there are many ways to identify the data to be processed
private DateTime GetDateRangeStartDate(ITaskExecutionContext taskExecutionContext)
{
    var lastBlock = taskExecutionContext.GetLastListBlock<Journey, BatchDatesHeader>();
    if (lastBlock == null)
        return DateTime.UtcNow.Add(_configuration.FirstRunTimeSpan);
    else
        return lastBlock.Header.ToDate;
}


private void ProcessBlock(IListBlockContext<Journey, BatchDatesHeader> blockContext)
{
    try
    {
        blockContext.Start();

        foreach (var journeyItem in blockContext.GetItems(ItemStatus.Pending, ItemStatus.Failed))
            ProcessJourney(journeyItem);

        blockContext.Complete();
    }
    catch (Exception ex)
    {
        blockContext.Failed(ex.ToString());
    }
}

// -- Error handling --
// All code is wrapped in a try catch block, in the catch Failed(string errorMessage) is called and the exception
// is not thrown again in order to allow subsequent list block items to execute
// -- Discard --
// Often a data item does not meet a business rule and should be ignored. Taskling allows you to mark items as  discarded
// Items either get processed successfully, they fail or they get discarded.
private void ProcessJourney(IListBlockItem<Journey> journeyItem)
{
    try
    {
        if (journeyItem.Value.DepartureStation.Equals(journeyItem.Value.ArrivalStation))
        {
            journeyItem.Discarded("Discarded due to distance rule");
        }
        else
        {
            var insight = ExtractInsight(journeyItem.Value);
            _notificationService.NotifyUser(insight);
            journeyItem.Completed();
        }
    }
    catch(Exception ex)
    {
        journeyItem.Failed(ex.ToString());
    }
}

private TravelInsight ExtractInsight(Journey jounery)
{
    TravelInsight insight = new TravelInsight();
    //

    return insight;
}

一些有趣的事情需要注意

  • 每个列表项都有自己的状态。
  • 对于由于某些业务规则需要忽略的任何数据项,您可以将其标记为已丢弃并附加原因。
  • 如果您的数据太大而无法存储在块中,您可以在列表块中存储数据标识符,然后在处理每个项目时检索数据。

模式 #3 - 故障恢复

有时应用程序可能会失败,Azure 作业可能崩溃,或者 ASP.NET/WCF 进程可能被回收。或者一个 bug 可能导致批处理进程失败并停止。Taskling 提供了一种从中断处继续处理的方法。

当您向 ITaskExecutionContext 请求块时,无论是日期范围、列表还是其他,它都会返回您传递给它的数据作为块。您可以配置 Taskling 以也返回先前失败的块。

如果您使用以下方式配置您的 ITasklingClient

  • RPC_FAIL[true] RPC_FAIL_MTS[600] RPC_FAIL_RTYL[3] 您告诉 Taskling 查找在过去 600 分钟内创建的失败块,并且重试次数不超过 3 次。
  • RPC_DEAD[true] RPC_DEAD_MTS[600] RPC_DEAD_RTYL[3] 您告诉 Taskling 查找在过去 600 分钟内创建的死块,并且重试次数不超过 3 次。

那么当您调用 GetListBlocks 时,它将在一个列表中返回新块和旧块。但您可能会问,什么是死块?死任务/块是指发生灾难性故障,以至于无法注册其消亡的任务/块。例如,有人拔掉了服务器的电源线,或者 IIS 使用 ThreadAbortException 终止了您的线程。在这种情况下,状态仍然是“进行中”,或者对于已创建但尚未开始的块,即使它已死亡,其状态仍将保持“待定”。

Taskling 使用心跳(keep alive)来注册它仍然存活的事实。一旦超过配置的最后一次心跳时间而状态仍然是“进行中”或“待定”,Taskling 就知道它真的死了。

为了配置心跳,我们设置 KA[true] KAINT[1] KADT[10],这意味着使用心跳,每 1 分钟发送一次,并在 10 分钟内未收到任何心跳时将其视为死亡。

让我们使用最后一个示例来说明这一点。在最后一个示例中,我们创建了许多包含行程的列表块。

当启用了对失败和死块的重新处理时,以下调用可能会返回先前失败的块。

return taskExecutionContext.GetListBlocks<Journey, BatchDatesHeader>(x => x.WithPeriodicCommit(journeys, batchHeader, blockSize, BatchSize.Fifty));

如果您不想再次发送用户通知给已成功处理的行程,那么在迭代项目时,我们只请求“待定”和“失败”的项目。当我们处理一个块时,它可能是新数据,也可能是失败的旧块。如果是新的,那么所有项目都将处于“待定”状态。

foreach (var journeyItem in blockContext.GetItems(ItemStatus.Pending, ItemStatus.Failed)) 
    ProcessJourney(journeyItem);

一旦块达到重试限制或其创建日期超过本示例中配置的 600 分钟,它将不再重试。

模式 #4 - 警报

Taskling 中的所有数据都存储在七个表中,您可以将这些表部署到中央数据库服务器,也可以部署到使用 Taskling 的每个应用程序数据库。因为 Taskling 维护所有状态信息、块数据和每个任务执行使用的配置,所以我们可以创建可用于实时警报的 SQL 查询。

例如

  • 达到重试限制的失败/死块
  • 失败的任务
  • 在 Y 分钟内失败 X 次的任务
  • 连续失败 X 次的任务
  • 过去 X 分钟内未运行的任务

GitHub Wiki 页面上有示例 SQL

https://github.com/Vanlightly/Taskling.NET/wiki/Alerts-Database-Scripts

结论

Taskling 是一个用于实现常见批处理模式的有用库。它是开源的。您只需要包含 Taskling 和 Taskling.SqlServer nuget 包并运行表创建脚本即可开始。

如需更多阅读材料,请查看 GitHub 页面上的 GitHub Wiki

https://github.com/Vanlightly/Taskling.NET

示例代码

TasklingTester 解决方案随本文一起提供,其中包含我们涵盖的源代码。它还包括

  • 生成 Taskling 表以及 Journey 和 TravelInsight 表所需的 SQL 脚本
  • 一个模拟新数据进入的数据生成脚本
  • 一些有用的查询来查看生成 Taskling 数据

首先阅读解决方案中的自述文件以获取说明。基本上,您需要做的就是运行一个脚本,然后运行应用程序,您就可以看到它如何工作。

© . All rights reserved.