65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Task C# 的责任链和策略模式

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.94/5 (14投票s)

2017 年 4 月 20 日

CPOL

7分钟阅读

viewsIcon

33384

downloadIcon

682

使用 Task 类并实现请求批量处理的简单设计理念

 

引言

有许多场景需要批量处理。例如处理批量产品订单或处理批量请求。本文不侧重于性能有效的处理,而是提供了一个简单的处理实现。一个过程可以定义为包含许多小任务的工作流。我将描述一个简单的烹饪过程工作流,以及我如何应用遵循责任链和策略设计模式的 Task 类。

背景

我曾在一个 Windows 服务上工作,该服务处理批量安装订单请求。在我了解 C# Task 类之前,我创建了一些自己的 Process 和 Task 抽象类,然后使用 Thread 和 ThreadPool 类来执行我的任务。(有机会我会尝试发布基于这个旧设计的代码。)当我偶然发现 Sacha Barber 的文章《任务并行库:1 of n 系列》时,我对 Task 类产生了兴趣,并开始按照我在此处介绍的一些设计模式使用它。

问题陈述

让我们以“烹饪过程”为例工作流来实现。我们都知道烹饪过程涉及多个任务,例如“购买蔬菜、切菜、清洗器皿、准备食材、烹饪”等。其中一些任务依赖于其他任务,例如切菜需要先购买蔬菜。一些任务可以并行发生,例如准备食材和购买蔬菜,它们彼此不依赖。这些任务中的每一个都将输入转换为或处理为某种输出。就像任何将原材料加工成产品的工厂一样。例如,清洗器皿将脏器皿变成干净的器皿。切菜将提供切碎的蔬菜以便于烹饪。由于这些任务中的每一个都有其对食材、蔬菜、器皿等输入的责任。这些任务本身并不能制作出任何菜肴。我们必须将所有这些任务链接成一个责任链来制作菜肴。

每个人都会很高兴每天吃到不同的菜肴。所以我们的烹饪过程不应该总是制作相同的菜肴。我们需要有不同的食谱,比如法式杂烩(我从没吃过)、土豆花椰菜(是的,我喜欢)等等。所以我们必须为不同的食谱购买不同的蔬菜,因此购买蔬菜任务需要 N 种食谱的 N 种算法。清洗器皿或准备香料也是如此。我们需要遵循一些好的策略算法,不仅要解决问题,还要做出美味的菜肴。

责任链和策略模式

责任链将请求沿着处理程序链传递。购买任务可以被视为这样的处理程序,其职责是为给定食谱购买所需的蔬菜。蔬菜被传递给链中的下一个处理程序,其职责是切菜。因此,形成这个链对于创建工作流很重要。这样,客户端(饥饿的人)就不必费心菜肴是如何制作的。他们所做的只是点菜并等待菜肴。

System.Threading.Tasks 命名空间提供了一个很好的框架来创建任务并以多种不同方式执行它们。这个库本身负责为我们从线程池创建线程。还为我们提供了将任务一个接一个链接起来的选项,这正是我们烹饪过程所需要的。

购买蔬菜、准备食材和清洗器皿可以异步完成。因此,我们可以使用 Task.Factory.startNew() 或 Task.Run() 来创建异步执行的任务。

//start the tasks
Task<string[]> buyVegiesTask = Task.Run(() => buyVegies(input.recipe));            
Task<string[]> cleanVesselsTask = Task.Run(() => cleanVessels(input.recipe));
Task<string[]> prepareIngredientsTask = Task.Run(() => prepareIngredients(input.recipe));

buyVegies、cleanVessels 和 prepareMassala 是接受 Recipe 作为参数的静态方法。这些操作其参数的静态方法对于线程安全工作很有用。这些就像 CookingProcess 类中创建的实用方法。

现在我们需要切菜,这是购买蔬菜之后的链式任务。

 //cut vegetables            
 Task<String> cutVegeTask = buyVegiesTask.ContinueWith<string>(t =>
 {
      //Collect result
      input.Vegetables = t.Result;

      //Make a copy for cut task, so input is not affected
      string[] vegetables = new string[input.Vegetables.Length];
      input.Vegetables.CopyTo(vegetables, 0);

      return Retry(() =>
      {   //Passing bought vegetables to cut task
           return cutVegies(vegetables, input.recipe);

       }, 3);
 },TaskContinuationOptions.OnlyOnRanToCompletion);

为切菜任务复制一份蔬菜,以确保 buyVegiesTask 的结果安全。如果您认为蔬菜只用于切菜任务,并且对其他任务不再重要,则不需要这样做。

buyVegiesTask.ContinueWith 将确保切割只在购买之后发生。Enum TaskContinuationOptions 提供了多种链接选项,例如 OnlyOnCanceled、OnlyOnFaulted、NotOnCanceled、NotOnFaulted 等。在这些选项中,我们只需要在购买蔬菜完成时执行切割。因此使用了 OnlyOnRanToCompletion。

烹饪任务必须等待所有任务,例如购买蔬菜、切菜、准备食材。Task.WaitAll 可用于使烹饪任务等待直到其他任务完成。

Task.WaitAll(new Task[] { buyVegiesTask, cleanVesselsTask, prepareIngredientsTask, cutVegeTask });

烹饪完成后,可以按顺序装饰菜肴并上菜。

Task<string[]> decorateTask = cookTask.ContinueWith((t) => {  return decorate(input.recipe);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
//serve
Task<string> serveTask = decorateTask.ContinueWith((t) => {  
     ...

     return serve(description, input.recipe);

}, TaskContinuationOptions.OnlyOnRanToCompletion);

现在,为了用不同的菜肴满足许多人,我们可能需要烹饪不止一种食谱。因此,我们的任务,如购买蔬菜、切蔬菜、清洗器皿等,将根据食谱遵循不同的算法。例如,土豆食谱可能需要购买土豆、洋葱等蔬菜。而西兰花食谱可能需要购买西兰花。我们需要为每个食谱的每个任务收集更多的算法。

在 Recipes 类中将 Recipe 定义为 Enum。

public enum Recipe
{
        Ratatouille = 0,
        Aloo_gobi = 1
}

为每个任务创建一个字典(这可能是一本食谱),其中包含每种食谱的购买、切割、准备的不同方式。

public delegate string[] PrepareMasala();
public static Dictionary<Recipe, PrepareMasala> PrepareSpiceIngredientsAlgorithms = new Dictionary<Recipe, PrepareMasala>();

public delegate string[] buyVegie();
public static Dictionary<Recipe, buyVegie> buyVegieAlgorithms = new Dictionary<Recipe, buyVegie>();

public delegate string[] cleanVessel();
public static Dictionary<Recipe, cleanVessel> cleanVesselAlgorithms = new Dictionary<Recipe, cleanVessel>();

public delegate string[] decorate();
public static Dictionary<Recipe, decorate> decorateAlgorithms = new Dictionary<Recipe, decorate>();

这是一个开放式字典,可以接受任何新的食谱发明。这个想法基于策略模式,以便在运行时为给定的食谱选择购买或装饰算法。

Recipes.buyVegieAlgorithms.Add(Recipe.Ratatouille, () => new string[4] { "Ginger", "Onion", "zucchini", "eggplant" });
Recipes.buyVegieAlgorithms.Add(Recipe.Aloo_gobi, () => new string[4] { "Potatoes", "Cauliflower", "Onion", "Ginger" }); 

buyVegies 任务方法根据食谱输入选择算法。

private static string[] buyVegies(Recipe recipe)
{
    // Select vegetables based on recipe
        string[] vegetables = Recipes.buyVegieAlgorithms[recipe]();

    ...
}

重试和异常处理

切菜任务实现了重试机制。假设有人切到手指,并且可以管理地重试他们正在做的任务。对于这种情况,会引发 TaskRetryableException,它提供了一种记录消息和重试任务的方法。我定义了一个布尔变量来模拟这种异常情况。另请注意,Ratatouille 是硬编码的,仅用于测试异常处理。

static bool simulate_exception = true;
private static string cutVegies(string[] vegetables, Recipe recipe)
{    ...          

    if (recipe == Recipe.Ratatouille && simulate_exception)
        {
        ...
                TaskRetryableException exception = new TaskRetryableException(badMsg + "... But I can manage");
                simulate_exception = false;

                //TaskException exception = new TaskException(badMsg);
                throw exception;
    }
}

假设有人严重切到手指,他们无法执行任务。对于这种情况,会引发 TaskException。

Retry 就像实际任务方法的委托方法。Retry 检查重试计数器以确定最大尝试次数,并将调用委托给实际方法。

public static T Retry<T>(Func<T> actualWork, int retryCount)
{
        int maxAllowedRetry = retryCount;
        while (true)
	{
		try
                {
                    return actualWork();
                }
                catch (TaskRetryableException e)
                {
                   
                    if (retryCount == 0 )
                    {
                        throw new   TaskException ( "Maximum count of retry attempts reached " );
		    }
		...

来自 Panagiotis Kanavos 在 http://stackoverflow.com/a/10494424/11635 中的精彩回答

在任务执行期间发生的所有异常都添加到 AggrerateException 的 InnerExceptions 属性中。因此 TaskException 也将被添加到 AggregateException 中。处理此异常的 catch 块显示在下面的代码片段中。

测试厨房

CookingInput 类顾名思义是烹饪过程中每个任务的输入。它将存储食谱、蔬菜数组、器皿数组、食材数组等。

创建了一个 CookingInput 数组,以批量完成多个烹饪任务。数组中的每个输入元素代表烹饪请求。为每个输入元素创建一个任务以并行开始烹饪。因此,如果数组中有 2 个输入,则会创建 2 个任务并行工作。

CookingInput[] inputs = new CookingInput[2];
inputs[0] = new CookingInput(Recipe.Aloo_gobi);
inputs[1] = new CookingInput(Recipe.Ratatouille);
...
int count  = inputs.Count(s => s != null);
...
Task[] cookingTasks = new Task[count];
try
{
    for (int i=0; i< count; i++)
    {
        int index = i;
        //Start cooking for each request in a thread parallely
        cookingTasks[i] = Task.Run(() => CookingProcess.start(inputs[index]));                   
    }
    Task.WaitAll(cookingTasks);                
}
catch (AggregateException ae)
{
    Console.WriteLine();
    Console.WriteLine("Base exception: " + ae.GetBaseException());                                
}  ...

AggregateException 的 GetBaseException() 方法提供导致 AggregateExceptions 的实际异常。因此我们从 GetBaseException() 方法获取 TaskException。

请注意,定义了同步和异步执行的任务。虽然 CookingInput 在任务方法内部是线程安全的,但在任务外部,我们需要注意状态,因为许多线程可能在此输入上操作。

示例输出

Async 和 await

async 和 await 是一个很酷的功能,可以节省大量代码、线程和时间。下面的代码片段显示了 CookingProcess start 方法的重写。请查看 KitchenAsync.zip,了解使用 async 和 await 的相同实现。

            //start the tasks
            Task<string[]> buyVegTask = buyVegiesAsync(input.recipe);
            Task<string[]> cleanVesselTask = cleanVesselsAsync(input.recipe);
            Task<string[]> prepareIngredientsTask = prepareIngredientsAsync(input.recipe);

            //Passing bought vegetables to cut task
            string[] vegetables = await buyVegTask;
            //Make a copy for cut task, so input is not affected
            input.Vegetables = new string[vegetables.Length];
            vegetables.CopyTo(input.Vegetables, 0);
            Task<string> cutVegeTask = RetryAsync(()=> cutVegiesAsync(vegetables, input.recipe),3);            

            //Wait for all the tasks to finish
            await Task.WhenAll(buyVegTask, cleanVesselTask, prepareIngredientsTask, cutVegeTask);
            //collect task's results
            input.ingrdients = prepareIngredientsTask.Result;
            input.Vessels = prepareIngredientsTask.Result;

            //Give message and dont await for any response... as cooking has to progress
            Task msg1 = Task.Run(() => giveMessageToHungryFolks(cutVegeTask.Result));

            //cook and await for food to further proceed- decorate and serve
            string result = await cookAsync(input.recipe);
            Task msg2 = Task.Run(() => giveMessageToHungryFolks(result));

            //decorate and await for decorates
            input.decorates = await decorateAsync(input.recipe);

            //prepare serve description
            string description = String.Format("The plate of cooked [{0}] in a mix with good flavor of [{1}], decorated with [{2}] ",
                string.Join(",", input.Vegetables), string.Join(",", input.ingrdients), string.Join(",", input.decorates));

            //serve
            input.dish = await serveAsync(description, input.recipe);

请注意,Task.Run 用于消息任务 (giveMessageToHungryFolks)。这是因为消息任务不必等待任何响应或结果。

感谢 George 的评论,促使我研究这个主题。

Task 与 async/await

使用秒表比较了 Task 和 Async 两种实现的执行时间。两者都执行 10 次并取平均执行时间,我发现 Async 方式节省了 2.5 秒。

参考文献

https://codeproject.org.cn/Articles/152765/Task-Parallel-Library-of-n 作者 Sacha Barber

http://stackoverflow.com/questions/10490307/retry-a-task-multiple-times-based-on-user-input-in-case-of-an-exception-in-task/10494424#10494424

© . All rights reserved.