65.9K
CodeProject 正在变化。 阅读更多。
Home

通用包装器,用于简单的多线程编程

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.94/5 (33投票s)

2009年4月16日

CPOL

14分钟阅读

viewsIcon

79503

downloadIcon

728

基于事件的通用包装器和管理器,用于在您的应用程序中实现多线程

引言

在您的应用程序中实现多线程并非易事。对于相对简单的解决方案,自 2.0 版本以来 .NET Framework 中存在的 BackgroundWorker 组件提供了一个直接的答案。

但是,对于更复杂的异步应用程序,Microsoft 建议实现一个遵循 基于事件的异步模式 的类。这很棒,但仍然有点难实现,而且每次需要在新线程中运行某些内容时都必须重复一遍。

我从 .NET 1.1 版本开始,自己做了一些多线程实验。虽然我不会称自己为这方面的专家,但我对几件事情很确定:

  • 不容易
  • 很难调试
  • 组织不应该允许初级开发人员在没有仔细思考和没有经验开发人员(了解多线程的开发人员)的帮助下开始编写多线程应用程序。

为了帮助减轻这些担忧,我一直在思考如何通过某种方式将多线程的固有复杂性封装和处理在可重用的基类和辅助类中,从而使多线程更容易。这样,开发人员就可以少关注多线程带来的技术细节,而更多地关注他们想要作为多线程实现的功能,以及它们如何与主线程交互。

我希望我的包装器和管理器类能够做到:

  • 处理新线程的创建
  • 处理在创建的线程中发生的异常
  • 轻松切换异步和同步运行工作线程(第一是为了观察性能影响,第二是为了帮助调试——尽管我承认这并不完美)
  • 处理父线程和子线程之间的线程间通信(进度报告、在工作线程结束时报告结果、中断请求等)

借助泛型以及 AsyncOperationManagerAsyncOperation 类,这变得非常可行,而且我认为相当简洁。

目标读者

在本文中,我将不描述或解释线程背后的概念、它们如何工作或如何使用它们。本文不是关于线程所有可能方面和技术细节的参考指南。因此,它主要面向已经了解这些细节的开发人员和架构师。您需要理解各种线程原理,例如在线程之间共享对象时的锁定机制以及如何使这些类成为线程安全的,以及如何使用线程更新 UI 等。

如果您想了解这些,Sacha Barber 写了一篇很好的文章,介绍了所有这些概念。您可以在此处找到它:.NET 线程入门指南(第 1 部分)

源代码和演示

我在这篇文章中包含了源代码,以及一个演示项目,该项目同时使用了三个线程(加上主 UI 线程),其中第三个线程由第二个线程管理和启动。

您可以从上面的链接下载源代码。

类图

这是包装器/辅助库的类图。

Click to enlarge image

Click to enlarge image

事件参数类

好的,现在我们来看看代码。首先,我从我的管理器发布了两个不同的事件,所以让我们回顾一下它们的事件参数类。

Namespace Events

#Region "Class BaseEventArgs"

    ''' (summary)
    ''' Abstract base class for all defined events
    ''' (/summary)
    ''' (remarks)(/remarks)
    Public MustInherit Class BaseEventArgs
        Inherits System.EventArgs

        ''' (summary)
        ''' ID of the worker that fires the event
        ''' (/summary)
        ''' (remarks)(/remarks)
        Private aIdentity As String

        ''' (summary)
        ''' ID of the worker that fires the event
        ''' (/summary)
        ''' (value)aIdentity (String)(/value)
        ''' (returns)Identity of the worker that fires the event(/returns)
        ''' (remarks)(/remarks)
        Public ReadOnly Property Identity() As String
            Get
                Return aIdentity
            End Get
        End Property

        ''' (summary)
        ''' Base constructor
        ''' (/summary)
        ''' (param name="identity")Identity of the worker that fires the event(/param)
        ''' (remarks)(/remarks)
        Protected Sub New(ByVal identity As String)
            aIdentity = identity
        End Sub

    End Class

#End Region

#Region "Class WorkerDoneEventArgs"

    ''' (summary)
    ''' Arguments for the event that signals the end of the worker
    ''' (/summary)
    ''' (typeparam name="TResultType")
    ''' Type of the object that will contain the results of the worker
    ''' (/typeparam)
    ''' (remarks)(/remarks)
    Public Class WorkerDoneEventArgs(Of TResultType)
        Inherits BaseEventArgs

        ''' (summary)
        ''' Worker result
        ''' (/summary)
        ''' (remarks)(/remarks)
        Private aResult As TResultType

        ''' (summary)
        ''' Possible exception that has occurred in the worker
        ''' (/summary)
        ''' (remarks)(/remarks)
        Private aException As Exception

        ''' (summary)
        ''' Flags that indicates if the worker has received an interruption request
        ''' (/summary)
        ''' (remarks)(/remarks)
        Private aInterrupted As Boolean

        ''' (summary)
        ''' Worker result
        ''' (/summary)
        ''' (value)aResult (TResultType)(/value)
        ''' (returns)The worker results, or Nothing if an exception occurred(/returns)
        ''' (remarks)(/remarks)
        Public ReadOnly Property Result() As TResultType
            Get
                Return aResult
            End Get
        End Property

        ''' (summary)
        ''' Possible exception that has occurred in the worker
        ''' (/summary)
        ''' (value)aException(/value)
        ''' (returns)Nothing if there was not exception, otherwise the exception
        ''' (/returns)
        ''' (remarks)(/remarks)
        Public ReadOnly Property Exception() As Exception
            Get
                Return aException
            End Get
        End Property

        ''' (summary)
        ''' Flags that indicates if the worker has received an interruption request
        ''' (/summary)
        ''' (value)aInterrupted (Boolean)(/value)
        ''' (returns)True if the worker has received an interruption request,
        ''' otherwise False(/returns)
        ''' (remarks)(/remarks)
        Public ReadOnly Property Interrupted() As Boolean
            Get
                Return aInterrupted
            End Get
        End Property

        ''' (summary)
        ''' Constructor
        ''' (/summary)
        ''' (param name="identity")Identify of the worker(/param)
        ''' (param name="result")Worker result(/param)
        ''' (param name="exception")Exception (if any) that occurred in the worker
        ''' (/param)
        ''' (param name="interrupted")Indicates if the worker
        ''' received an interruption request(/param)
        ''' (remarks)(/remarks)
        Public Sub New(ByVal identity As String, _
                       ByVal result As TResultType, _
                       ByVal exception As Exception, _
                       ByVal interrupted As Boolean)

            MyBase.New(identity)

            If Not result Is Nothing Then
                aResult = result
            End If
            If Not exception Is Nothing Then
                aException = exception
            End If
            aInterrupted = interrupted

        End Sub
    End Class

#End Region

#Region "Class WorkerProgressEventArgs"

    ''' (summary)
    ''' Arguments for the event that signals a progression in the worker
    ''' (/summary)
    ''' (typeparam name="TProgressType")
    ''' Type of the object that will contain the progress data
    ''' (/typeparam)
    ''' (remarks)(/remarks)
    Public Class WorkerProgressEventArgs(Of TProgressType)
        Inherits BaseEventArgs

        ''' (summary)
        ''' Progress data
        ''' (/summary)
        ''' (remarks)(/remarks)
        Private aProgressData As TProgressType

        ''' (summary)
        ''' Progress data
        ''' (/summary)
        ''' (value)aProgressData (TProgressType)(/value)
        ''' (returns)The progress data coming from the worker(/returns)
        ''' (remarks)(/remarks)
        Public ReadOnly Property ProgressData() As TProgressType
            Get
                Return aProgressData
            End Get
        End Property

        ''' (summary)
        ''' Constructor
        ''' (/summary)
        ''' (param name="identity")Identify of the worker(/param)
        ''' (param name="progressData")Progress data(/param)
        ''' (remarks)(/remarks)
        Public Sub New(ByVal identity As String, _
                       ByVal progressData As TProgressType)

            MyBase.New(identity)

            If Not progressData Is Nothing Then
                aProgressData = progressData
            End If

        End Sub

    End Class

#End Region

End Namespace

您可以看到我有一个基类“必须继承”(其他两个类继承自它),其中包含一个 Identity 字段,该字段将存在于两个事件参数类中。Identity 的存在是为了让您能够识别由工作线程处理的工作。

WorkerDoneEventArgs 类有一个泛型 TResultType,它表示用于存储工作线程结果的对象类型。该类还提供了一个成员来存储工作线程中可能发生的异常,以及一个 Interrupted 标志,用于指示是否收到了中断请求。

WorkerProgressEventArgs 类有一个泛型 TProgressType,它表示用于存储工作线程进度数据类型的对象。这可以很简单,比如一个 Integer 来表示进度百分比,或者一个功能齐全的自定义类,用于在每个进度步骤之间保存工作线程所有者所需的其他信息。

工作线程接口 (IWorker)

接下来,我需要一个简单的接口来定义基工作线程类(管理器将使用它)的成员。

Namespace Worker

''' (summary)
''' Worker interface (without any generics)
''' (/summary)
''' (remarks)(/remarks)
Public Interface IWorker

#Region "Properties"

    ''' (summary)
    ''' Used between AsyncManager and WorkerBase, to become AsyncManagerTyped
    ''' (/summary)
    ''' (returns)The handle to the parent async manager(/returns)
    ''' (remarks)(/remarks)
    Property AsyncManagerObject() As Object

    ''' (summary)
    ''' Used by the derived worker class to check
    ''' if an interruption request was received
    ''' (/summary)
    ''' (returns)Flag that indicates
    ''' if an interruption request was received(/returns)
    ''' (remarks)(/remarks)
    ReadOnly Property InterruptionRequested() As Boolean

    ''' (summary)
    ''' Used to hold the results of the worker, as object
    ''' (/summary)
    ''' (returns)Worker result(/returns)
    ''' (remarks)(/remarks)
    ReadOnly Property ResultObject() As Object

    ''' (summary)
    ''' Used to hold the exception (if any) that occurred in the worker
    ''' (/summary)
    ''' (returns)Exception (if any) that occurred in the worker(/returns)
    ''' (remarks)(/remarks)
    ReadOnly Property WorkerException() As Exception

#End Region

#Region "Subs and Functions"

    ''' (summary)
    ''' Called by the AsyncManager to start the worker in asynchronous mode
    ''' (/summary)
    ''' (remarks)(/remarks)
    Sub StartWorkerAsynchronous()

    ''' (summary)
    ''' Called by the AsyncManager to start the worker in synchronous mode
    ''' (/summary)
    ''' (remarks)(/remarks)
    Sub StartWorkerSynchronous()

    ''' (summary)
    ''' Called by the AsyncManager to stop the worker (interruption request)
    ''' (/summary)
    ''' (remarks)(/remarks)
    Sub StopWorker()

#End Region

End Interface

End Namespace

首先,您看到 AsyncManagerObject。它在那里是因为管理器需要将对自身的句柄提供给工作线程实例,以便工作线程可以调用其方法。为什么将其声明为 Object?因为在这个阶段,我不知道将用作泛型的类型来声明管理器的实例。您将在后面的 WorkerBase 类中看到我如何将此版本的 AsyncManagerObject 转换为其强类型版本。

ResultObject 也是类似的。由于我不知道用于指定工作线程类结果对象类型的泛型,因此我必须将其声明为 Object

该接口还包含三个非常明确的方法,用于处理工作线程的启动和停止。

AsyncManager 类

接下来,我们来看管理器类,它是解决方案中最重要的部分之一。

''' (summary)
''' Manager class, handling the execution of the worker
''' in synchronous or asynchronous mode
''' (/summary)
''' (typeparam name="TProgressType")
''' Type of the object that will contain the progress data
''' (/typeparam)
''' (typeparam name="TResultType")
''' Type of the object that will contain the results of the worker
''' (/typeparam)
''' (remarks)(/remarks)
Public Class AsyncManager(Of TProgressType, TResultType)

首先,您会看到该类使用了两个泛型类型。TProgressType 指定了用于存储进度数据(与 WorkerProgressEventArgs 相关)的对象类型。TResultType 指定了用于存储工作线程结果(与 WorkerDoneEventArgs 相关)的对象类型。

这些泛型在管理器代码中到处使用,以允许强类型签名,从而防止使用者总是需要强制转换每个对象(这在性能方面也很昂贵)。

''' (summary)
''' Constructor that receives the required parameters to manage the worker's execution
''' (/summary)
''' (param name="identity")Identity of the worker(/param)
''' (param name="worker")Instance of a worker class
''' derived from Worker.WorkerBase(/param)
''' (remarks)(/remarks)
Public Sub New(ByVal identity As String, _
               ByVal worker As Worker.IWorker)

    aWorker = worker
    aIdentity = identity

    'Give a handle to ourselves to the worker
    aWorker.AsyncManagerObject = Me

End Sub

构造函数接收一个 string,用于唯一标识管理器和工作线程,以及一个声明为 IWorker 的工作线程实例。您可以看到工作线程通过 AsyncManagerObject 属性获得了对管理器的句柄。

''' (summary)
''' Event used to signal the end of the worker
''' (/summary)
''' (remarks)(/remarks)
Public Event WorkerDone As EventHandler(Of Events.WorkerDoneEventArgs(Of TResultType))

''' (summary)
''' Event used to signal a progression in the worker
''' (/summary)
''' (remarks)(/remarks)
Public Event WorkerProgress As EventHandler_
    (Of Events.WorkerProgressEventArgs(Of TProgressType))

接下来,我们看到事件声明。当工作线程完成处理时(无论是正常完成、发生异常后,还是收到中断后),将触发 WorkerDone。事件参数将允许区分可能的结束方式。

WorkerProgress 事件仅在工作线程代码需要时触发。

''' (summary)
''' Handle to the calling thread, used to call methods on the calling thread
''' (/summary)
''' (value)aCallingThreadAsyncOp(/value)
''' (returns)The AsyncOperation associated to the calling thread(/returns)
''' (remarks)(/remarks)
Friend ReadOnly Property CallingThreadAsyncOp() As AsyncOperation
    Get
        Return aCallingThreadAsyncOp
    End Get
End Property

''' (summary)
''' Identity of the worker
''' (/summary)
''' (value)aIdentity (String)(/value)
''' (returns)Identity of the worker(/returns)
''' (remarks)(/remarks)
Public ReadOnly Property Identity() As String
    Get
        Return aIdentity
    End Get
End Property

''' (summary)
''' Indicates if the worker is running
''' (/summary)
''' (value)Boolean(/value)
''' (returns)True if the worker is running, otherwise False(/returns)
''' (remarks)(/remarks)
Public ReadOnly Property IsWorkerBusy() As Boolean
    Get
        Return Not (aWorkerThread Is Nothing)
    End Get
End Property

至于属性,请注意 CallingThreadAsyncOp() As AsyncOperation。这是允许工作线程和管理器线程之间通信的魔法对象。此对象通过 System.ComponentModel.AsyncOperationManagerCreateOperation 方法创建。您将在下方进一步看到我们如何使用此对象在管理器线程上执行方法。

另一个重要的属性是 IsWorkerBusy,它指示工作线程是否存在,这意味着,在我看来,工作线程仍在忙于执行其任务。

''' (summary)
''' Used as SendOrPostCallback, called through CallingThreadAsyncOp
''' (/summary)
''' (param name="state")Object containing the parameters
''' to transfer to the strongly-typed overload(/param)
''' (remarks)(/remarks)
Friend Overloads Sub WorkerProgressInternalSignal(ByVal state As Object)
    WorkerProgressInternalSignal(DirectCast(state, TProgressType))
End Sub

''' (summary)
''' Used as SendOrPostCallback, called through CallingThreadAsyncOp
''' (/summary)
''' (param name="state")Object containing the parameters
''' to transfer to the strongly-typed overload(/param)
''' (remarks)(/remarks)
Friend Overloads Sub WorkerExceptionInternalSignal(ByVal state As Object)
    WorkerExceptionInternalSignal(DirectCast(state, Exception))
End Sub

''' (summary)
''' Used as SendOrPostCallback, called through CallingThreadAsyncOp
''' (/summary)
''' (param name="state")Object containing the parameters
''' to transfer to the strongly-typed overload(/param)
''' (remarks)(/remarks)
Friend Overloads Sub WorkerDoneInternalSignal(ByVal state As Object)
    WorkerDoneInternalSignal(DirectCast(state, TResultType))
End Sub

为了能够在管理器线程(所有者线程)上调用方法,我们必须使用 AsyncOperation 对象(通过 CallingThreadAsyncOp 成员)的 Post 方法。Post 方法需要一种特殊的委托,该委托只接受一个 State(作为 Object)作为参数。您将在下文 WorkerBase 类中看到我们如何调用上述 SendOrPostCallBack 方法。请注意,这些方法仅通过强制转换状态对象为其强类型版本来调用重载签名。

我们将从我们的 WorkerBase 向管理器发出三种不同的状态信号:进度、异常和正常结束。

''' (summary)
''' Sub called by the worker to signal a progression
''' (/summary)
''' (param name="progressData")
''' Progression data (if any)
''' (/param)
''' (remarks)(/remarks)
Friend Overloads Sub WorkerProgressInternalSignal(ByVal progressData As TProgressType)

    'Prepare and raise the event for the owner to process
    Dim e As Events.WorkerProgressEventArgs(Of TProgressType) = _
                    New Events.WorkerProgressEventArgs(Of TProgressType) _
                                                       (Identity, _
                                                       progressData)

    RaiseEvent WorkerProgress(Me, e)

End Sub

''' (summary)
''' Sub called by the worker to signal an exception
''' (/summary)
''' (param name="workerException")
''' Exception
''' (/param)
''' (remarks)(/remarks)
Friend Overloads Sub WorkerExceptionInternalSignal(ByVal workerException As Exception)

    If aIsAsynchonous AndAlso Not aWorkerThread Is Nothing Then
        aWorkerThread.Join()
        aWorkerThread = Nothing
    End If

    'Check if the results/exception have already been processed
    '(because the owner was waiting for the worker to end)
    If Not aCancelWorkerDoneEvent Then
        'Prepare and raise the event for the owner to process
        Dim e As Events.WorkerDoneEventArgs(Of TResultType) = _
              New Events.WorkerDoneEventArgs(Of TResultType) _
                                             (Identity, _
                                              Nothing, _
                                              workerException, _
                                              aWorker.InterruptionRequested)

        RaiseEvent WorkerDone(Me, e)
    End If

    'If the worker was running in asynchronous mode,
    'we also need to post the "complete" message
    If aIsAsynchonous Then
        aCallingThreadAsyncOp.PostOperationCompleted(AddressOf DoNothing, Nothing)
    End If

End Sub

''' (summary)
''' Sub called by the worker to signal the end of the work
''' (/summary)
''' (param name="result")
''' Worker result
''' (/param)
''' (remarks)(/remarks)
Friend Overloads Sub WorkerDoneInternalSignal(ByVal result As TResultType)

    If aIsAsynchonous AndAlso Not aWorkerThread Is Nothing Then
        aWorkerThread.Join()
        aWorkerThread = Nothing
    End If

    'Check if the results/exception have already been processed
    '(because the owner was waiting for the worker to end)
    If Not aCancelWorkerDoneEvent Then
        'Prepare and raise the event for the owner to process
        Dim e As Events.WorkerDoneEventArgs(Of TResultType) = _
            New Events.WorkerDoneEventArgs(Of TResultType) _
                                           (Identity, _
                                            result, _
                                            Nothing, _
                                            aWorker.InterruptionRequested)

        RaiseEvent WorkerDone(Me, e)
    End If

    'If the worker was running in asynchronous mode,
    'we also need to post the "complete" message
    If aIsAsynchonous Then
        aCallingThreadAsyncOp.PostOperationCompleted(AddressOf DoNothing, Nothing)
    End If

End Sub

现在,这些是 SendOrPostCallBack 版本的重载方法。在异步模式下,工作线程将调用 SendOrPostCallBack 版本(带有 state 作为参数),因为它由 AsyncOperation.Post 操作需要;在同步模式下,工作线程将直接调用接受强类型版本参数的版本。

这三个方法负责创建所需的特定事件参数实例并引发事件,以便管理器实例的所有者或订阅了事件的任何其他对象都能收到通知。对于 WorkerExceptionInternalSignalWorkerDoneInternalSignal 方法,引发的事件相同,但参数包含的内容不同。在第一种情况下,Exception 不会是 null,但 Result 会。在第二种情况下,Exception 会是 null,但 Result 不应如此。然而,还有另一个小问题。

您将在下面看到管理器有一个由其所有者调用的函数来“等待”工作线程完成(WaitForWorker)。此函数返回与“异常”和“完成”信号相同的 WorkerDoneEventArgs 类。在我的设计中,我得出的结论是,如果所有者在某个时候想要等待工作线程完成,它也应该在请求等待的同一方法中立即处理结果。因此,我决定当所有者正在等待工作线程结束时,不应引发事件本身。这就是 aCancelWorkerDoneEvent 字段的用途。

最后,当异步调用时,这两个“InternalSignal”方法还调用 aCallingThreadAsyncOpAsyncOperation)对象的 PostOperationCompleted。请注意,这反过来调用 private DoNothing 方法,该方法……什么都不做。我不确定 PostOperationCompleted 是否绝对必需,但我决定不冒任何风险,让它在那里,但什么也不做。

让我们转到 public 方法。

''' (summary)
''' Called from the owner to start the worker
''' (/summary)
''' (param name="asynchronous")
''' Specifies if the worker must run in asynchronous mode (True) or not (False)
''' (/param)
''' (remarks)(/remarks)
Public Sub StartWorker(ByVal asynchronous As Boolean)

    aIsAsynchonous = asynchronous
    aCancelWorkerDoneEvent = False

    If aIsAsynchonous Then
        'Asynchronous mode - we need to create a thread
        'and start the worker using this thread
        aCallingThreadAsyncOp = AsyncOperationManager.CreateOperation(Nothing)
        aWorkerThread = New Thread(New ThreadStart(AddressOf _
                                   aWorker.StartWorkerAsynchronous))
        aWorkerThread.Start()
    Else
        'Synchronous mode - simply call the worker's start method
        aWorker.StartWorkerSynchronous()
    End If

End Sub

这是我们启动工作线程的地方。您为此方法提供一个简单的 Boolean 参数,以指定您是想以异步(True)还是同步(False)模式运行。如果是异步模式,该方法将负责通过 AsyncOperationManager.CreateOperation 服务创建 AsyncOperation,创建线程,并将其启动点设置为工作线程接口 aWorker 中的 StartWorkerAsynchronous,然后启动线程。

如果是同步模式,它只需调用同一工作线程接口 aWorker 中的 StartWorkerSynchronous 方法。

''' (summary)
''' Called from the owner to stop the worker
''' (/summary)
''' (remarks)(/remarks)
Public Sub StopWorker()

    'Signal the worker to stop working
    aWorker.StopWorker()

End Sub

此服务仅调用其在工作线程接口 aWorker 中的对应项,以请求其停止。

''' (summary)
''' Called from the owner to wait for the worker to complete
''' (/summary)
''' (remarks)(/remarks)
Public Function WaitForWorker() As Events.WorkerDoneEventArgs(Of TResultType)

    If (Not aWorkerThread Is Nothing) AndAlso aWorkerThread.IsAlive Then

        aWorkerThread.Join()
        aWorkerThread = Nothing

    End If

    'Since the results (or exception) are returned through
    'this function to be immediately processed by
    'the owner waiting for the worker's completion, we cancel the WorkerDone event
    aCancelWorkerDoneEvent = True

    Return New Events.WorkerDoneEventArgs(Of TResultType) _
                        (Identity, _
                         DirectCast(aWorker.ResultObject, TResultType), _
                         aWorker.WorkerException, _
                         aWorker.InterruptionRequested)

End Function

如上所述,此服务用于等待工作线程完成其工作。当它完成时,它会设置 aCancelWorkerDoneEvent 标志以防止 WorkerDone 事件触发,而是直接返回事件参数。也许这不是最好的设计,因为事件参数可能只应用于事件,但这是我目前能提供的。

''' (summary)
''' Called from the owner to stop and wait for the worker
''' (/summary)
''' (remarks)(/remarks)
Public Function StopWorkerAndWait() As Events.WorkerDoneEventArgs(Of TResultType)

    Dim result As Events.WorkerDoneEventArgs(Of TResultType) = Nothing

    If (Not aWorkerThread Is Nothing) AndAlso aWorkerThread.IsAlive Then

        StopWorker()
        result = WaitForWorker()

    End If

    Return result

End Function

最后,第三个服务是前两个的混合。它首先请求工作线程停止,这只是一个发送并将在子线程上执行的请求,因此在调用完成后,执行将继续在管理器中,这并不意味着工作线程已经处理了中断请求。

因此,它还等待工作线程处理中断请求并完成,以确保线程处于非活动状态。

WorkerBase 类

管理器类就到这里。现在,我们来看我的包装器设计的最后一部分,即 abstract WorkerBase 类。

''' (summary)
''' Abstract base worker class from which to inherit to create its own worker class
''' (/summary)
''' (typeparam name="TInputParamsType")
''' Type of the object that will contain the input parameters required by the worker
''' (/typeparam)
''' (typeparam name="TProgressType")
''' Type of the object that will contain the progress data
''' (/typeparam)
''' (typeparam name="TResultType")
''' Type of the object that will contain the results of the worker
''' (/typeparam)
''' (remarks)(/remarks)
Public MustInherit Class WorkerBase(Of TInputParamsType, _
       TProgressType, TResultType)
       Implements IWorker

首先,您会看到我们在管理器和事件参数类中使用的熟悉的泛型。这次,除了 TProgressTypeTResultType,我们还有一个 TInputParamsType,它定义了用于保存工作线程类所需的参数的对象类型。

由于此类被定义为 MustInherit,因此我们必须在派生类声明中指定泛型类型,如下所示:

Friend Class SubWorker1
    Inherits SIGLR.Async.GenericWrapper.Worker.WorkerBase(Of SubWorker1Input, _
             Integer, SubWorker1Result)

这个特定的工作线程类继承自 WorkerBase 类,其输入参数为 SubWorker1Input,进度数据为 Integer,结果为 SubWorker1Result

同样,这些泛型在基类中随处使用,以允许强类型签名,并防止使用者总是需要强制转换每个对象。

''' (summary)
''' Constructor receiving the input parameters required by the worker
''' (/summary)
''' (param name="inputParams")Input parameters required by the worker(/param)
''' (remarks)(/remarks)
Protected Sub New(ByVal inputParams As TInputParamsType)

    aInputParams = inputParams

End Sub

基构造函数仅需要指定为泛型 TInputParamsType 类型的输入参数。这些参数将被传递给 DoWork 方法,该方法是 MustOverride 的,并且包含工作线程将执行的“实际工作”。

''' (summary)
''' Used between AsyncManager and WorkerBase, to become AsyncManagerTyped
''' (/summary)
''' (value)aAsyncManagerObject (Object)(/value)
''' (returns)The handle to the parent AsyncManager instance(/returns)
''' (remarks)(/remarks)
Friend Property AsyncManagerObject() As Object Implements IWorker.AsyncManagerObject
    Get
        Return aAsyncManagerObject
    End Get
    Set(ByVal value As Object)
        aAsyncManagerObject = value
    End Set
End Property

''' (summary)
''' Indicates if an interruption request was received
''' (/summary)
''' (value)aInterruptionRequested (Boolean)(/value)
''' (returns)True if an interruption request was received, otherwise False(/returns)
''' (remarks)(/remarks)
Protected Friend ReadOnly Property InterruptionRequested() As Boolean _
                                   Implements IWorker.InterruptionRequested
    Get
        Return aInterruptionRequested
    End Get
End Property

''' (summary)
''' Used to hold the results of the worker, as object
''' (/summary)
''' (value)aResult(/value)
''' (returns)Le résultat du travail(/returns)
''' (remarks)(/remarks)
Friend ReadOnly Property ResultObject() As Object _
                         Implements IWorker.ResultObject
    Get
        Return aResult
    End Get
End Property

''' (summary)
''' Used to hold the exception (if any) that occurred in the worker
''' (/summary)
''' (value)aWorkerException(/value)
''' (returns)Exception (if any) that occurred in the worker(/returns)
''' (remarks)(/remarks)
Friend ReadOnly Property WorkerException() As Exception _
                         Implements IWorker.WorkerException
    Get
        Return aWorkerException
    End Get
End Property

由于基类实现了 IWorker 接口,因此这些是必需的属性。除了 AsyncManagerObjectResultObject 之外,没有什么特别之处,我已经在上面的接口部分讨论过它们。

''' (summary)
''' Function to hold the worker code (to actually do the work!)
''' (/summary)
''' (param name="inputParams")Paramètres d'entrée de type TTypeParametre(/param)
''' (returns)Result (as TTypeResultat) of the worker(/returns)
''' (remarks)(/remarks)
Protected MustOverride Function DoWork(ByVal inputParams _
                       As TInputParamsType) As TResultType

这是 DoWork 方法,它是 MustOverride 的。它通过 TInputTypeParamsType 泛型接收其参数,并以泛型类型 TResultType 的对象形式返回其结果。够简单了吧?

在这里,您需要了解线程的工作原理。如果此方法中使用的对象有可能被其他线程(包括调用线程)共享,则这些对象类应使用某些锁定机制使其“线程安全”。如本文开头的“受众”部分所述,您可能想阅读 Sacha Barber 关于线程的文章以了解所有这些信息

''' (summary)
''' Called from the AsyncManager to start the worker in asynchronous mode
''' (/summary)
''' (remarks)(/remarks)
Friend Sub StartWorkerAsynchronous() Implements IWorker.StartWorkerAsynchronous

    aIsAsynchronous = True
    aInterruptionRequested = False

    'We can strongly-type the AsyncManager parent object
    aAsyncManagerTyped = DirectCast(aAsyncManagerObject, _
                         AsyncManager(Of TProgressType, TResultType))

    'Set the SendOrPostCallback delegate to signal progress
    aWorkerProgressInternalSignalCallback = New  _
        SendOrPostCallback(AddressOf aAsyncManagerTyped.WorkerProgressInternalSignal)

    'Set the SendOrPostCallback delegate to signal the normal end of the worker
    Dim workerDoneInternalSignalCallback As SendOrPostCallback = New  _
        SendOrPostCallback(AddressOf aAsyncManagerTyped.WorkerDoneInternalSignal)

    'Set the SendOrPostCallback delegate to signal
    'an exception that occurred in the worker
    Dim workerExceptionInternalSignalCallback As SendOrPostCallback = New  _
                        SendOrPostCallback(AddressOf _
                              aAsyncManagerTyped.WorkerExceptionInternalSignal)

    'We must catch all exceptions
    Try

        'Do the actual work
        aResult = DoWork(aInputParams)

        'When the worker is done, we must signal the AsyncManager (on its own thread)
        aAsyncManagerTyped.CallingThreadAsyncOp.Post(_
                           workerDoneInternalSignalCallback, aResult)

    Catch ex As Exception
        'When an exception occurs, we must signal the AsyncManager (on its own thread)
        aWorkerException = ex
        aAsyncManagerTyped.CallingThreadAsyncOp.Post(_
              workerExceptionInternalSignalCallback, aWorkerException)

    End Try

End Sub

好的……这是管理器创建的新线程的启动方法。请注意,我们如何强制转换通过 IWorker 接口的 AsyncManagerObject 属性作为对象传递的 AsyncManager。现在,它可以强制转换为其强类型版本 AsyncManagerTyped,因为我们知道要使用的泛型类型(它们与类声明中使用的泛型类型相同)。

该方法还定义了三个必需的 SendOrPostCallBack 委托,指向 AsyncManager 接收状态对象的三个方法。请注意,进度方法被声明为类全局属性,而其他两个只是方法内的变量。这是因为进度委托在该方法之外是必需的,如您将在下文中看到的。

最后,该方法调用 DoWork 函数。它在 Try/Catch 中完成,以便如果发生未处理的异常,它能够使用异常作为参数调用 workerExceptionInternalSignalCallback 委托。否则,当函数返回时,它将使用结果作为参数调用 workerDoneInternalSignalCallback 委托。这两个委托都通过管理器 CallingThreadAsyncOpPost 服务调用,以便在管理器线程上执行。

''' (summary)
''' Called from the AsyncManager to start the worker in synchronous mode
''' (/summary)
''' (remarks)(/remarks)
Friend Sub StartWorkerSynchronous() Implements IWorker.StartWorkerSynchronous

    aIsAsynchronous = False
    aInterruptionRequested = False

    'We can strongly-type the AsyncManager parent object
    aAsyncManagerTyped = DirectCast(aAsyncManagerObject, _
                         AsyncManager(Of TProgressType, TResultType))

    'We must catch all exceptions
    Try

        'Do the actual work
        aResult = DoWork(aInputParams)

        'When the worker is done, we must signal
        'the AsyncManager (we're on the same thread)
        aAsyncManagerTyped.WorkerDoneInternalSignal(aResult)

    Catch ex As Exception
        'When an exception occurs, we must signal
        'the AsyncManager (we're on the same thread)
        aWorkerException = ex
        aAsyncManagerTyped.WorkerExceptionInternalSignal(aWorkerException)

    End Try

End Sub

尽管与异步启动工作线程的方法非常相似,但这个方法更简单、更直接,因为它不需要任何 SendOrPostCallBack 委托,因为一切都在同一个线程上运行。

''' (summary)
''' Called by the derived class to signal a progression in the work process
''' (/summary)
''' (param name="progressData")
''' Object (as TProgressType) containing the progression data
''' (/param)
''' (remarks)(/remarks)
Protected Sub WorkerProgressSignal(ByVal progressData As TProgressType)

    If aIsAsynchronous Then
        'We are in asynchronous mode - the call
        'must be performed on the calling thread
        aAsyncManagerTyped.CallingThreadAsyncOp.Post(_
            aWorkerProgressInternalSignalCallback, progressData)
    Else
        'We are in synchronous mode - the call can be performed directly
        aAsyncManagerTyped.WorkerProgressInternalSignal(progressData)
    End If

End Sub

此方法的范围是 Protected,因为它将由派生类调用以发出某种进度的信号。它接收 TProgressType 作为进度数据。如果工作线程以异步模式运行,则该方法使用 StartWorkerAsynchronous 方法中先前定义的 aWorkerProgressInternalSignalCallback SendOrPostCallBack 委托。它通过管理器 CallingThreadAsyncOpPost 服务执行此操作,以便在管理器线程上执行委托方法。

如果不是异步模式,它将直接调用管理器中的 WorkerProgressInternalSignal 方法。

''' (summary)
''' Called from the AsyncManager to stop the worker
''' (/summary)
''' (remarks)(/remarks)
Friend Sub StopWorker() Implements IWorker.StopWorker
    aInterruptionRequested = True
End Sub

最后一个方法由管理器中的对应方法调用。它只是将 aInterruptionRequested 标志设置为 True,以便派生类可以检测到请求并以干净的方式停止其工作。

如何使用包装器/管理器

对于您想要在单独线程中运行的每种不同功能,您需要:

  1. 创建一个继承自 WorkerBase 类的类,并将您的工作代码放在 DoWork 方法中。
  2. (可选)创建一个类来保存您要完成的工作所需的输入参数(这将成为 TInputParamsType 泛型)。
  3. (可选)创建一个类来保存您想在工作各个阶段(可能在循环中)在工作线程和调用线程之间传递的进度数据(这将成为 TProgressType 泛型)。
  4. (可选)创建一个类来保存工作结果,供调用线程处理(这将成为 TResultType 泛型)。

如果您不需要一个或多个泛型类型(例如,您不需要任何输入参数,或者您的工作不报告进度,或者不返回结果),您可以使用任何其他标准类型(如 Integer)作为各种声明的泛型,这无关紧要。

然后,从您想启动并行工作的某个代码点开始,您可以使用 AsyncManager 类的新实例,将其声明为 WithEvents 以便能够处理它将引发的两个可能事件(WorkerDoneWorkerProgress),并为其提供您工作线程类的新实例,然后调用管理器的 StartWorker 方法。

在您关心的事件处理程序中添加一些代码,这样就完成了!听起来很简单吗?请查看本文随附源代码中包含的演示项目。如果您有任何疑问,我将很乐意回答。

尽管如此,在不久的将来(取决于我的空闲时间!),我将写一篇跟进文章,详细解释如何使用该库。

结论

当然,仍有很大的改进空间。例如,目前,一个管理器实例一次只能处理一个线程。如果您反复调用 StartWorker,可能会得到意外的结果。

您可能会看到其他几个可以改进的地方。老实说,这就是我最初决定发布这篇文章的原因,这样我就可以从您这样经验丰富的开发人员的观点和想法中受益……

版本历史

  • 2014-05-22
    • 创建了演示的 C# 版本
  • 2009-04-22
    • 更新了源代码以包含该库的 C# 版本,并改进了一些代码(修复了多个代码分析警告)
    • 将代码列表更新到最新库版本
    • 添加了类图部分
    • 更新了“如何使用包装器/管理器”部分,以宣布即将发布的跟进文章
    • 添加了可执行演示下载
    • 删除了“通知”部分
  • 2009-04-18
    • 添加了“受众”部分,并链接到 Sacha Barber 的线程入门文章
    • 添加了版本历史记录部分
  • 2009-04-16
    • 原始版本
© . All rights reserved.