Jibu for .NET 入门
Jibu 是一个用于 .NET 2.0 及以上版本的高级多线程 API。
什么是 Jibu?
Jibu 是一个为 .NET 2.0 及以上版本设计的库,它作为 .NET 框架多线程功能的封装器。它的目标是让程序员摆脱创建和使用线程的底层细节,以及如何实现安全的线程间通信。这使开发人员能够专注于他们希望并行化的实际逻辑,而不是如何正确地实现多线程。
Jibu 的主页在 http://www.axon7.com。可以下载免费的 Java 和 .NET 版本,也可以在那里访问支持论坛。
除了通过节省程序员编写复杂多线程功能的时间来减少开发时间外,Jibu 还能显著减少发现 bug 时调试代码所需的时间。多线程应用程序以难以隔离和重现的 bug 而闻名,随着多核系统真正并行执行线程成为常态,这些 bug 发生的可能性更大。在单核系统上,不正确的代码通常可以正常运行而没有问题,因为多个线程实际上不会同时运行(而是操作系统在线程之间快速切换,为每个线程分配一小段时间),但在多核系统上,必须正确使用线程。修复 bug 是开发过程中最难估算的环节之一,而追踪线程 bug 可能非常耗时——除了调试多线程应用程序固有的难度之外,真正了解如何安全使用线程的开发者很少。使用 Jibu,开发人员可以更有信心地认为多线程代码是正确的。许多开发人员可能已经实现了自己的线程库和/或实用程序,但这些很少能被认为是成熟且值得高度信任的——如果您的多线程应用程序出现奇怪的 bug,您有多大信心确定您编写的多线程支持类不是罪魁祸首?相比之下,Jibu 经过其开发者和所有使用它的团队的彻底测试——一个库被使用得越多而没有问题,它就越值得信赖。
使用 Jibu 的方法与程序员通常处理线程的方式大不相同。在编写多线程应用程序时,程序员传统上会考虑线程、临界区、互斥量、信号量、原子操作等。** 许多多线程库(如 Java API 和 C++ Boost 库提供的)只是将这些结构封装在实用类中,但 Jibu 的处理方式不同。Jibu 以您的目标为中心,并尽可能让您忘记诸如线程之类的东西。例如,没有 JibuThread 类。相反,Jibu 是一个基于任务的 API。您根据需要运行的任务进行编程,Jibu 负责执行这些任务并允许它们进行通信。Jibu 管理关于何时创建线程、它们如何安全地与共享数据交互等的底层决策。要了解所有这些是如何工作的,我们将继续探讨 Jibu 所基于的结构。
** 请注意,虽然不需要丰富的传统多线程编程经验,但会假定您具备多线程的基本理论知识。此外,有时会使用传统并发编程中的术语(例如,像互斥量、信号量、原子这样的词),因此快速参考一下可能会很有用。如果不行,您也可以像其他人一样使用 Wikipedia!
关键概念
如前所述,Jibu 不以线程、临界区和其他并发编程基础为单位工作,而是将它们抽象成一种基于任务的思维方式。该库基于 4 个基本概念:**任务**、**并行**、**邮箱** 和 **通道**。请注意,此时我们讨论的是概念,而不是类。类远不止 4 个!
任务
任务可能是最容易理解的概念。任务只是一个可以运行的工作单元,例如,您可能有一个从文件读取数据或执行数学计算的任务。对于熟悉 Java 的人来说,任务大致相当于 Runnable。Jibu 管理您创建的任务,当任务执行时,将使用单个线程来运行该任务。如果您想执行 10 个任务,Jibu 将使用多个线程来同时运行这些任务。
Jibu 的核心是其任务调度程序。它负责管理线程的创建和管理,并将任务分配给线程。重要的是要注意,调度程序不会为每个任务简单地创建一个新线程。在大多数情况下,Jibu 会尽量使线程数等于运行应用程序的系统的核心数。
并行
Jibu 并行构造用于控制多个任务的执行。任务可以异步执行,但多线程应用程序通常需要等待多个任务完成的能力。并行提供了在指定任务集完成之前暂停主应用程序执行的功能。
邮箱和通道
这两个构造都围绕着允许任务进行通信。在许多情况下,应用程序/算法包含多个完全不相关的部分,这些部分可以作为单独的任务运行。但在许多情况下,应用程序有可以同时运行但需要读写相同数据的独立部分。任何熟悉并发编程的人都知道,在不采取预防措施的情况下,这种情况会出现特定问题。我不想在这里教并发编程,但基本上,如果一个线程开始更新某些数据,另一个线程可以在更新过程中读取它。标准的并发编程有互斥量、临界区和信号量等构造,程序员必须使用这些构造来保护数据访问和修改的方式。
在 Jibu 中,我们不需要考虑这个层面。邮箱和通道概念允许任务传递数据,并确保以安全的方式完成。
每个任务都有自己的邮箱,只有它自己可以从中读取。一个任务可以向另一个任务的邮箱发送数据,但不能从中读取。当一个任务需要将信息直接传递给另一个任务时,就会使用邮箱。
如果涉及的任务超过 2 个,那么通道构造可能更有用。通道允许所有链接到它的任务读写数据。任务可以将一个对象推送到通道中。它会保留在通道中,直到一个任务从它那里读取——此时,对象就会从通道中取出。换句话说,一个任务写入的数据可以被任何其他任务检索——但不能被多个任务读取,因为读取操作会将其从通道中移除。因此,如果一个任务想将相同的数据传递给多个其他任务,目前建议发送线程将数据发送到每个接收者的邮箱**
**至少在 Jibu 1.0 中是这样。计划在未来版本中增加多任务之间的通信和数据共享的增强功能。
使用 Jibu
到目前为止,我们已经探讨了 Jibu 的存在原因、它的功能以及它的构建思想。现在是时候看看这些概念如何转化为类,并审视 Jibu 库本身了。
设置 Jibu 应用程序
本教程假定您已熟练掌握 C#,包括类、继承等。如果您经验不足,您可能不知道如何告诉您的应用程序使用 Jibu 库。幸运的是,这非常简单**。在您的 Visual C# 项目中,您应该会看到解决方案资源管理器窗口

如果右键单击“**引用**”并选择“**添加引用**”,您应该会看到一个弹出窗口,如下所示

如果切换到“**浏览**”选项卡,您现在需要找到 Jibu.dll 文件,其中包含 Jibu 库。默认情况下,它会在类似这样的位置

只需选择 **jibu.dll** 并单击 **OK**。现在您应该已将 Jibu 添加为引用,并且它应该出现在解决方案资源管理器中的引用列表中

**此项目是使用 Visual C# Express 2005 针对 Jibu 1.0 RC2 编写的;不同版本可能存在细微差异。
核心 Jibu 类
在继续查看示例应用程序之前,将简要概述一些关键的 Jibu 类和接口。这次简要介绍无意取代出色的类文档。事实上,我建议您将类文档视为必读内容。除了对每个方法的更详细描述外,文档中还包含大量有用的代码示例,因此您可以看到设计者希望如何使用该库。
管理器
Manager 类控制 Jibu 的初始化。虽然它对任何 Jibu 应用程序都至关重要,但我们实际上很少使用这个类。Manager 的方法和属性都是静态的;这个类永远不会被实例化。所有需要做的是确保在调用任何其他 Jibu 类或功能之前调用 Manager.Initialize()。
Task、Async 和 Future
抽象类 Task 可能是整个库中最重要的类。您想要在多线程应用程序中运行的所有任务都将编写为 Async 或 Future 的自定义子类,它们是 Task 的直接子类。Task、Async 和 Future 都是抽象类,它们提供了将您的代码作为 Jibu 任务运行的功能。
当您编写自己的任务时,您必须选择基于 Async 还是 Future。顾名思义,Async 提供了一种轻松异步运行任务的方法。调用 start 方法会将您的任务对象添加到 Jibu 调度程序,调度程序将执行任务,而正常程序流程继续——换句话说,Async.Start 不会阻塞。您稍后可以使用 Async.WaitFor 方法使主程序流程暂停,直到任务完成。
Future 用于创建异步任务,区别在于 Future 允许在任务完成后通过 Result() 方法获得返回值。
Async 和 Future 都提供 Start 方法,该方法将任务添加到 Jibu 调度程序,以及 Cancel 方法,该方法可用于提前终止任务的执行。要创建自定义任务,您必须实现的唯一方法是 Run() 方法。在调用 Start() 使任务被调度执行后,调度程序将调用此方法。
Task 类还提供了 Mailbox API,主要是一个 Send 方法,允许将数据发送到另一个任务,以及一个 Receive 方法,用于读取另一个任务发送的数据。
Channel、ChannelReader 和 ChannelWriter
Channel 泛型类体现了前面讨论的 Channel 概念,它允许多个任务以安全的方式共享数据。最简单的理解方式是将其视为一个堆栈或队列(您可以选择其中之一),您可以在其中推送和拉取模板类型的对象。通道通常有一个最大大小,如果通道未满,则调用 Write 将数据进入通道并立即返回,否则调用将阻塞直到通道有空间接受另一个项目。当任务调用 Read 时,它从通道中弹出下一个项目——此方法会阻塞,直到有项目可供从通道中移除。
与 Channel 类相关的是 ChannelReader 和 ChannelWriter,它们封装了底层 Channel 的读取和写入。这些最初可能显得有些多余,但它们的存在意味着您可以控制对 Channel 的读写访问。例如,将 ChannelReader 传递给一个方法,该方法可以读取 Channel 但不能写入它。
Channel 的一个重要概念是能够“毒化”通道(通过 Channel、ChannelReader 或 ChannelWriter 上的 Poison 方法)。一旦通道被毒化,任何尝试读取或写入该通道的操作都会因抛出异常而失败。通常,我们可能会有一个任务,其 Run 方法本质上是一个循环,读取通道并处理读取的数据。通过毒化通道,我们可以使该任务以可控的方式退出,而不是强制终止它。
并行
Parallel 类是一个实用类,提供实现简单并行功能的静态方法。例如,Parallel.Run 接受一个任务数组并阻塞直到所有这些任务都完成。还有一些非常方便的循环并行方法,For 和 Foreach,它们基本上允许 for 循环的迭代在多个核心/CPU 上共享。
Bar 示例应用程序
本教程附带 BarSample 项目。该项目提供了一个简单的酒吧模型,客户在那里下订单并由服务员服务。生成了许多服务员和顾客,每个顾客会随机间隔时间下订单,并由服务员之一服务。一个订单可以包含多种不同类型的饮料;每个订单由一名服务员处理。
从 Jibu 概念的角度来看,该应用程序可分解为以下功能单元:
- 每位服务员和每位顾客都是一个任务。
- 有一个单一的通道。
- 顾客通过向通道写入订单来下单。
- 服务员从通道读取订单并提供所请求的饮料。
- 当服务员为顾客的订单服务完所有饮料后,他们会将一条消息发布到该顾客的邮箱。
- 每位顾客会下一定数量的订单,当一位顾客的最后一个订单完成时,该顾客任务将结束。
- 当所有顾客任务都结束后,服务员任务也会自动终止。
现在,让我们看看代码……
BarSampleApp.cs
这是应用程序的主类。假设您有任何 C# 知识,您应该会明白,静态 Main 方法在应用程序启动时被调用,创建 BarSampleApp 的新实例并运行它。
这个类只有两个我们需要关注的方法……首先是构造函数
private BarSampleApp()
{
/*
* Initialize the Jibu manager with default stack size.
* Manager is a static class
*/
Manager.Initialize();
}
任何 Jibu 应用程序中最重要的就是使用 Jibu.Manager.Initialize() 初始化 Jibu。正如注释所示,可以在此方法中设置 Jibu 的堆栈大小,但通过不传递参数,我们选择默认值 1MB。
在初始化 Jibu 后,应用程序的所有实际功能都在 Run 方法中提供。它只有大约 30 行,足够短,我们可以逐行分析……
public void Run()
{
CustomerTask[] customers = new CustomerTask[5];
WaiterTask[] waiters = new WaiterTask[2];
如前所述,Bar 示例模拟了许多顾客和服务员,每个都由一个任务表示。这里我们看到顾客和服务员的数量被硬编码为 5 和 2。此时,我们大约知道每个任务类型提供的功能,但不知道其工作原理。暂时没关系……在检查两个自定义任务类如何工作之前,我们将先介绍应用程序是如何协同工作的。
请注意,我们实际上还没有创建任何任务,只是创建了用来存放它们的数组。
Channel<Order> queueChannel = new Channel<Order>();
我们的应用程序使用一个通道来提供简单的消费者/生产者通信模型。顾客可以将订单写入队列,服务员可以从队列中读取它们来处理。请注意,通道是使用将要传递的数据类型进行模板化的,即 Order 类。我不会写任何关于 Order 类的内容,它只是一个微不足道的容器,用于记录下每个订单需要多少种不同类型的饮料。
for (int i = 0; i < waiters.Length; ++i)
{
waiters[i] = new WaiterTask(queueChannel.ChannelReader, i + 1);
waiters[i].Start();
}
接下来,我们实际创建每个 WaiterTask。此类在构造函数中接受 ChannelReader 和一个 id 作为构造参数。如前所述,使用 ChannelReader 意味着我们可以强制执行应用程序逻辑,即服务员可以读取订单,但不能下单。
请注意,我们在创建每个 WaiterTask 后就启动了它。这没有问题,因为 WaiterTask 只是启动并等待订单出现在通道中……因此,在这个循环结束时,每个服务员都已就位,等待口渴的顾客光顾。
for (int i = 0; i < customers.Length; ++i)
{
customers[i] = new CustomerTask(queueChannel.ChannelWriter, i + 1);
}
同样的方式,我们现在创建我们的 CustomerTasks。有两点需要注意……首先,我们还没有启动 CustomerTasks。在 Jibu 中,任务可以被创建,但在调用其 Start 方法之前不会被传递给调度程序。其次,CustomerTasks 接受 ChannelWriter 作为构造参数。顾客下单,因此允许他们写入通道但不能从中读取。
Parallel.Run(customers);
记住,在此行之前,WaiterTasks 已经启动,在 Jibu 调度程序控制的线程中运行,但 CustomerTasks 尚未启动。Parallel.Run 同样可以接受已启动或未启动的任务——如果尚未启动,它将启动它们。然后它等待所有 CustomerTasks 完成,这等同于说它等待 CustomerTask.Run 在所有顾客上退出。在方法返回之前,不会发生这种情况。查看应用程序的功能摘要,这意味着当此方法调用返回时,最后一位顾客已收到他的最后一份订单,并且应用程序即将结束。实际的订购和供应饮料是在运行的 CustomerTask 和 WaiterTask 对象之间的交互中完成的,它们通过 Channel 进行通信,稍后我们将看到。
queueChannel.Poison();
此时,我们知道所有 CustomerTasks 都已完成,因为 Parallel.Run 已返回。然而,服务员仍然耐心地等待更多订单——但既然所有顾客都已离开酒吧,是时候让服务员回家了。通过“毒化”通道,我们将在服务员下次尝试从通道读取时遇到 PoisonException。WaiterTask 类编写方式是,当发生这种情况时,任务将结束。因此,通过“毒化”通道,我们可以一种方便的方式使 WaiterTasks 以安全可控的方式结束。
Parallel.Run(waiters);
但是,服务员不会立即知道“毒化”的事情。通过添加此行,我们确保应用程序不会在所有 WaiterTasks 实际终止之前继续进行。
WaiterTask.cs
此类是 Async 的子类,用于模拟服务员。如前所述,WaiterTask 在构造函数中接收 ChannelReader,用于从通道读取订单。它还接收一个 id,仅用于在控制台消息中标识服务员。
我们需要关注的唯一方法是 Run 方法。记住,在调用任务的 Start 方法后,Jibu 会调用此方法。
显而易见,Run 方法会无限循环,直到抛出 PoisonException。当通道被“毒化”时,这将发生,此时 Run 方法,从而 WaiterTask,将结束。在此之前,Run 方法运行一个循环
Order order = channelReader.Read();
如果通道中没有 Order,ChannelReader 将会阻塞——换句话说,在有订单可读之前,此方法不会返回。任务将等待,Jibu 会确保在有订单可读之前,其 CPU 使用率最低。
接下来的几行只是提取订单信息,并模拟一次服务一种饮料,每次服务之间都有延迟。
Send(id, order.getCustomer().Address);
Timer.SleepFor(1000);
服务完所有饮料后,会向下单顾客的邮箱发送一条消息。请注意,我们使用任务的 Address 发送消息,而不是任务本身。为了向任务发送消息,必须知道它的地址。
在此之后,循环会暂停一秒(仅仅是为了让应用程序的进度更容易跟踪),然后再重新开始。
CustomerTask.cs
CustomerTask 也是 Async 的子类。它也运行一个循环,但每次迭代不是关于读取和处理订单,而是在替换完最大数量的订单后,任务会在每次迭代中下新订单,然后退出。
while (roundsBought<=maxRounds)
{
int countdown = rand.Next(10, 20);
while(countdown--> 0)
{
Timer.SleepFor(1000);
}
创建时,maxRounds 是随机生成的。这代表顾客将下的订单数量。在每次迭代开始时,任务会暂停一段随机时间,介于 10-20 秒之间。请注意,使用了 Jibu.Timer 类来提供 Sleep 功能。
Order order = new Order(this);
...
channelWriter.Write(order);
现在创建一个新的 Order 并填充。请注意,Order 维护着对下单顾客的引用,这使得 WaiterTask 可以在服务完订单后向 CustomerTask 发送邮箱消息。
构造 Order 对象后,将其写入通道。由于创建通道时没有指定缓冲区大小,调用 Write 将阻塞,直到 Waiter 从通道中读取订单。
int val = Receive<int>();
Console.WriteLine("Customer #" + id + " got his drinks from waiter #"+val+"\n");
调用 Receive 用于从 CustomerTask 的邮箱读取一个 int。此方法会阻塞,因此直到有消息发布到顾客邮箱才会返回。如前所述,这条消息是在服务员处理完顾客的订单后发布的。