65.9K
CodeProject 正在变化。 阅读更多。
Home

声明式多线程

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.94/5 (39投票s)

2011年11月22日

CDDL

19分钟阅读

viewsIcon

67351

downloadIcon

884

关于 C# 中声明式多线程概念的介绍和概念验证代码。

传统的多线程方式

自 Windows NT 早期版本以来,Microsoft Windows 一直实现了进程内并行执行路径的能力。大多数程序员对这项功能是避而远之的。多线程应用程序难以调试,并且错误可能以多种方式出现。尽管如此,它仍然是构建响应式用户界面或在多 CPU/多核系统上能够良好扩展的服务器应用程序的最强大功能之一。

有许多线程库试图简化多线程应用程序的实现。但大多数库都侧重于如何创建线程及其生命周期管理。其中许多都存在相同的问题:哪个函数可以从哪个线程调用,这主要是一种约定。例如,在 C# 中,您可以使用 BeginInvoke 从另一个线程调用函数。但是没有人会阻止您直接调用该函数。客户端负责从正确的线程调用库的所有方法。COM 通过为每个单元引入不同的线程模型来改变了这一点。这虽然粗糙,但它允许库声明如何处理线程。框架负责确保正确的行为。在 COM 时代之后,这种方法消失了,客户端又重新负责确保尊重被调用方法的线程约定。

本文介绍了一种编写多线程应用程序的新方法。为了说清楚我的观点,让我们假设我们实现了一个名为 FooBarBaz 的类

class FooBarBaz
{
    public void Foo() {}
    public void Bar(int numberOfBars) {}
    public void Baz(String howToBaz, int numberOfBaz) {}
}

文档指出 BarBaz 不是线程安全的,并且必须从实例化该类的同一线程调用。Foo 被认为是线程安全的,可以从任何线程调用。

现在,在实现程序集的 2.0 版本时,以线程安全的方式实现 Foo 变得越来越复杂。您希望在下一个版本中放弃此功能。简单来说:您不能。可能有许多应用程序从不同的线程使用 Foo,并且所有这些应用程序都需要进行更改。我认为这违反了隔离原则。调用方和被调用方因其线程行为而紧密耦合。

难道不应该只是声明 FooBarBaz 的不同方法应该如何被调用,并由一个框架来确保它们以正确的线程调用吗?这将解耦调用方和被调用方,因为框架负责处理线程。如果被调用方的线程能力发生变化,框架将找出如何以正确的方式调用程序集。1

在代码上贴便利贴

ThreadBinding 库是我在 .NET Framework 的 ContextBoundObject 类之上实现声明式多线程的方法。以下代码片段展示了如果 FooBarBaz 类使用声明式线程,它会是什么样子。

[ThreadBound(ThreadBoundAttribute.ThreadBinding.WorkerContext)]
class FooBarBaz :
    ThreadBoundObject
{
    [FreeThreaded]
    public void Foo() {}

    public void Bar(int numberOfBars) {}
    public void Baz(String howToBaz, int numberOfBaz) {}
}

现在,线程绑定系统将在其自己的线程(WorkerContext)上创建该类,并将所有对 BarBaz 的方法调用编排到该线程。这确保了所有这些调用都在同一线程上执行。标记为 FreeThreaded 属性的 Foo 方法将在调用线程上执行。

现在我们有了该类的辅助线程,我们可以从主线程异步调用 Bar。这样,我们就可以继续绘制用户界面,而无需等待函数完成。

[ThreadBound(ThreadBoundAttribute.ThreadBinding.WorkerContext)]
class FooBarBaz :
    ThreadBoundObject
{
    [FreeThreaded]
    public void Foo() {}

    [AsyncThreaded]
    public void Bar(int numberOfBars) {}

    public void Baz(String howToBaz, int numberOfBaz) {}
}

现在,该类声明 Bar 可以异步调用。框架负责将调用参数编排到辅助线程。通过使用不序列化数据的轻量级包装器来确保速度。如果异步方法与同步调用混合,框架将按顺序执行它们。在“最坏情况下”,一个同步调用跟随十个异步调用,必须等待所有这些调用完成后才能执行。但这样,调用方和被调用方都可以确保调用顺序不受双方线程偏好的影响。

简单来说:类/程序集仅声明其如何被调用以及如何使用线程。其余部分由框架完成。客户端无需知道方法应该从哪个线程调用。框架会拦截调用并将其编排到正确的线程 — 或者立即执行。

举例说明

为了更深入地了解 ThreadBinding 库的细节,让我们创建一个示例项目。它使用一个非常“复杂”的服务来生成递增的数字序列。调用服务类上的方法时,数字之间的延迟可以配置。我们将使用声明式线程来进行一些异步调用,并确保 GUI 更新在正确的线程(GUI 线程)内完成。

首先,我们创建一个带有几个按钮和一个接收数字的列表控件的窗体。

using System;
using System.Windows.Forms;
using ThreadBound;

namespace FormsTest
{
    public interface IUpdateInterface
    {
        void NewNumber(int number);
    }

    public partial class MainForm :
        Form,
        IUpdateInterface
    {
        private NumberWorker numWorker;
        private IUpdateInterface threadBoundUpdater;

        public MainForm()
        {
            InitializeComponent();

            numWorker= new NumberWorker();

            threadBoundUpdater = this.BindInterfaceToThread<IUpdateInterface>();

            numWorker.NewNumber += threadBoundUpdater.NewNumber;
        }

        private void BtnStart_Click(object sender, EventArgs e)
        {
            numWorker.CreateNumbers(10, 500, new Test());
        }

        private void BtnCancel_Click(object sender, EventArgs e)
        {
            numWorker.Cancel();
        }

        public void NewNumber(int number)
        {
            LBNumbers.Items.Add(number.ToString());
        }

        private void BtnDispose_Click(object sender, EventArgs e)
        {
            numWorker.Dispose();
        }
    }
}

这相对直接。创建类并定义所需的一切。唯一“奇怪”的是 IUpdateInterface 接口。让我们暂时忽略它。我稍后会解释。首先,我们创建辅助类

using System;
using ThreadBound;
using System.Threading;

namespace FormsTest
{
    [ThreadBound(ThreadBoundAttribute.ThreadBinding.WorkerContext)]
    class NumberWorker : ThreadBoundObject, IDisposable
    {
        private volatile bool CancelFlag=false;

        public delegate void NewNumberHandler(int number);
        public event NewNumberHandler NewNumber;

        [AsyncThreaded]
        public void CreateNumbers(int limit, int delay)
        {
            CancelFlag = false;
            for (int i = 0; i <= limit; i++)
            {
                Thread.Sleep(delay);
                if (NewNumber!=null)
                    NewNumber(i);

                if (CancelFlag)
                    break;
            }
        }
        
        [FreeThreaded]
        public void Cancel()
        {
            CancelFlag = true;
        }

        public void Dispose()
        {
        }
    }
}

如您所见,服务实现中没有太多代码。让我们从服务类开始分析。它必须派生自 ThreadBoundObjectContextBoundObject。这是整个线程绑定机制正常工作所必需的。ThreadBoundObjectContextBoundObject 之间的区别稍后也会解释。

接下来要注意的是 ThreadBound 属性。它将线程样式声明为 ThreadBinding.WorkerContextThreadBinding 库将为类的每个实例创建一个单独的线程。目前实现了三种线程样式

  • ThreadBinding.WorkerContext:在单独的线程上执行所有未标记 FreeThreaded 属性的方法调用。类的每个实例都有自己的线程。
  • ThreadBinding.CurrentContext:在当前线程上执行所有未标记 FreeThreaded 属性的方法调用。只有当当前线程具有已分配的执行上下文时才能使用此选项。目前只有 WPF 或 WinForms GUI 线程支持使用此属性实例化类。
  • ThreadBinding.PoolContext:在线程池线程上执行所有未标记 FreeThreaded 属性的方法调用。每个方法调用(无论同步还是异步)都可能在不同的线程池线程上执行。ThreadBound 库不保证任何线程亲和性,除了执行线程将是线程池线程。

AsyncThreaded 属性将 CreateNumbers 方法标记为异步。ThreadBinding 库将在辅助线程上调用此方法。调用将立即返回,并且该方法将在该实例的辅助线程上执行。很明显,异步方法不能返回值。因此,所有异步方法都必须返回 void。如果它们不返回 void,则会抛出 ThreadBoundException。正如我们稍后将看到的,对此有一个例外。

Cancel 方法标记为 FreeThreaded 属性。此方法始终在调用线程的上下文中执行。它将易失性变量 CancelFlag 设置为 true。辅助方法会定期检查取消标志,如果设置了标志,则停止执行。

NewNumberHandler 事件从辅助线程调用。如果接收端没有线程绑定,则事件将在辅助线程内执行。但这也可以通过声明式方式更改。

到目前为止一切都很简单。但还有一件事需要一点额外的关注。线程绑定机制如何知道何时终止辅助线程?您可能已经猜到了。Dispose 方法负责终止辅助线程。它必须在那里,即使它像本例中一样是空的。ThreadBinding 库会检测对 Dispose 的调用,并在 Dispose 方法在辅助线程上执行之后终止线程。如果对象未派生自 IDisposable,辅助线程最终会在进程结束时终止。如果您有一个必须在应用程序运行时一直存在的单例对象,则可以省略实现 IDisposable

现在回到主窗体。NewNumberHandler 方法是分配给 NewNumber 事件的方法。但是,如果我们直接分配它,该方法将在我们的辅助线程上调用。这是不希望的,因为 GUI 更新必须在 GUI 线程上完成。Form 类派生自 ContextBoundObject 是一个特殊的巧合。但 WPF 类不是。假设 Form 类不派生自 ContextBoundObject,以便我能够展示 ThreadBinding 库如何处理这种情况。

如前所述,ThreadBinding 库的魔力基于 ContextBoundObject 类。每个需要绑定到线程的类都必须派生自它。为了允许绑定不派生自 ContextBoundObject 的类的成员,必须使用一个小技巧。BindInterfaceToThread 扩展方法可以将任何接口绑定到当前线程,只要当前线程具有当前上下文。

但是它是如何工作的呢?BindInterfaceToThread 创建一个实现指定接口的所有方法的包装器。此类派生自 ContextBoundObject,因此可以拦截所有方法调用并将其传输到正确的线程。所有方法实现最终都会调用真实接口的方法。这样,任何接口实现都可以绑定到线程。您只需通过 this.BindInterfaceToThread 创建接口包装器,并使用返回的引用而不是 this 引用。

这就是为什么 MainForm 类实现 IUpdateInterface2。此接口绑定到当前线程,然后 NumberWorker 事件的 NewNumber 方法被分配给 NumberWorker

如您所见,NumberWorkerMainForm 指定了线程如何处理。MainForm 将一个 NewNumber 回调传递给 NumberWorker,该回调会自动将调用转移回 GUI 线程。NumberWorker 已将 CreateNumbers 方法声明为异步,这样,在 NumberWorker 工作时,GUI 线程不会被阻塞。

这就是声明式线程背后的主要思想:只需声明您想要什么,然后让框架完成其余的工作。

管理异步方法调用

ThreadBinding 库确保方法调用始终按顺序执行。这样,调用方和被调用方的耦合度就尽可能松散。但这会产生一些额外的问题。像第一个示例中使用的一个简单的取消标志不能用于取消仍在队列中的异步方法。调用方如何取消异步调用,而不管该调用当前正在执行还是正在等待执行?

在这种情况下,ThreadBinding 库允许异步方法返回 IAsyncCallState 接口。

using System;
using ThreadBound;
using System.Threading;

namespace FormsTest
{
    [ThreadBound(ThreadBoundAttribute.ThreadBinding.WorkerContext)]
    class NumberWorker : ThreadBoundObject, IDisposable
    {
        public delegate void NewNumberHandler(int number);
        public event NewNumberHandler NewNumber;

        [AsyncThreaded]
        public IAsyncCallState CreateNumbers(int limit, int delay)
        {
            for (int i = 0; i <= limit; i++)
            {
                Thread.Sleep(delay);
                if (NewNumber != null)
                    NewNumber(i);

                if (WasCanceled())
                    break;
            }

            return null;
        }

        public void Dispose()
        {
        }
    }
}

NumberWorker 的实现类似于之前的实现。主要区别在于移除了 Cancel 方法,并且 CreateNumbers 现在返回 IAsyncCallState 接口。

“但是 CreateNumbers 总是返回 null”,您可能会说。如果您在没有 AsyncThreaded 属性的情况下调用它,这是正确的。如前所述,ThreadBinding 库无法为异步调用提供返回值。但是,如果调用返回 IAsyncThreaded,则会创建一个合成返回值。返回的接口可用于取消方法调用。

如果调用当前未执行,则简单地取消执行。该方法从未被调用。如果该方法当前正在执行,则 ThreadBoundObject 基类的 WasCanceled 成员返回 true。这是 ContextBoundObjectThreadBoundObject 之间的区别。ThreadBoundObject 实现 WasCanceled。对于同步方法,WasCanceled 始终返回 false。如果方法调用是异步的,ThreadBinding 库将确保显示当前正在执行方法的正确取消状态。

这样,队列中的每个方法都可以被取消,而无论其当前执行状态如何。

让我们更新示例以使用新的 NumberWorker

using System;
using System.Windows.Forms;
using ThreadBound;

namespace FormsTest
{
    public interface IUpdateInterface
    {
        void NewNumber(int number);
    }

    public partial class MainForm :
        Form,
        IUpdateInterface
    {
        private NumberWorker numWorker;
        private IUpdateInterface threadBoundUpdater;
        private ThreadBound.IAsyncCallState callState;

        public MainForm()
        {
            InitializeComponent();

            numWorker= new NumberWorker();

            threadBoundUpdater = this.BindInterfaceToThread<IUpdateInterface>();

            numWorker.NewNumber += threadBoundUpdater.NewNumber;
        }

        private void BtnStart_Click(object sender, EventArgs e)
        {
            callState = numWorker.CreateNumbers(10, 500, new Test());
        }

        private void BtnCancel_Click(object sender, EventArgs e)
        {
            if (callState != null)
           callState.Cancel();
        }

        public void NewNumber(int number)
        {
            LBNumbers.Items.Add(number.ToString());
        }

        private void BtnDispose_Click(object sender, EventArgs e)
        {
            numWorker.Dispose();
        }
    }
}

请注意,BtnCancel_Click 方法会检查 callState 是否为 null。如果调用未标记为异步,则可能会发生这种情况。然后 ThreadBinding 库不会创建合成返回值,而是返回实际的 null 值。为了在代码和被调用程序集之间实现最佳解耦,您应该准备好接收 null 作为异步调用的返回值。

在实际应用程序中,您会排队返回的 IAsyncCallState 引用,以便能够独立取消每个调用。

内部事务

拦截机制有一个无法做到的事情:它无法拦截您自己的类中的调用。这应该不是问题,因为在您的类中,您控制着执行流程。但有时您可能会从一个自由线程方法调用一个线程绑定的方法。由于框架无法拦截此方法调用,因此线程绑定的方法将从自由线程方法的当前线程调用。

这允许您从不同线程的源方法调用内部方法。但是,您也可能通过意外跨越线程边界来损坏您的内部状态。

如果您使用 FreeThreaded 属性并调用其他类方法,则应格外小心。

也许该库的后续版本将包含一个安全的“跨线程样式”调用机制。

幕后

在本章中,我将解释源代码的结构以及各个部分如何协同工作。我不想用函数调用和函数描述来让您感到厌烦,这更多是对源代码进行高层次的概览,以便您入门。有关更详细的描述,请参阅源代码中的注释。

ThreadBound 属性

为了让 ThreadBinding 库工作并发挥其魔力,有必要能够拦截对对象的调用。这样,线程绑定机制就可以决定如何处理函数调用。

ThreadBoundAttribute 派生自 ProxyAttribute。如果一个类被标记为 ProxyAttribute 或派生属性,则该类的实例化将被拦截,并调用 ProxyAttributeCreateInstance

这就是 ThreadBinding 库使用的钩子。CreateInstance 方法然后检查构造函数中传递的线程绑定类型,并实例化或获取所需的 SynchronizationContext

下一步是创建一个新的 ThreadBoundProxy 实例并获取其透明代理实现。此透明代理将返回给调用方,而不是实际实例。

调用应用程序不会注意到拦截。它只是像使用 new 调用创建的实际实例一样使用返回的透明代理。这样,使用这种拦截类型的开发人员不必担心如何创建新实例。一切都像以前一样工作:对类调用 new,就完成了。

处理拦截的方法调用

在方法调用被拦截后,有以下几种进一步处理调用的可能性

  • 我们在正确的上下文(线程)内:只需将方法调用直接转发到 InvokeMethod 方法。创建一个 SyncRemoteMethod 实例来传输参数和返回值。
    • 该方法标记为 FreeThreaded 属性:只需将方法调用直接转发到 InvokeMethod 方法。创建一个 SyncRemoteMethod 实例来传输参数和返回值。
  • 我们在错误的上下文(线程)内:在这种情况下,我们还有两个附加选项
    • 该方法标记为 AsyncThreaded 属性(它派生自 OneWay 属性):执行一些检查以确保方法符合异步方法的规则。创建一个 AsyncRemoteMethod 实例来包装方法调用。然后通过执行上下文的 Post 方法调用 InvokeMethodAsyncRemoteMethod 实例作为“state”参数传递。如果方法返回 IAsyncCallState 接口,则会将 AsyncRemoteMethod 实例强制转换为 IAsyncCallSate 并作为合成返回值返回。
    • 该方法未标记为 AsyncThreaded 属性:创建一个 SyncRemoteMethod 实例来包装方法调用。然后使用执行上下文的 Send 方法调用 InvokeMethodSyncRemoteMethod 实例作为“state”参数传递。Send 调用在方法调用完成之前不会返回。这样,方法调用的返回值就可以放入传递的 SyncRemoteMethod 实例中。存储的返回值由代理返回。这样,同步方法调用就可以返回值。

CheckForShutdown 方法执行一项特殊任务。它在方法调用处理完毕后调用。它实现了执行上下文的关闭逻辑。某些执行上下文实现了 IDispoable 接口。ThreadBinding 库需要知道执行上下文是否不再需要。这就是 CheckForShutdown 方法的目的。它检测被代理实例是否调用了 Dispose,并检查执行上下文是否实现了 IDisposable。如果上下文是可处置的,则调用 Dispose 来关闭执行上下文。

同步上下文

如果您在 WinForms 或 WPF 应用程序中使用 ThreadBinding 库,.NET Framework 会自动创建一个执行上下文。如果您为 ThreadBound 属性设置了 CurrentContext 标志,则会使用它。对于所有其他类型的线程绑定,ThreadBinding 库会实现自己的同步上下文

  • WorkerSynchronizationContext:用于 ThreadBinding.WorkerContext 标志。实现一个辅助线程和一个工作项的 FIFO 缓冲区。这保证了方法调用顺序的保留。Worker 类实现了后台工作线程。其实现没有什么特别之处,除了全局完成事件。每个线程都有自己的完成事件,该事件存储在线程局部变量中。详细信息请参阅注释。
  • PoolSynchronizationContext:非常简单的执行上下文。每个方法调用都在线程池线程上执行。同步调用等待其完成。异步调用被分派到工作线程。此执行上下文不保证任何执行顺序!

AsyncCallState 的实现

IAsyncCallState 的实现 — 特别是异步方法调用的取消 — 需要特别注意。当前运行的方法必须能够查询其取消状态。因为所有类都派生自 ThreadBoundObject,所以最简单的方法是通过基类提供一个 WasCanceled 方法。如果您这样做,基类必须能够从方法调用信息中提取类指针,因此采用了不同的方法

当前 AsyncRemoteMethod 实例的 IRemoteMessage 接口由 InvokeMethod 方法保存到线程静态值 ThreadStatic.currentMethod 中。这样,对 ThreadBoundObject::WasCanceled() 的调用就能够访问分配给当前线程的 IRemoteMessage 接口的 WasCanceled 方法。因为一个线程一次只能运行一个方法,所以有一个线程静态值来存储当前正在运行的方法的 IRemoteMessage 接口就足够了。在 ExecuteMethod 完成运行 AsyncRemoteMethod 实例中存储的方法后,将线程静态值重置为 null

CallStates.Canceled 成员标记为 volatile,以便允许通过 IAsyncCallState.Cancel() 方法设置其值,并允许正在执行方法的线程查询它。如果调用状态设置为 CallStates.CanceledAsyncRemoteMethodWasCanceled() 方法将返回 true

如果您在方法开始执行之前调用 IAsyncCallState.Cancel(),该方法调用将被简单地跳过。这由 ThreadBoundProxyInvokeMethod 方法完成。此方法尝试将 AsyncRemoteMethod 实例的当前状态设置为 CallStates.Running。如果失败(因为被取消的方法无法进入运行状态),则不会执行调用。

接口代理生成器

InterfaceProxyGenerator 类实现了创建派生自 ThreadBoundObject 并实现特定接口的类的魔力。如前所述,所有方法调用都只是转发到真实接口。InterfaceProxyGenerator 类是一个静态类,由所有拦截的接口共享。该类的静态构造函数会生成一个应用程序域,该域是所有动态生成的包装器类的家。InterfaceProxyGenerator 使用缓存来存储生成的包装器类。这降低了动态类创建的开销,因为只有在第一次创建特定接口的包装器时才会产生开销。再次包装接口要快得多,因为类已经被缓存,只需要实例化。

调用 CreateProxy 会创建包装器并返回相应的 MarshalByRefObject。该方法获取对真实接口的引用作为参数。

包装器类是使用 MethodBuilderILGenerator 类生成的。接口通过反射进行扫描,然后生成必要的方法。每个包装器方法的工作方式都相同

  • 获取真实接口的引用并将其推到堆栈上。
  • 将传递给原始方法的参数全部推到堆栈上。
  • 调用原始方法。

包装器生成器的详细信息可以在 GenerateProxyMethod 方法中找到。构造函数的生成由 GenerateProxyCtor 方法处理。它创建一个构造函数,将传递的真实接口引用保存到包装器类的私有字段中。

在生成包装器类或从缓存中提取包装器类后,将调用 Activator.CreateInstance 来创建包装器类的实例。该实例将返回给调用方以供进一步使用。

敬请期待

这个概念验证库非常有用,可以为常见情况提供简单的多线程。它允许您轻松地构建响应式 GUI。但在测试此库时,我注意到了一些缺点

  • 无法确定异步操作的当前状态。
  • 无法等待异步操作完成。
  • 处理同步或异步方法中的未捕获异常。
  • 一种安全机制,用于以不同的线程样式调用私有方法。

我认为可以通过扩展 IAsyncCallState 来消除这些限制。

摘要

我希望这篇文章能够阐明声明式线程的思想。但是,尽管该库使多线程更容易,但它也附带了我认为每个多线程库都应该附带的警告

您必须了解该库的工作原理以及它能做什么和不能做什么。人类天生不善于轻易掌握多线程的所有方面,因为我们的大脑通常是单线程的。我们一次只能专注于一项任务。在实现并行执行路径时要小心。该库使我们能够轻松而优雅地“射穿自己的脚”。但这不会减轻痛苦……

由于使用了调用拦截技术,线程绑定库肯定不适合高性能服务器应用程序。那是 .NET Framework 中已实现的、其他线程模型所能胜任的领域。

修订历史

  • 2011年11月22日:首次发布。
  • 2011年11月23日:更新了源代码并翻译了缺失的注释。
  • 2012年3月13日:在“处理拦截的方法调用”中找到并修复了错误的缩进。

1 好的,我承认为了说明观点,这有点过于简化了。如果线程行为意外更改,可能会发生死锁和其他奇怪的事情。使用程序集的应用程序必须始终尊重被调用方法的线程行为。编写一个真正线程无关的客户端可能是一项艰巨的任务(或一项不可能的任务)。

2 请记住:WinForms 窗体派生自 ContextBoundObject,并且可以立即绑定到线程。但是,我假设这是不可能的,以便向您展示如何使用 BindInterfaceToThread 绑定任何接口。

© . All rights reserved.