并发编程 -研究任务消息传递以实现无同步的进程间通信






4.80/5 (13投票s)
对 Parallel FX 的进一步研究。
引言
我写上一篇文章 并发编程 - 入门 的目的是为了研究微软的 Parallel FX 库 (PFX)。正如各位专家明确指出的那样,PFX 并未解决应用程序状态和共享内存同步的问题。Erlang 的一项让我着迷的特性是任务间的消息传递概念。具体来说,接收消息的线程会获得发送线程所发送消息的本地副本。这完全消除了同步问题。本文的目的是研究一个简单的 TaskMessageManager (TMM) 实现,该实现与 PFX 配合使用,为任务添加序列化消息传递。
关于代码
如果您下载了代码,您还需要安装 Parallel FX 库。另外,非常重要的一点是,如果您在 Debug 模式下编译代码,请在属性中取消勾选“定义 DEBUG 常量”——TMM 输出的调试消息在测试 TMM 模式时会严重影响应用程序的性能。需要 Visual Studio 2008。
基本架构
任务通过指定它们正在监听的消息队列来向 TMM 注册。任务会阻塞,直到消息被放入消息队列。任务可以共享同一个消息队列,以便工作可以分配给多个任务。应用程序必须设置任务以并发运行在特定的消息队列上,而不是使用 SelfReplicating 标志选项(见下文 PFX 问题)。消息在任务请求时从队列中检索,这意味着它已经完成了当前工作。使用了两个内置消息(cancel 和 done),分别用于告知任务终止以及表明所有工作已完成。在这两种情况下,该任务都会终止。如果队列被多个任务共享,它们都会终止(类似广播到所有监听该消息队列的任务)。
TaskMessageManager 的作用
TMM 设置了消息队列,主应用程序及其任务可以通过这些队列相互通信,根据消息类型和内容启动工作。此外,消息会被复制,因此接收线程不会直接引用发送线程的消息。TMM 是一个轻量级 API,开发人员可以使用它来减少实现相同功能所需的设置工作。
TaskMessageManager 的作用
TMM 是一个实验,我计划继续深入研究。它不打算用于生产代码。显然存在性能问题,这可能永远无法在 C# 中解决。还有一些核心架构问题,我认为只能通过像 Erlang 那样的虚拟机来解决,该虚拟机便于将数据作为值在任务之间移动,甚至不包含同步的构造。另一方面,谨慎使用 TMM 可能会使应用程序受益于无同步的任务通信。
当然,TMM 并没有消除任务之间的同步问题,它只是将问题**从**开发人员需要处理的问题**转移**到了 TMM **为**开发人员处理的问题。TMM 中有大量的 lock 语句来控制对消息队列的访问,当然,还有序列化过程,这会带来性能和可用性方面的问题。PFX 也是如此——它通过将线程管理问题从开发人员手中转移到 PFX,使开发人员更容易向应用程序添加并发性。
实现
接口
有两个关键接口,IMessage
和 IClonableMessage
。发送给任务的所有消息都必须实现 IMessage
。如果消息实现了 IClonableMessage
public interface IClonableMessage
{
IMessage DeepClone();
}
则会调用 DeepClone
方法,而不是使用粗暴的 .NET 序列化。如果您不实现 IClonableMessage
,则您的消息类必须是可序列化的。IMessage
接口目前只是一个存根。
用法
一个简单示例
在这个简单的例子中,创建了两个任务,应用程序等待任务注册(参见下文 PFX 问题)
task1 = Task.Create(ATask, null, TaskManager.Default, TaskCreationOptions.None, "Task1");
task2 = Task.Create(ATask, null, TaskManager.Default, TaskCreationOptions.None, "Task2");
while (!tmm.IsRegistered(task1)) {Thread.Sleep(10);}
while (!tmm.IsRegistered(task2)) { Thread.Sleep(10); }
任务注册自身,并开始监听同一个消息队列,指定 100ms 的超时时间。
static void ATask(object obj)
{
tmm.RegisterTask(Task.Current, "TaskQueue");
bool stopped = false;
while (!stopped)
{
TaskMessage tm = tmm.GetMessage(100);
...
如果任务指定了超时时间,GetMessage
调用将返回内置的 NoMessage
。如果任务未指定超时时间(-1),TMM 会在内部继续监控任务,以查看应用程序或 PFX TaskManager 是否发出了取消请求,在这种情况下,TMM 会抛出 TaskCanceledException
1。由于应用程序可能通过 StopMessage
通知任务它不再需要了,因此典型的任务也应该检查此消息。在我们简单的例子中,一旦任务收到我们的两条消息之一,它就会终止自身。
while (!stopped)
{
TaskMessage tm = tmm.GetMessage(100);
Debug.WriteLine("!tmm: " + Task.Current.Name + ": "+tm.Message.ToString());
if (tm.Message is SayHelloMessage)
{
// executes in either task 1 or task 2.
Console.WriteLine("Hello!");
stopped = true;
}
else if (tm.Message is RequestHelloMessage)
{
// executes in task 1.
// Post the message queue rather than a specific task.
tmm.PostMessage("TaskQueue", new SayHelloMessage());
stopped = true;
}
else if (tm.Message is StopMessage)
{
stopped = true;
}
else if (tm.Message is NoMessage)
{
}
}
应用程序将 RequestHelloMessage
发布到此队列。其中一个任务将被释放,它将 SayHelloMessage
发布到同一个队列,通常由第二个任务处理,而不是第一个任务去检查更多消息。在我们简单的测试中,应用程序会这样做
tmm.PostMessage("TaskQueue", new RequestHelloMessage());
tmm.Wait();
消息队列会监控 Task.Completed
事件,并从其内部映射中移除任务,因此我们可以一直等到所有任务都完成。
通过查看此应用程序运行时生成的跟踪,您将看到两个任务在工作
另一个运行示例表明,Task1 先收到了消息
并发应用程序模式
此时,开始识别并发编程模式是有用的。
处理工作然后终止
一种并发应用程序模式是“处理工作然后终止”模式,它基本上只是一个等待消息、处理该消息然后终止自身的任务。上面的例子说明了这种模式。
处理消息然后终止
一个稍微复杂一些的版本会处理其队列中的所有消息,然后终止。这假设队列在任务开始之前已经完全加载,否则任务将与加载器竞争。此模式当前不可能实现(参见下文 PFX 问题)。
处理工作直至停止
比前一种模式更高级的模式要求添加到队列的最后一条消息是 StopMessage
。任务只有在看到此消息时才会终止。显然,这假设所有监听该队列的任务按顺序出队消息。此外,它要求 TMM 自动将消息放入每个监听该队列的任务的队列中,以便所有监听任务都终止。此模式,如 TMM 实现的那样,当前在任务动态地将自己添加到已发布 StopMessage
的消息队列之后失败。
一个更复杂的例子
回到我在第一篇文章中使用的 Mandelbrot 示例5,我将再次展示代码是如何修改的,这次说明了任务和任务消息传递。您会立即注意到这个架构的特点是,原始代码已被分解成小的、自主的单元。然而,在这个版本中,仍然存在一个全局变量集,任务会引用它
readonly double xstep;
readonly double ystep;
readonly double escapeRadius;
readonly double logEscapeRadius;
readonly int width;
readonly int height;
readonly byte[] argb;
readonly Color[] colors;
readonly ComplexNumber p1;
readonly ComplexNumber p2;
readonly int maxIteration;
readonly ProgressBar progress;
我的待办事项清单是将其中的变量移至各自的任务,并在任务创建时使用任务参数进行初始化。目前,它们被指定为 readonly
以强调它们是不可变的。
消息
有三条消息。
/// <summary>
/// The initial task is primed with width messages, each an x coordinate.
/// </summary>
public class XCoordMessage : IMessage, IClonableMessage
{
public int x;
public XCoordMessage(int x)
{
this.x = x;
}
public IMessage DeepClone()
{
return new XCoordMessage(x);
}
}
/// <summary>
/// The coordinate message is sent to the task that computes
/// the iterations.
/// </summary>
public class CoordMessage : IMessage, IClonableMessage
{
public int x;
public int y;
public CoordMessage(int x, int y)
{
this.x = x;
this.y = y;
}
public IMessage DeepClone()
{
return new CoordMessage(x, y);
}
}
/// <summary>
/// This message is sent to the task responsible for updating
/// the bitmap.
/// </summary>
public class CoordColorMessage : IMessage, IClonableMessage
{
public int x;
public int y;
public int colorIndex;
public CoordColorMessage(int x, int y, int colorIndex)
{
this.x = x;
this.y = y;
this.colorIndex = colorIndex;
}
public IMessage DeepClone()
{
return new CoordColorMessage(x, y, colorIndex);
}
}
第一条消息设置任务来生成 (x,y) 坐标,然后将该坐标传递给迭代任务。迭代任务在确定迭代次数后,会向负责更新位图的任务发送一条消息。
任务
有三个任务。每个任务都会检查 StopMessage
,如果 TMM 返回该消息,则会自行终止。否则,它会检查消息是否为预期的消息类型,并基于该消息执行任务。
PostCoordinates
此任务将要计算的像素的 (x, y) 坐标发布到 ComputeIterations
任务。
public void PostCoordinates(object obj)
{
// When we get the the x coord
// and vertical height, post the complete coordinate.
tmm.RegisterTask(Task.Current, "CoordQueue");
bool stopped = false;
while (!stopped)
{
TaskMessage tm = tmm.GetMessage();
if (tm.Message is StopMessage)
{
stopped = true;
}
else if (tm.Message is XCoordMessage)
{
XCoordMessage xch = (XCoordMessage)tm.Message;
for (int y = 0; y < height; y++)
{
tmm.PostMessage("IterationQueue", new CoordMessage(xch.x, y));
}
}
}
}
ComputeIterations
此任务计算 z 值逃逸前的迭代次数。它操作的是在其队列中收到的消息的 (x, y) 坐标。
public void ComputeIterations(object obj)
{
// When we get the coordinate,
// compute the # of iterations before escape and
// post the result.
tmm.RegisterTask(Task.Current, "IterationQueue");
bool stopped = false;
while (!stopped)
{
TaskMessage tm = tmm.GetMessage();
if (tm.Message is StopMessage)
{
stopped = true;
}
else if (tm.Message is CoordMessage)
{
CoordMessage cm = (CoordMessage)tm.Message;
ComplexNumber z = p2;
z.Re = p2.Re + (cm.x * xstep);
z.Im = p1.Im-cm.y*ystep;
ComplexNumber C = z;
int iteration = 0;
while ( (z.Modulus < escapeRadius) && (iteration < maxIteration) )
{
z = z * z + C;
iteration++;
}
int colorIndex = 0;
if (iteration < maxIteration)
{
z = z * z + C; iteration++;
z = z * z + C; iteration++;
double mu = iteration - (Math.Log(Math.Log(z.Modulus))) / logEscapeRadius;
colorIndex = (int)(mu / maxIteration * 768);
}
tmm.PostMessage("BitmapQueue", new CoordColorMessage(cm.x, cm.y, colorIndex));
}
}
}
UpdateBitmap
此任务接收像素的 (x, y) 坐标以及上面任务计算出的颜色索引。它负责将颜色发布到位图。
public void UpdateBitmap(object obj)
{
// When we get the computed iterations
// for the coordinate, put it in the bitmap.
tmm.RegisterTask(Task.Current, "BitmapQueue");
bool stopped = false;
while (!stopped)
{
TaskMessage tm = tmm.GetMessage();
if (tm.Message is StopMessage)
{
stopped = true;
}
else if (tm.Message is CoordColorMessage)
{
CoordColorMessage cm = (CoordColorMessage)tm.Message;
int colorIndex=cm.colorIndex;
if ((colorIndex < 0) || (colorIndex >= 768))
{
colorIndex = 0;
}
int index = (cm.y * width + cm.x) * 4;
argb[index] = colors[colorIndex].B;
argb[index + 1] = colors[colorIndex].G;
argb[index + 2] = colors[colorIndex].R;
argb[index + 3] = 255;
// See comments in UpdateProgress for why we do this rather than
// another task that updates the progress bar.
Interlocked.Increment(ref Tasks.progressValue);
// tmm.PostMessage("UpdateQueue", UpdateProgressMessage.msg);
}
}
}
进度条更新过程
进度条在主应用程序线程中更新。由于可能有一个以上的 UpdateBitmap
任务正在运行,因此 progressValue
任务必须原子地递增,因此使用了 Interlocked.Increment
语句。尝试发布计数并对进度条的容器窗体使用 BeginInvoke
是灾难性的——似乎通过任务实例更新 UI 是个糟糕的主意。在主应用程序线程中,TMM 在等待任务完成时会提供一个回调。这有效地将进度条更新封送到了主应用程序线程,在那里它可以被安全地更新。
protected void UpdateUI()
{
// on 32 bit systems, this is atomic.
int val = Tasks.progressValue;
progress.Value = val;
Application.DoEvents();
Thread.Sleep(100);
}
但最终结果是这违反了将数据保留在任务本地的模型。我将这个问题留给未来的增强。
任务初始化
任务通过编译器选项进行初始化,可以选择每个任务一个 Task
实例,或者 n 个 Task
实例,其中 n 是可用处理器的数量。您可以通过在代码中定义此编译器选项来尝试性能差异。此模式模拟任务的自复制。理想情况下,我们希望 TaskManager
来管理自复制。
public void Initialize()
{
Task task;
#if DualTask
for (int i = 0; i < System.Environment.ProcessorCount; i++)
#endif
{
#if DualTask
#else
int i = 0;
#endif
task = Task.Create(PostCoordinates, null, TaskManager.Default,
TaskCreationOptions.None, "Coordinates"+i);
while (!tmm.IsRegistered(task)) { Thread.Sleep(10); }
task = Task.Create(ComputeIterations, null, TaskManager.Default,
TaskCreationOptions.None, "Iterations"+i);
while (!tmm.IsRegistered(task)) { Thread.Sleep(10); }
task = Task.Create(UpdateBitmap, null, TaskManager.Default,
TaskCreationOptions.None, "Bitmap" + i);
while (!tmm.IsRegistered(task)) { Thread.Sleep(10); }
}
}
主应用程序线程
而不是使用两个循环来计算每个分形的迭代次数,主应用程序会使用 x 坐标消息来初始化第一个任务,然后等待所有任务完成。
Tasks tasks = new Tasks(xStep, yStep, escapeRadius, logEscapeRadius, p.Width, p.Height,
maxIteration, P1, P2, argb, Colors, progress);
tasks.Initialize();
tasks.SendX();
// Wait for the coord queue to flush.
TaskMessageManager.Default.Wait("CoordQueue", UpdateUI);
TaskMessageManager.Default.PostMessage("CoordQueue", StopMessage.Default);
// Wait next for the iteration queue to flush.
TaskMessageManager.Default.Wait("IterationQueue", UpdateUI);
TaskMessageManager.Default.PostMessage("IterationQueue", StopMessage.Default);
// Then wait for the bitmap queue to flush.
TaskMessageManager.Default.Wait("BitmapQueue", UpdateUI);
TaskMessageManager.Default.PostMessage("BitmapQueue", StopMessage.Default);
// Then wait for the bitmap queue to flush.
//TaskMessageManager.Default.Wait("UpdateQueue", UpdateUI);
//TaskMessageManager.Default.PostMessage("UpdateQueue", StopMessage.Default);
TaskMessageManager.Default.Wait(UpdateUI);
这里的关键点是,应用程序在队列为空时发布 StopMessage
,然后调用 Wait
来等待最后一个任务完成。这确实有效,尽管非常笨拙,因为任务 1 的队列在至少向任务 2 的队列中发布了消息之前无法清空。
性能结果
由此产生的性能由这张图4生动地说明了
您会惊奇地注意到,并行化外循环(使用 Parallel.For
处理内循环——而不是同时处理外循环)比单线程应用程序的性能要差得多,正如我在第一篇文章中所预测的那样。尽管我清楚地看到,单线程执行时双核利用率仅为 50%,而 PFX 版本中的核心利用率为 100%。正如预期的那样,TMM 版本要慢得多,但至少在这里,设置多个任务会带来性能提升。顺便说一句,在所有情况下,移除更新进度条的代码都会带来 10%-20% 的性能提升,这表明更新 UI 是一项昂贵的操作。
现在,有趣的是,根据渲染分形的不同区域,PFX 的性能有时会更好。也许,碰巧我找到了 PFX 性能较差的分形区域。这绝对值得进一步深入研究。
PFX 问题
本次实验最重要的结果是我发现的 PFX 问题。希望它们能在未来的版本中得到解决,但如果不行,替换 PFX 任务管理器和 Task
类是相当直接的,尽管这将导致 PLINQ 的解耦。不过,这也有替代方案。但在此期间,我将违背我一贯的“重复造轮子”的习惯,尽可能多地尝试与 PFX 合作。
自动任务启动
第一个问题是,任务一旦创建,就会立即启动。这意味着我无法在创建后注册 Task
实例,因为任务会立即开始从 TMM 请求消息,而任务创建者可能尚未注册。反之,如果任务自行注册(这是我的实现方式),创建者必须等到任务注册后才能发布消息。第一种情况可以通过某种自动注册过程来解决。第二种情况可以通过(这是我的偏好,但尚未实现)先创建队列来解决。这将允许任务创建者用消息填充队列,然后创建任务。
但理想情况下,我认为 Task.Create
方法应该允许您以暂停状态创建任务,并在您选择的时间启动(或恢复)它。为了完整起见,应用程序应该能够暂停任务。PFX TaskManager
应该足够智能,可以将任务分配给以前为已暂停任务保留的核心。
自复制额外事件
在自复制时,即使任务从未启动,Task.Completed
事件也会触发。在本例中,我删除了 task2 的创建。从这个跟踪中
您可以看到 Task1 被注册了两次(PFX TaskManager
看到有两个核心,因此它复制了相同的任务名,这没关系)。但是,您还可以看到有四次“Task Completed”事件调用!我认为 Task.Completed
不应该为从未启动过的线程调用。
任务实例重用?
启用自复制后,我遇到了这个异常
if (taskQueueMap.ContainsKey(task))
{
throw new TaskMessageManagerException("The task " + task.Name +
" is already registered.");
}
我无法可靠地重现此问题,但这表明 Task
实例可能被 PFX TaskManager
重用。至于 taskQueueMap
为什么仍然包含此任务,我并不清楚,并且异常可能是我代码中的一个 bug 引起的。尽管如此,知道 Task
实例是否被重用很重要,这需要对 System.Threading
程序集进行进一步调查。如果 Task
实例被重用,那么任务最好确保在任务开始时初始化为已知状态(而不是其最后状态)。
令人担忧的 PFX Bug
由于任务可能长时间等待消息,因此“已知正确性 bug”(我猜,简单地说“bug”不再流行了):“长时间阻塞的任务可能导致线程注入失控和内存溢出条件,当使用默认策略时。”2 值得关注。希望这个 bug 能被修复。
核心与线程利用率
任务被分配给核心,在自复制时,当核心可用时,任务会被复制。3 我认为这并非理想。它假设任务会消耗其核心的 100%,但这并非总是如此。如果任务正在等待 I/O 完成事件怎么办?对于 TMM 而言,如果任务正在等待消息怎么办?是否有其他实际工作的 Task
实例可以暂时利用这个核心?到目前为止,我还没有找到任何关于 PFX 是否处理任务不消耗核心 100% 的情况的信息。
性能
我完全期望 PFX 渲染的 Mandelbrot 会比单线程操作更好,而且我实际上对它反而更差感到失望!如果 PFX 团队的某个人能解释这个结果,那将是太好了。
TMM 问题
任务消息管理器实例
只允许一个 TMM。任务始终与默认 TMM 交互。通过允许多个 TMM,每个 TMM 管理**单个**队列,可能可以更好地管理任务队列的整个概念。
PostMessage 重载
PostMessage
有发布到队列或发布到任务的重载。然而,发布到任务是名不副实的,因为它会发布到该任务的队列,这就像发布到队列一样,因为监听该队列的任何可用任务都会收到该消息。这一点并不明显,并且方法签名具有误导性,因为开发人员可能会错误地认为消息正在发布到该特定任务。
自复制任务
TMM 不适用于自复制任务。关于此问题以及“智能”删除不再需要处理队列中没有内容的任务的代码,有一整部分代码已被注释掉。这可能是在混合管理任务,并且可能与 PFX 的 TaskManager
发生冲突。自复制任务无效的另一个原因是 PFX 在 Task.Completed
事件数量大于已启动 Task
实例数量时存在问题。最后是 TMM 停止任务的问题,如下一节所述。
停止任务
在“处理工作直至停止”模式中,TMM 向监听指定队列的每个任务发送 StopMessage
。要成功停止所有任务,任务本身必须行为正确——它不能在收到 StopMessage
后请求另一条消息。此外,由于此消息在发出时入队(而不是在遇到时入队,这并不能真正解决问题,因为可以在其他任务停止时添加任务),任何在消息入队后自行添加的任务将不会收到停止消息。这是一个必须解决的严重问题,以便自复制能够正常工作。
装箱
我认为使用结构体作为消息基本上没有意义,因为它们通过接口引用时会被装箱(尽管我需要对此进行仔细检查)。这是 .NET 环境在某些情况下不适合进行任务间消息传递的另一个原因。这些场景可以被识别为任务的生命周期非常短暂,以至于消息的序列化占了任务本身的相当一部分。
每个任务消息管理器有多个队列
如上所述,每个 TMM 的多个队列会导致在向特定队列添加和删除消息时产生不必要的锁定,因为锁定了整个队列集合,而不是仅仅锁定队列本身。这将阻塞其他试图从不同队列检索消息的任务。未来的性能改进是只为每个 TMM 使用一个队列,这将需要创建 TMM 实例。
其他观察
调试消息
TMM 中的调试消息会大大降低其速度。
每个核心的活动线程过多
如果一个核心在多个活动线程之间进行任务切换,任务切换本身就会拖慢核心的速度。这需要进一步研究。
理解应用程序的任务
通过使用 TMM,可以清楚地看出,需要仔细理解并行任务才能利用并发编程的优势。
理解您的并发编程工具集
同样清楚的是,除了理解应用程序需求外,还需要清楚地理解任何促进任务管理和任务之间协调(如消息传递)的库的工作原理。我希望 PFX 团队能提供高质量的文档,以便开发人员能够清楚地了解如何最好地利用 PFX 和并发编程。陷阱在于认为 PFX 使并发编程变得容易。它不是——CTP 仅仅是对您自己管理线程过程的语法糖。性能测试表明,并行化外循环并不能提高性能——实际上,它会降低性能。如果 PFX 团队的任何人能够解释这种行为并在本文中发帖,我将不胜感激。
未来改进
- 允许多个
TaskMessageManager
实例。 - 每个 TMM 一个队列。
- 通过将初始化参数传递给任务来初始化任务“堆”,从而真正实现全局变量的无引用。
- 与 PFX 的自复制算法共存。
- 任务平衡(不利用核心 100% 的任务)。
- 在 TMM 中创建一个插入或更新函数,该函数替换现有消息而不是添加另一条消息。这相当于一个“单槽”队列。
结论
我在这里的尝试是为 PFX 构建一个用例,这可能超出了 PFX 设计者计划的范围。此外,我想尝试无同步的任务间通信,以研究这种方法的优缺点。事实证明,PFX 可能不适合此类工作,无论是否存在其他更基本的问题,如序列化性能(这可能使 .NET 语言总体上不适合此方法)。然而,我确实希望 PFX 的创建者能够审视这项工作并考虑我的一些建议。我计划在未来的文章中继续这项调查。性能测量和优化是我感兴趣的一个主题——如何测量 PFX 或 TMM 等的性能变化,以及核心内的性能优化,因为坦率地说,我预计将在 .NET 语言领域工作相当长一段时间。
参考文献和注释
1 - 取消任务
2 - PFX 的已知正确性 Bug
4 - 这些测试是在 Sony Viao VGN-FE890 笔记本电脑上完成的,运行 32 位 Vista Business,2GB RAM,T5500 处理器,1.66GHz。Mandelbrot 配置是
maxIteration = 300;
P1 = new ComplexNumber(-0.669912107426552, 0.451398357629374);
P2 = new ComplexNumber(-0.672973630864052, 0.449948162316874);
绘图区域为 1254w x 594h。