理解 SynchronizationContext:第二部分






4.96/5 (71投票s)
创建您自己的 Synchronization Context。
引言
在第一部分中,我解释了 SynchronizationContext
在 .NET Framework 2.0 中的作用。它主要用于允许线程与 UI 线程通信。我们在第一部分了解到 SynchronizationContext
本身并不能在线程之间封送代码;事实上,这个类应该是一个抽象类。 .NET Framework 为我们提供了一个该类的版本,用于将代码封送到 UI 线程,但如何编写您自己的 SynchronizationContext
版本来做其他事情呢?听起来像是一项艰巨的任务,但实际上并没有那么糟糕。我将在这篇文章中展示如何编写您自己的 SynchronizationContext
,用于将任何线程的代码封送到一个 STA 线程。在本系列的第三部分,我将在 WCF 框架中利用这个类。但是,在奔跑之前,我们必须学会走路。所以,让我们开始吧。
将工作切换到 STA 线程
您可能会想,我为什么要写这个呢?STA 是一种由 COM 使用的线程模型。很久以前,在 .NET Remoting 和 WCF 时代(听起来像是百年前),开发人员编写的 COM 类只能在单线程公寓(STA)上运行。为什么?因为 COM 运行时会为开发人员处理线程封送,并始终确保您的 COM 类在同一个线程上执行。这样,COM 开发人员就不需要担心多线程、互斥体、信号量、事件以及所有其他多线程工具。仅供记录,COM 还提供了一个多线程公寓模型(供勇敢者使用)。使用 MTA,开发人员必须处理多线程问题,但拥有更多控制权。关于 MTA 和 STA 有很多文档,您只需在 Google 上搜索“STA MTA”即可阅读所有历史。感谢上帝,我不再需要编写 COM 代码了。然而,仍然存在很多 COM 组件,在我们公司,很多业务逻辑都写在只能在 STA 线程上执行的 COM 类中。为了能够从 .NET 中的任何线程调用这些类,我决定编写一个自定义的 STA 线程同步上下文。经过所有的汗水和努力,我觉得应该与您分享,希望一些不幸的开发人员能够从中受益。虽然这篇文章解释了如何将代码封送到 STA 线程,但您可以根据这篇文章的信息,让您自己的同步上下文做其他事情。
我们如何在两个线程之间切换?
第一个问题是如何管理两个正在运行的线程之间的封送。这个问题通常通过实现一种双方线程都可以读取和写入的通用通信块来解决。两个线程之间的理想通信对象是一个队列。队列使我们能够根据调用顺序将工作从一个线程发送到另一个线程。这是典型的生产者/消费者模型,其中一个线程扮演消费者(从队列读取),另一个线程扮演生产者(向队列写入项)。为了简化,让我们看看这可能如何工作
- 线程 1:将消息发送到公共队列。
- 线程 2:监听来自公共队列的传入消息。在我们的例子中,线程 2 将是一个 STA 线程。
深入了解阻塞队列
我想要一个队列来排队从线程 X 到我的 STA 线程的工作项。我还希望我的线程只在队列中有项时才出队。如果没有项,我希望 Dequeue
方法等待直到有东西进入队列。通常,这被称为“阻塞队列”。让我们看看代码
internal interface IQueueReader<t> : IDisposable
{
T Dequeue();
void ReleaseReader();
}
internal interface IQueueWriter<t> : IDisposable
{
void Enqueue(T data);
}
internal class BlockingQueue<t> : IQueueReader<t>,
IQueueWriter<t>, IDisposable
{
// use a .NET queue to store the data
private Queue<t> mQueue = new Queue<t>();
// create a semaphore that contains the items in the queue as resources.
// initialize the semaphore to zero available resources (empty queue).
private Semaphore mSemaphore = new Semaphore(0, int.MaxValue);
// a event that gets triggered when the reader thread is exiting
private ManualResetEvent mKillThread = new ManualResetEvent(false);
// wait handles that are used to unblock a Dequeue operation.
// Either when there is an item in the queue
// or when the reader thread is exiting.
private WaitHandle[] mWaitHandles;
public BlockingQueue()
{
mWaitHandles = new WaitHandle[2] { mSemaphore, mKillThread };
}
public void Enqueue(T data)
{
lock (mQueue) mQueue.Enqueue(data);
// add an available resource to the semaphore,
// because we just put an item
// into the queue.
mSemaphore.Release();
}
public T Dequeue()
{
// wait until there is an item in the queue
WaitHandle.WaitAny(mWaitHandles);
lock (mQueue)
{
if (mQueue.Count > 0)
return mQueue.Dequeue();
}
return default(T);
}
public void ReleaseReader()
{
mKillThread.Set();
}
void IDisposable.Dispose()
{
if (mSemaphore != null)
{
mSemaphore.Close();
mQueue.Clear();
mSemaphore = null;
}
}
}
- 因为这个队列被多个线程使用,请注意,我正在使用
lock
语句来阻止对队列的访问。 - 通常,我会在有项进入队列之前阻塞
Dequeue
方法。工作原理是使用一个信号量,该信号量代表队列中的所有项作为资源。当信号量首次创建时,队列是空的,因此没有可用的资源,所以调用Dequeue
将会阻塞(请注意,第一个参数为零表示没有可用资源,第二个参数为一个大数字表示队列的大小)。private Semaphore mSemaphore = new Semaphore(0, int.MaxValue);
- 请注意,当我出队一项时,我会在一组
WaitHandle
(WaitHandle.WaitAny(mWaitHandles);)
上阻塞。这段代码的意思是“等待直到有消息,或者直到读取线程被标记为停止运行。” - 我还没有展示实际的线程,我将在下一步展示。但是,STA 线程将是读取线程,大部分时间将花费在等待队列中的消息或处理队列中的消息。
- 请注意,当消息入队到队列中时,它会释放信号量,表示有可用资源;这将导致
Dequeue
方法解除阻塞。 - 信号量的最大限制是
Int.Max
;只要线程出队的速度与入队速度大致相同,我们就永远不会接近这个限制。
SendOrPostCallbackItem 类
请注意,阻塞队列类是泛型的,这是为了以防万一我决定在另一个应用程序中重用它(您也可以随意为您的需求使用它)。那么,我们计划将什么放入这个队列?考虑到这个队列负责在线程之间封送代码,排队的理想项是委托。不过,我们需要的不只是一个简单的委托,而是一个 SendOrPostCallback
委托。
internal enum ExecutionType
{
Post,
Send
}
internal class SendOrPostCallbackItem
{
object mState;
private ExecutionType mExeType;
SendOrPostCallback mMethod;
ManualResetEvent mAsyncWaitHandle = new ManualResetEvent(false);
Exception mException = null;
internal SendOrPostCallbackItem(SendOrPostCallback callback,
object state, ExecutionType type)
{
mMethod = callback;
mState = state;
mExeType = type;
}
internal Exception Exception
{
get { return mException; }
}
internal bool ExecutedWithException
{
get { return mException != null; }
}
// this code must run ont the STA thread
internal void Execute()
{
if (mExeType == ExecutionType.Send)
Send();
else
Post();
}
// calling thread will block until mAsyncWaitHandle is set
internal void Send()
{
try
{
// call the thread
mMethod(mState);
}
catch (Exception e)
{
mException = e;
}
finally
{
mAsyncWaitHandle.Set();
}
}
/// <summary />
/// Unhandled exceptions will terminate the STA thread
/// </summary />
internal void Post()
{
mMethod(mState);
}
internal WaitHandle ExecutionCompleteWaitHandle
{
get { return mAsyncWaitHandle; }
}
}
SendOrPostCallbackItem
包含我们希望在 STA 线程上执行的委托。Send
和Post
实际上是辅助方法,它们都负责启动代码,并且都设计为从 STA 线程调用。但是,因为Send
需要阻塞,并将异常报告回调用线程(非 STA 线程),我在执行完成后使用ManualResentEvent
。我还跟踪异常;如果存在异常,它将在非 STA 线程(生产者线程)上抛出。Post
很简单。它只是调用方法,无需通知完成,也无需跟踪异常。
总的来说,这个类负责两项主要任务:存储要执行的委托,并以两种可能模式执行它:Send
和 Post
。Send
需要额外的跟踪(例如异常和完成通知)。Post
只执行方法而不做其他事情。通常,如果 Post
在 STA 线程上执行,委托报告的任何异常都会导致线程终止。我将在文章的第三部分介绍 WCF 时更详细地解释这一点。但就目前而言,请记住这个问题。
STA 线程及其所有荣耀
最后,我们可以展示和解释这个同步上下文的核心内容。现在我们有了队列,并且知道我们要向其中放入什么,让我们来看看 STA 线程(负责封送代码的线程)。
internal class StaThread
{
private Thread mStaThread;
private IQueueReader<sendorpostcallbackitem> mQueueConsumer;
private ManualResetEvent mStopEvent = new ManualResetEvent(false);
internal StaThread(IQueueReader<sendorpostcallbackitem> reader)
{
mQueueConsumer = reader;
mStaThread = new Thread(Run);
mStaThread.Name = "STA Worker Thread";
mStaThread.SetApartmentState(ApartmentState.STA);
}
internal void Start()
{
mStaThread.Start();
}
internal void Join()
{
mStaThread.Join();
}
private void Run()
{
while (true)
{
bool stop = mStopEvent.WaitOne(0);
if (stop)
{
break;
}
SendOrPostCallbackItem workItem = mQueueConsumer.Dequeue();
if (workItem != null)
workItem.Execute();
}
}
internal void Stop()
{
mStopEvent.Set();
mQueueConsumer.ReleaseReader();
mStaThread.Join();
mQueueConsumer.Dispose();
}
}
这个类中最重要的一部分在构造函数中,所以让我们再看一遍。
internal StaThread(IQueueReader<sendorpostcallbackitem> reader)
{
mQueueConsumer = reader;
mStaThread = new Thread(Run);
mStaThread.Name = "STA Worker Thread";
mStaThread.SetApartmentState(ApartmentState.STA);
}
- 这个类接受一个
IQueueReader
类型的接口,这实际上是我们的阻塞队列。我决定在这里放一个接口的原因是这个线程是读取线程,不应该有权访问写入方法。 - 线程被设置为 STA 线程。给线程命名有助于在调试时使用线程输出窗口。
- 请注意,线程尚未启动。一个名为
Start
的方法将启动线程,这将在我们的StaSynchronizationContext
类中发生,我将在稍后展示。
让我们来看看 Run
方法。Run
方法代表我们的 STA 线程。它的主要任务是从阻塞队列中出队项并执行它们。在 Run
方法中执行任何工作项意味着在 STA 线程上执行它们。因此,无论哪个线程将它们放入队列,并不真正重要,重要的是项在 STA 线程内读取,并在 STA 线程中执行。如果您仔细想想,这实际上就是线程封送的体现。
private void Run()
{
while (true)
{
bool stop = mStopEvent.WaitOne(0);
if (stop)
{
break;
}
SendOrPostCallbackItem workItem = mQueueConsumer.Dequeue();
if (workItem != null)
workItem.Execute();
}
}
我尽量使 Run
方法尽可能简单,但让我们强调几点。
- STA 线程一直都在运行,所以我做了一个
while(true)
循环。通常,我不太喜欢这种循环,但我希望代码的读者明白,这个线程不应该停止运行,除非上下文类被释放。一个while(true)
发出了这种信息。 mStopEvent
是一个ManualResetEvent
,当 STA 线程被标记为停止运行时,它会被触发。当调用Stop()
方法时,mStopEvent
会被设置,导致主循环退出。Stop
方法还通过标记队列停止处理消息来释放任何正在等待的出队操作。mQueueConsumer.Dequeue()
负责从队列中读取工作项。此方法将阻塞,直到队列中有工作项。- 当工作项出队时,工作项会被执行。
Execute()
,如果您还记得的话,将执行与工作项关联的委托中的代码。正是在这个Execute
方法中,代码在 STA 线程上被封送。
创建 STA 同步上下文类
我们几乎拥有了运行 STA Sync Context 所需的所有组件。我们有一个包含要在 STA 线程上执行的委托的工作项。我们有一个小巧的阻塞队列来处理 STA 线程与任何其他线程之间的通信。我们甚至还有一个小的 STA Run
方法,它一直在查看我们的队列,从中取出消息,并运行任何取出的工作项。现在我们唯一缺少的是实际的 Synchronization Context 类本身。所以,让我们看看它,并详细回顾一下代码...
StaSynchronizationContext.cs
public class StaSynchronizationContext : SynchronizationContext, IDisposable
{
private BlockingQueue<sendorpostcallbackitem > mQueue;
private StaThread mStaThread;
public StaSynchronizationContext()
: base()
{
mQueue = new BlockingQueue<sendorpostcallbackitem />();
mStaThread = new StaThread(mQueue);
mStaThread.Start();
}
public override void Send(SendOrPostCallback d, object state)
{
// create an item for execution
SendOrPostCallbackItem item = new SendOrPostCallbackItem(d, state,
ExecutionType.Send);
// queue the item
mQueue.Enqueue(item);
// wait for the item execution to end
item.ExecutionCompleteWaitHandle.WaitOne();
// if there was an exception, throw it on the caller thread, not the
// sta thread.
if (item.ExecutedWithException)
throw item.Exception;
}
public override void Post(SendOrPostCallback d, object state)
{
// queue the item and don't wait for its execution. This is risky because
// an unhandled exception will terminate the STA thread. Use with caution.
SendOrPostCallbackItem item = new SendOrPostCallbackItem(d, state,
ExecutionType.Post);
mQueue.Enqueue(item);
}
public void Dispose()
{
mStaThread.Stop();
}
public override SynchronizationContext CreateCopy()
{
return this;
}
}
这实际上是使用了我之前展示的所有其他类的类。我将其命名为 StaSynchronizationContext
,因为它负责将代码封送到 STA 线程,允许调用者执行必须在 STA 线程上运行的 COM API。让我们看看负责在 STA 线程上发送工作的 Send API。请注意,此类继承自 SynchronizationContext
,但会覆盖默认的 Send
和 Post
方法。
public override void Send(SendOrPostCallback d, object state)
{
// create an item for execution
SendOrPostCallbackItem item =
new SendOrPostCallbackItem(d, state, ExecutionType.Send);
// queue the item
mQueue.Enqueue(item);
// wait for the item execution to end
item.ExecutionCompleteWaitHandle.WaitOne();
// if there was an exception, throw it on the caller thread, not the
// sta thread.
if (item.ExecutedWithException)
throw item.Exception;
}
请注意,send 操作是一个阻塞操作,这意味着我们一直阻塞直到 STA 线程上的操作完成。记住,我们在 SendOrPostCallbackItem
类上放置了一个 ManualReset
事件,这样我们就知道何时执行完成。我们还捕获并缓存 SendOrPostCallbackItem
中的任何异常,以便我们可以在调用线程上抛出它们,而不是在 STA 线程上。另一方面,Post
不是一个等待调用,所以我们所做的只是将项入队,并且我们不等待委托执行完成。
就是这样,我们现在有了一个 SynchronizationContext
,它将在任何线程之间将代码封送到一个 STA 线程。在我的例子中,这将允许我在 STA 线程中执行 COM API,这样 COM 类就可以感觉宾至如归,就像它们在 VB6 中运行一样。为了实际测试这个类,我创建了一个测试程序;这是代码
public class Params
{
public string Output {get; set;}
public int CallCounter { get; set; }
public int OriginalThread { get; set; }
}
class Program
{
private static int mCount = 0;
private static StaSynchronizationContext mStaSyncContext = null;
static void Main(string[] args)
{
mStaSyncContext = new StaSynchronizationContext();
for (int i = 0; i < 100; i++)
{
ThreadPool.QueueUserWorkItem(NonStaThread);
}
Console.WriteLine("Processing");
Console.WriteLine("Press any key to dispose SyncContext");
Console.ReadLine();
mStaSyncContext.Dispose();
}
private static void NonStaThread(object state)
{
int id = Thread.CurrentThread.ManagedThreadId;
for (int i = 0; i < 10; i++)
{
var param = new Params { OriginalThread = id, CallCounter = i };
mStaSyncContext.Send(RunOnStaThread, param);
Debug.Assert(param.Output == "Processed", "Unexpected behavior by STA thread");
}
}
private static void RunOnStaThread(object state)
{
mCount++;
Console.WriteLine(mCount);
int id = Thread.CurrentThread.ManagedThreadId;
var args = (Params)state;
Trace.WriteLine("STA id " + id + " original thread " +
args.OriginalThread + " call count " + args.CallCounter);
args.Output = "Processed";
}
}
测试程序将使用线程池创建多个线程。这些线程池线程然后使用 StaSynchronizationContext
在 RunOnStaThread
方法中执行代码。运行此测试程序,我从 100 个 .NET 线程中冲击了 STA 线程,每个线程将 RunOnStaThread
代码封送 10 次。请注意,STA 线程 ID 始终相同。请看下面的结果,STA 线程始终是 11,并且调用来自多个线程(我已缩小输出)。
STA id 11 original thread 7 call count 0
STA id 11 original thread 12 call count 0
STA id 11 original thread 7 call count 1
STA id 11 original thread 12 call count 1
STA id 11 original thread 7 call count 2
STA id 11 original thread 12 call count 2
STA id 11 original thread 7 call count 3
STA id 11 original thread 12 call count 3
STA id 11 original thread 7 call count 4
STA id 11 original thread 12 call count 4
STA id 11 original thread 7 call count 5
STA id 11 original thread 12 call count 5
STA id 11 original thread 7 call count 6
STA id 11 original thread 12 call count 6
STA id 11 original thread 7 call count 7
STA id 11 original thread 12 call count 7
STA id 11 original thread 7 call count 8
STA id 11 original thread 12 call count 8
STA id 11 original thread 7 call count 9
STA id 11 original thread 12 call count 9
STA id 11 original thread 7 call count 0
STA id 11 original thread 12 call count 0
STA id 11 original thread 7 call count 1
STA id 11 original thread 12 call count 1
STA id 11 original thread 7 call count 2
STA id 11 original thread 12 call count 2
STA id 11 original thread 7 call count 3
STA id 11 original thread 12 call count 3
STA id 11 original thread 7 call count 4
STA id 11 original thread 12 call count 4
STA id 11 original thread 7 call count 5
STA id 11 original thread 12 call count 5
STA id 11 original thread 7 call count 6
STA id 11 original thread 12 call count 6
STA id 11 original thread 7 call count 7
STA id 11 original thread 12 call count 7
STA id 11 original thread 7 call count 8
STA id 11 original thread 12 call count 8
STA id 11 original thread 7 call count 9
STA id 11 original thread 12 call count 9
STA id 11 original thread 7 call count 0
STA id 11 original thread 12 call count 0
STA id 11 original thread 7 call count 1
STA id 11 original thread 12 call count 1
STA id 11 original thread 7 call count 2
STA id 11 original thread 12 call count 2
STA id 11 original thread 7 call count 3
STA id 11 original thread 12 call count 3
STA id 11 original thread 7 call count 4
STA id 11 original thread 12 call count 4
STA id 11 original thread 7 call count 5
STA id 11 original thread 12 call count 5
STA id 11 original thread 7 call count 6
STA id 11 original thread 12 call count 6
等等,我可以在不使用 SynchronizationContext 的情况下完成同样的事情,那么为什么还要费力呢?
确实,真的没有必要构建一个同步上下文来将代码封送到另一个线程。我可以直接使用我的队列和 STA 线程在我自己的代码中完成所有工作。我遵守 .NET 的规则,并提供自己的实现,这样其他人就可以使用它,但这并不是真正必需的。那么,为什么要费力呢?原因在于 WCF。WCF 允许您为服务内操作的执行提供 SynchronizationContext
。这是 WCF 的一个非常强大的功能,通常用于将 WCF 服务调用封送到 UI 线程。但是,在我的例子中,因为我做得很好并且创建了一个 SynchronizationContext
类而不是任何类,我现在可以告诉 WCF 在 STA 线程上执行我所有的服务方法。这是编写关于这个类的系列文章的主要原因。
结论
本文表明,您可以创建自己的 SynchronizationContext
类版本。我已经展示了阻塞队列是线程间通信的一个好选择。我们成功地将代码从任何线程封送到了一个 STA 线程。现在我们有了自己的 Synchronization Context 类,我们可以在 WCF 服务中利用它。这将是本系列第三部分的重点。一旦您了解了如何构建自己的 Synchronization Context 并将其应用于 WCF 服务,您就可以控制 WCF 方法执行时代码的“运行位置”——这是非常强大的 WCF 框架中一个非常强大的功能。
对于那些想要代码的人,我将在本系列文章的第三部分提供一个完整的 VS2008 解决方案。