65.9K
CodeProject 正在变化。 阅读更多。
Home

理解 .NET C# 中的 SynchronizationContext

starIconstarIconstarIconstarIconstarIcon

5.00/5 (31投票s)

2020 年 7 月 26 日

MIT

11分钟阅读

viewsIcon

81531

downloadIcon

1004

掌控你的代码将在哪个线程上执行,以及如何执行

SynchronizationContext Demo

引言

在这里,我打算揭开 .NET 框架的另一个黑暗角落——同步上下文。我将带你了解它们为何存在、它们做什么以及它们如何工作。最后,我们甚至将实现自己的同步上下文。本文假定你至少对多线程有所了解。你不必写很多多线程代码,只要你理解它的核心原理和注意事项即可。我也会稍微涵盖一下。

注意:本文的源代码包含了我到目前为止的其余 Tasks 框架。相关项目是 `SyncContextDemo` 和 `Tasks`。在 `Tasks` 下,你会找到 `MessagingSynchronizationContext.cs`,它使用了 `MessageQueue.cs` 作为辅助。

更新:这可能没有影响你现有的代码,但第一版存在一个潜在问题,即 `SemaphoreSlim` 和 `ConcurrentQueue` 会根据你的使用方式而不同步。这是不理想的。我已经更新了代码和示例以反映这些更改。

概念化这个混乱的局面

什么是同步上下文?

首先,我们来谈谈它解决的问题。对于多线程代码,你不能随意地跨线程读写值,这也意味着你不能随意地跨线程调用方法和属性,因为你可能会导致竞态条件,这是编程中最难调试的问题之一。编写多线程代码很复杂,很容易出错,而且很难调试。一定有更简单的方法!

我想说一个有趣的观点:设想线程之间存在“边界”——看不见的墙,你必须穿过它们。在那些墙之间就是你的(成员)数据所在的地方。不要毫无准备地跨越这些边界。如果你写过多线程代码,这一点很容易理解,甚至可能非常熟悉。

我们如何跨越这些边界?这取决于。有很多方法可以做到,其中一些主要方法是比较粗糙的同步原语(如互斥锁和信号量),以及更高级的消息传递(实际上是建立在同步原语之上的)。

问题是,我们能否抽象出一个灵活的东西,它允许我们无论底层实现如何,都能跨越线程边界进行通信,并向开发者呈现一个易于使用的外观?

答案基本上是肯定的,这就是微软通过 `SynchronizationContext` 所做的。这基本上是一个契约类,你需要从中派生才能使其执行任何操作。默认实现只是“越过墙壁”,而不进行任何同步。

然而,有时你并不是在处理默认的同步上下文。例如,WinForms 有自己的同步上下文,用于帮助你安全地运行 `BackgroundWorker` 任务,并在单独的线程上报告回 UI,即使报告的起始线程与 UI 线程不同。记住,你不能随意地跨越线程边界。

`SynchronizationContext` 及其派生类的工作方式类似于消息队列,或者至少它们向开发者呈现的是这样的外观。通过它,你可以在目标线程(我们消息循环“存在”的线程)上以两种方式之一执行委托。我们稍后会讲到消息循环。第一种将委托分派到目标线程的方法是 `Post()`,它是异步的,但它不会告诉你何时完成。第二种方法是 `Send()`,它是同步的,会阻塞发送者,直到接收者完成委托的执行。这不算很好,但这是我们拥有的。由于消息队列的性质,无法实现双向通信——它们是单向的,所以你需要两个。这就是为什么 `Post()` 不会通知你的原因。

同步上下文的另一件事是,每个线程都可以与之关联。这有时与查找传入消息的消息循环线程是同一个线程,但并非总是如此。委托可以被分派到运行消息循环的线程,以便该线程可以执行它们。我们将在下面介绍它的样子。

为什么要抽象它?

这是一个好问题。答案是你可以扩展它,并且框架中的一些部分可以消费它。`await` 机制会将对你 `async` 例程代码的调用插入到 `await` 中,以确保 `await` 前后的代码在同一上下文中(在同一线程上)执行。其他时候,框架会提供自己的上下文,例如 WinForms 实现的上下文,它能保持 UI 线程的安全,例如,当 `BackgroundWorker`(它会消费该上下文)与之通信时。

编写这个混乱的程序

我们如何使用它?

你可以通过检索 `SynchronizationContext.Current` 来获取线程当前的同步上下文。你可以通过调用 `SetSynchronizationContext()` 来设置它。

一旦你拥有了它,你就可以调用 `Post()` 来“即发即弃”地将委托发送到 `SynchronizationContext` 的关联消息循环线程,或者你可以调用 `Send()` 来阻塞直到远程执行完成。

如果你创建一个新线程,你可以将其同步上下文设置为由你的 UI 线程驱动的那个——就是你调用 `Application.Run()` 所在的、包含你的 `Form` 的那个线程,因为 WinForms 的同步上下文在那里运行。所以你会这样做:

// in a Winforms app UI thread somewhere:
var sctx = SynchronizationContext.Current;
var thread = new Thread(() => {
    // now await and other things can dispatch messages to 
    // to sctx which here in WinForms will be the UI's 
    // SynchronizationContext:
    SynchronizationContext.SetSynchronizationContext(sctx); 
    // ... do work including sctx.Post() and/or sctx.Send()
});
thread.Start();

在上面我们使用它的情况下,你并不真正需要设置线程的同步上下文,因为我们可以直接访问 `sctx`,所以不必查询 `SynchronizationContext.Current`,但像 `await` 这样的其他东西依赖于它,所以你确实应该设置它。

一旦你拥有了一个同步上下文,无论是通过检索 `Current` 属性,还是通过我们上面提到的“提升”方式,我们都可以调用 `Post()` 和 `Send()`。

你的消息,通常是从另一个线程传输过来的,然后会被分派到接收上下文的关联消息循环线程上。消息的传输看起来像这样:

// executes on the target thread, not this thread:
sctx.Post((object state) => { MessageBox.Show(string.Format("Hello from thread {0} (via Post)",
 Thread.CurrentThread.ManagedThreadId)); }, null);

匿名方法在目标线程上执行,在本例中是 UI 线程,此时调用 `MessageBox.Show()` 是安全的。

`Send()` 的工作方式完全相同,只是它会阻塞直到目标委托执行完毕。

我们可以使用这些来基本上将代码转移到其他线程,只要那些线程有消息循环和同步上下文。

它是如何工作的?

这不是魔法,这只是消息传递,我保证。为了更好地理解它,让我们先看看我构建的一个 `SynchronizationContext` 特定实现的的消息循环:

Message msg;
do
{
    // blocks until a message comes in:
    msg = _messageQueue.Receive();
    // execute the code on this thread
    msg.Callback?.Invoke(msg.State);
    // let Send() know we're done:
    if (null != msg.FinishedEvent)
        msg.FinishedEvent.Set();
    // exit on the quit message
} while (null != msg.Callback);

我不特别喜欢 `Callback` 这个名字,但我用它是因为微软在他们的默认实现中就是这样称呼它的,也是 `Send()` 和 `Post()` 委托所接受的。它就是包含要执行的代码的委托(通常是一个匿名方法),就像我们之前在 `Post()` 和 `Send()` 中所做的那样。如果它是 `null`,这表示停止消息循环,但这取决于我的具体实现。最后,我们简单地调用消息中收到的委托,因为现在我们在目标线程上。

现在让我们看看演示,它虽然有点人为,但很有启发性:

// determine if we are using a synchronization context other than the default
// if it's null, we're using the default, which executes code on the same thread
// that Send() or Post() sent it on.
Console.WriteLine("Current thread id is {0}", Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("Synchronization context is {0}set",
                   SynchronizationContext.Current == null?"not ":"");
            
// create a new custom synchronization context
var sc = new MessagingSynchronizationContext();
Console.WriteLine("Setting context to MessageQueueSynchronizationContext");
                    

// now start our message loop, and an auxiliary thread
Console.WriteLine("Starting message loop for thread {0}",
                   Thread.CurrentThread.ManagedThreadId);
var thread = new Thread(() => {
    // always set the synchronization context if you'll be using 
    // a non-default one on the thread
    SynchronizationContext.SetSynchronizationContext(sc);
                
    // don't use the synchronization context - posts from this thread:
    Console.WriteLine("Hello from thread {0}", Thread.CurrentThread.ManagedThreadId);
                
    // use the synchronization context - posts from Main()'s thread:
    sc.Post((object state) => { Console.WriteLine("Hello from thread {0} (via Post)", 
                                Thread.CurrentThread.ManagedThreadId); }, null);
});
// start the auxiliary thread
thread.Start();
var task = Task.Run(async () =>
{
    // set the synchronization context
    // uncomment this to see what happens!
    // SynchronizationContext.SetSynchronizationContext(sc);
    Thread.Sleep(50);
    Console.WriteLine("Awaiting task");
    await Task.Delay(50);
    // this will wake up on main thread or not
    // depending on the synchronization context
    Console.WriteLine("Hello from thread {0} (via await)", 
                       Thread.CurrentThread.ManagedThreadId);
});

// start the message loop:
sc.Start(); // blocks
// doesn't matter but shuts up the compiler:
await task;

它应该输出类似这样的内容:

Current thread id is 1
Synchronization context is not set
Setting context to MessageQueueSynchronizationContext
Starting message loop for thread 1
Hello from thread 3
Hello from thread 1 (via Post)
Awaiting task
Hello from thread 4 (via await)

现在尝试取消注释这一行(写着“看看会发生什么!”的地方):

SynchronizationContext.SetSynchronizationContext(sc);

现在运行程序。这次它说线程 1(或者任何你当前的线程 ID)。你会注意到这次 `await` 没有导致代码在新线程上执行。这是什么魔法?`await` 使用当前的同步上下文来执行 `await` 之后的代码。

var sctx = SynchronizationContext.Current??new SynchronizationContext();

sctx.Post(()=>{ /* next portion of code */ },null);

稍后我会详细解释。

请注意,`SynchronizationContext` 没有 `Start()` 方法。同步上下文使用的消息循环是如何实现的,这是一个我们不应该考虑的实现细节。然而,在我们的自定义实现中,我们需要一个东西来充当我们的消息循环,而我本质上只是在该方法后面提供了一个样板化的消息循环,以保持简单。

如果你还不完全理解它是如何工作的,让我再解释一遍。某个地方有一个消息循环。它在你代码中还是在框架底层,这是一个实现细节。关键在于,无论消息循环在哪条线程上运行,代码最终都会在那里执行。你从其他线程调用 `Send()` 和 `Post()`,并将“包含”你的代码的委托“传输”到目标线程上执行。这使得跨线程通信变得容易。

如何创建自己的同步上下文?

有时,例如在控制台应用程序或 Windows 服务中,你可能没有一个好的 `SynchronizationContext` 可以使用。问题是没有消息循环。如果你想要一个,你必须自己创建一个,这正是本文的目的。它应该足以满足你需要在你选择的线程上执行代码的自定义多线程场景。我们将在这里进行探讨。首先,我们有一个嵌套的 `struct` 声明和一个重要的成员字段:

private struct Message
{
    public readonly SendOrPostCallback Callback;
    public readonly object State;
    public readonly ManualResetEventSlim FinishedEvent;
    public Message(SendOrPostCallback callback, 
                   object state, ManualResetEventSlim finishedEvent)
    {
        Callback = callback;
        State = state;
        FinishedEvent = finishedEvent;
    }
    public Message(SendOrPostCallback callback, object state) : this(callback, state, null)
    {
    }
}
MessageQueue<Message> _messageQueue = new MessageQueue<Message>();

消息包含我之前提到的“callback”,一个用于可选用户状态的字段(你传递给 `Send()` 或 `Post()`),最后是一个 `ManualResetEventSlim`,我将在后面解释它。它用于向 `Send()` 发送信号,表明我们已处理完消息,以便 `Send()` 能够阻塞直到消息被接收。这个类型声明了我们在消息循环线程上执行委托所需的所有信息。

接下来,我们有一个名为 `MessageQueue` 的东西,它保存上面声明的 `Message` `struct` 实例。这个 `class` 通过发布和接收 `Message` 来提供线程安全的通信方式。它完成了大部分繁重的工作,但我们最终也会进行探讨。

上面的是我们 `SynchronizationContext` 实现特定的。你可能也有自己的跨线程通信方式,并且你可以实现任何你喜欢的东西,只要它符合 `SynchronizationContext` 提供的必要契约。

这是我们自定义同步上下文的 `Send()` 和 `Post()` 实现:

/// <summary>
/// Sends a message and does not wait
/// </summary>
/// <param name="callback">The delegate to execute</param>
/// <param name="state">The state associated with the message</param>
public override void Post(SendOrPostCallback callback, object state)
{
    _messageQueue.Post(new Message(callback, state));
}
/// <summary>
/// Sends a message and waits for completion
/// </summary>
/// <param name="callback">The delegate to execute</param>
/// <param name="state">The state associated with the message</param>
public override void Send(SendOrPostCallback callback, object state)
{
    var ev = new ManualResetEventSlim(false);
    try
    {
        _messageQueue.Post(new Message(callback, state, ev));
        ev.Wait();
    }
    finally
    {
        ev.Dispose();
    }
}

你可以看到 `Post()` 非常简单。`Send()` 稍微复杂一些,因为我们必须在它最终完成时得到通知,而这正是我们之前的 `ManualResetEventSlim` 的作用。在这里,我们创建它,用 `Message` 发布它,然后等待它。在我们的消息循环中,它会被 `Set()`,表示我们可以继续了。最后,我们 `Dispose()` 事件。回收这些事件可能更有效率,但这样做会更复杂,而且我不确定能获得多少性能提升,如果有的话。

请注意,我们可以将一个 `State` 与 `Message` 一起传递。它会被发送到 `Callback` 进行处理,其值由使用者任意定义。

现在让我们再次看看 `Start()` 中的消息循环,希望这次会更清楚一些:

Message msg;
do
{
    // blocks until a message comes in:
    msg = _messageQueue.Receive();
    // execute the code on this thread
    msg.Callback?.Invoke(msg.State);
    // let Send() know we're done:
    if (null != msg.FinishedEvent)
        msg.FinishedEvent.Set();
    // exit on the quit message
} while (null != msg.Callback);

而 `Stop()` 看起来是这样的:

var ev = new ManualResetEventSlim(false);
try
{
    // post the quit message
    _messageQueue.Post(new Message(null, null, ev));
    ev.Wait();
}
finally {
    ev.Dispose();
}

请注意我们是如何等待消息完成的。之所以这里不使用 `Send()` 却能达到同样的效果,是因为我考虑在 `Send()` 中添加对 `null` `Callback` 的检查并抛出异常。这段代码确保了如果我添加这个检查,这里的行为不会受到影响。

MessageQueue 类呢?

`MessageQueue` 提供了在线程之间发布和接收消息的核心功能。它使用 `ConcurrentQueue` 和 `SemaphoreSlim` 来实现其功能。原则是,每次有人向队列中添加一个消息(类型为 `T`),他们都会在信号量上调用 `Release(1)`,允许下一个 `Receive()` 不被阻塞地通过。其结果是,这只有在队列为空时才会阻塞,所以 `Receive()` 只有在没有消息时才阻塞。否则,它会返回队列中的下一个消息并将其移除。

T result;
_sync.Wait();
if (!_queue.TryDequeue(out result))
    throw new InvalidOperationException("The queue is empty");
return result;

同时,`Post()`(没有 `Send()` 等效方法)的工作方式如下:

_queue.Enqueue(message);
_sync.Release(1);

这就是它的核心。有几种变体支持可等待操作和/或接受 `CancellationToken`,但它们本质上都执行与上面相同的功能。

Await 和 SynchronizationContext

`await` 语言特性通常会为你生成使用线程同步上下文的代码。每当找到 `await` 时,它就会为你的方法构建的状态机创建一个新的状态,以便能够暂停方法的执行。该方法变得可重新启动,并且在修改和转换代码方面与 C# 迭代器和 `yield` 的工作方式非常相似。问题是,你的方法在 `await` 之后经常会在不同的线程上“重新启动”,因为它被挂接到设备 I/O 回调上,或者被另一个 OS 线程“唤醒”/恢复。你需要的是无缝地将代码转换回原始线程,而这正是 `await` 所提供的。它使用线程当前的 `SynchronizationContext` 来运行重新启动的方法,并在例程最初被调用的那个线程上执行,使用 `Post()` 或 `Send()`。这就是为什么设置 `SynchronizationContext` 很重要,特别是如果你正在使用 `async`/`await` 并且需要一个自定义的同步上下文,例如上面那个。

然而,如果你通过使用 `ConfigureAwait(false)` 配置 `Task`,它将覆盖典型的行为,并且当前同步上下文将不会被使用,使得行为与未设置同步上下文时相同。

历史

  • 2020 年 7 月 26 日 - 首次提交
  • 2020 年 7 月 28 日 - 更新 1 (错误修复)
© . All rights reserved.