C# 中的异步事件
关于 C# 中异步事件调用的教程文章
引言
像 C# 这样的现代语言集成了事件机制,它实际上将观察者模式融入了语言机制。
事实上,事件机制实际上提供的是同步调用,这一点常常被忽视,也没有得到足够的强调。程序员经常产生并行性的错觉,但这并非现实,在当今多核处理器时代,这是一个重要的问题。接下来我们提供多线程问题的分析和解决方案。
提供的代码是教程、概念验证级别的,为简洁起见,并未处理或展示所有变体/有问题的方面。
事件机制在单线程上提供同步调用
需要强调的是,在调用
if (SubjectEvent != null)
{
SubjectEvent(this, args);
}
//or
SubjectEvent?.Invoke(this, args);
订阅的 EventHandler
是在单个线程上同步调用的。这会产生一些不那么明显的结果
EventHandler
按顺序执行,一个接一个,按照它们订阅事件的顺序执行。- 这意味着,在更早订阅的
EventHandler
中的对象/值比在其他EventHandler
中更早更新,这可能会对程序逻辑产生影响。 - 调用某个
EventHandler
会阻塞线程,直到该EventHandler
中的所有工作完成。 - 如果在某个
EventHandler
中抛出异常,所有在其后订阅的EventHandler
都将不会执行。
我们将通过一个示例来演示这一点。计划创建三个 EventHandler
,每个 EventHandler
需要 10 秒才能完成,并监控每个 EventHandler
运行的线程以及总共花费的时间。我们将输出此示例相关的每个 ThreadId
,以查看使用了多少线程。
public class EventArgsW : EventArgs
{
public string StateW = null;
}
public class EventWrapper
{
public event EventHandler<EventArgsW> EventW;
public string StateW;
public void Notify()
{
Console.WriteLine("Notify is running on ThreadId:{0}",
Thread.CurrentThread.ManagedThreadId);
EventArgsW args = new EventArgsW();
args.StateW = this.StateW;
EventW?.Invoke(this, args);
}
}
public class HandlerWrapper
{
private string name;
private string StateW;
private ManualResetEvent mrs;
public HandlerWrapper(string name, ManualResetEvent mrs)
{
this.name = name;
this.mrs = mrs;
}
public void Handler(object subject, EventArgsW args)
{
Console.WriteLine("Handler{0} is running on ThreadId:{1}",
name, Thread.CurrentThread.ManagedThreadId);
Worker(subject, args);
}
private void Worker(object subject, EventArgsW args)
{
Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:0",
name, Thread.CurrentThread.ManagedThreadId);
StateW = args.StateW;
for (int i = 1; i <= 2; ++i)
{
Thread.Sleep(5000);
Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:{2}",
name, Thread.CurrentThread.ManagedThreadId, i);
}
mrs.Set();
}
}
internal class Client
{
public static void Main(string[] args)
{
Console.WriteLine("Client is running on ThreadId:{0}",
Thread.CurrentThread.ManagedThreadId);
ManualResetEvent[] mres = new ManualResetEvent[3];
for (int i = 0; i < mres.Length; i++) mres[i] = new ManualResetEvent(false);
EventWrapper s = new EventWrapper();
s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
s.EventW += (new HandlerWrapper("3", mres[2])).Handler;
// Change subject state and notify observers
s.StateW = "ABC123";
var timer = new Stopwatch();
timer.Start();
s.Notify();
ManualResetEvent.WaitAll(mres);
timer.Stop();
TimeSpan timeTaken = timer.Elapsed;
string tmp1 = "Client time taken: " + timeTaken.ToString(@"m\:ss\.fff");
Console.WriteLine(tmp1);
Console.ReadLine();
}
}
执行结果是
从执行结果可以看出,EventHandler
一个接一个地运行,所有都在 thread Id=1
上运行,与客户端运行在同一个线程上。所有工作完成花费了 30.059 秒。
使用 TPL 的异步事件
使用任务并行库 (TPL),我们可以让我们的 EventHandler
在单独的线程上异步运行。更进一步,如果我们想将 Client
线程从任何工作中解放出来(假设我们的 Client
是 UI 线程),我们可以让 Event
的触发(分派 EventHandler
的调用)在一个独立于 Client
线程的线程上进行。这是新的实现
新的解决方案代码在这里
public class EventArgsW : EventArgs
{
public string StateW = null;
}
public class EventWrapper
{
public event EventHandler<EventArgsW> EventW;
public string StateW;
public void Notify()
{
Task.Factory.StartNew(
() => {
Console.WriteLine("Notify is running on ThreadId:{0}",
Thread.CurrentThread.ManagedThreadId);
EventArgsW args = new EventArgsW();
args.StateW = this.StateW;
EventW?.Invoke(this, args);
});
}
}
public class HandlerWrapper
{
private string name;
private string StateW;
private ManualResetEvent mrs;
public HandlerWrapper(string name, ManualResetEvent mrs)
{
this.name = name;
this.mrs = mrs;
}
public void Handler(object subject, EventArgsW args)
{
Console.WriteLine("Handler{0} is running on ThreadId:{1}",
name, Thread.CurrentThread.ManagedThreadId);
Task.Factory.StartNew(
() => Worker(subject, args)); ;
}
private void Worker(object subject, EventArgsW args)
{
Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:0",
name, Thread.CurrentThread.ManagedThreadId);
StateW = args.StateW;
for (int i = 1; i <= 2; ++i)
{
Thread.Sleep(5000);
Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:{2}",
name, Thread.CurrentThread.ManagedThreadId, i);
}
mrs.Set();
}
}
internal class Client
{
public static void Main(string[] args)
{
Console.WriteLine("Client is running on ThreadId:{0}",
Thread.CurrentThread.ManagedThreadId);
ManualResetEvent[] mres = new ManualResetEvent[3];
for (int i = 0; i < mres.Length; i++) mres[i] = new ManualResetEvent(false);
EventWrapper s = new EventWrapper();
s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
s.EventW += (new HandlerWrapper("3", mres[2])).Handler;
// Change subject state and notify observers
s.StateW = "ABC123";
var timer = new Stopwatch();
timer.Start();
s.Notify();
ManualResetEvent.WaitAll(mres);
timer.Stop();
TimeSpan timeTaken = timer.Elapsed;
string tmp1 = "Client time taken: " + timeTaken.ToString(@"m\:ss\.fff");
Console.WriteLine(tmp1);
Console.ReadLine();
}
}
执行结果在这里
从执行结果可以看出,我们看到 EventHandler
在单独的线程上运行,从执行日志中可以看到并发性,总共花费的时间是 10.020 秒。
使用 TPL 的异步事件 – 扩展方法
由于使用 TPL 需要修改现有代码,并且使代码的可读性降低,所以我创建了一个扩展方法来简化 TPL 的使用。而不是写
EventW?.Invoke(this, args);
人们会写
EventW?.InvokeAsync<EventArgsW>(this, args);
所有 TPL 的魔力都将在后台发生。这是新解决方案的全部源代码
public class EventArgsW : EventArgs
{
public string StateW = null;
}
public class EventWrapper
{
public event EventHandler<EventArgsW> EventW;
public string StateW;
public void Notify()
{
Console.WriteLine("Notify is running on ThreadId:{0}",
Thread.CurrentThread.ManagedThreadId);
EventArgsW args = new EventArgsW();
args.StateW = this.StateW;
EventW?.InvokeAsync<EventArgsW>(this, args); //(1)
}
}
public class HandlerWrapper
{
private string name;
private string StateW;
private ManualResetEvent mrs;
public HandlerWrapper(string name, ManualResetEvent mrs)
{
this.name = name;
this.mrs = mrs;
}
public void Handler(object subject, EventArgsW args)
{
Console.WriteLine("Handler{0} is running on ThreadId:{1}",
name, Thread.CurrentThread.ManagedThreadId);
Worker(subject, args);
}
private void Worker(object subject, EventArgsW args)
{
Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:0",
name, Thread.CurrentThread.ManagedThreadId);
StateW = args.StateW;
for (int i = 1; i <= 2; ++i)
{
Thread.Sleep(5000);
Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:{2}",
name, Thread.CurrentThread.ManagedThreadId, i);
}
mrs.Set();
}
}
public static class AsyncEventsUsingTplExtension
{
public static void InvokeAsync<TEventArgs> //(2)
(this EventHandler<TEventArgs> handler, object sender, TEventArgs args)
{
Task.Factory.StartNew(() =>
{
Console.WriteLine("InvokeAsync<TEventArgs> is running on ThreadId:{0}",
Thread.CurrentThread.ManagedThreadId);
var delegates = handler?.GetInvocationList();
foreach (var delegated in delegates)
{
var myEventHandler = delegated as EventHandler<TEventArgs>;
if (myEventHandler != null)
{
Task.Factory.StartNew(() => myEventHandler(sender, args));
}
};
});
}
}
internal class Client
{
public static void Main(string[] args)
{
Console.WriteLine("Client is running on ThreadId:{0}",
Thread.CurrentThread.ManagedThreadId);
ManualResetEvent[] mres = new ManualResetEvent[3];
for (int i = 0; i < mres.Length; i++) mres[i] = new ManualResetEvent(false);
EventWrapper s = new EventWrapper();
s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
s.EventW += (new HandlerWrapper("3", mres[2])).Handler;
// Change subject state and notify observers
s.StateW = "ABC123";
var timer = new Stopwatch();
timer.Start();
s.Notify();
ManualResetEvent.WaitAll(mres);
timer.Stop();
TimeSpan timeTaken = timer.Elapsed;
string tmp1 = "Client time taken: " + timeTaken.ToString(@"m\:ss\.fff");
Console.WriteLine(tmp1);
Console.ReadLine();
}
}
这是执行结果
从执行结果可以看出,我们看到 EventHandler
在单独的线程上运行,从执行日志中可以看到并发性,总共花费的时间是 10.039 秒。TPL 将工作分派到线程池中的线程,可以看到线程 Id=4
被使用了两次,可能是因为它提前完成了工作,并且再次可用于工作。
使用 TAP 的异步事件
根据它们在 C# 中的定义方式,EventHandler
是同步函数,在任务异步模式 (TAP) 的上下文中。如果你希望 EventHandler
在 TAP 的上下文中是异步的,这样你就可以在其中使用 `await`,你实际上需要自己构建一个事件通知机制,该机制支持你自定义版本的异步 EventHandler
。一个很好的例子可以在 [1] 中看到。我修改了该代码以用于我的示例,这是新版本的解决方案
public class EventArgsW : EventArgs
{
public string StateW = null;
}
public class EventWrapper
{
public event AsyncEventHandler<EventArgsW> EventW;
public string StateW;
public async Task Notify(CancellationToken token)
{
Console.WriteLine("Notify is running on ThreadId:{0}",
Thread.CurrentThread.ManagedThreadId);
EventArgsW args = new EventArgsW();
args.StateW = this.StateW;
await this.EventW.InvokeAsync(this, args, token);
}
}
public class HandlerWrapper
{
private string name;
private string StateW;
private ManualResetEvent mrs;
public HandlerWrapper(string name, ManualResetEvent mrs)
{
this.name = name;
this.mrs = mrs;
}
public async Task Handler(object subject, EventArgsW args,
CancellationToken token)
{
Console.WriteLine("Handler{0} is running on ThreadId:{1}",
name, Thread.CurrentThread.ManagedThreadId);
await Worker(subject, args);
}
private async Task Worker(object subject, EventArgsW args)
{
Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
"{1}, i:0",
name, Thread.CurrentThread.ManagedThreadId);
StateW = args.StateW;
for (int i = 1; i <= 2; ++i)
{
Thread.Sleep(5000);
Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
"{1}, i:{2}",
name, Thread.CurrentThread.ManagedThreadId, i);
}
await Task.Delay(0);
mrs.Set();
}
}
public delegate Task AsyncEventHandler<TEventArgs>(
object sender, TEventArgs e, CancellationToken token);
public static class AsynEventHandlerExtensions
{
// invoke an async event (with null-checking)
public static async Task InvokeAsync<TEventArgs>(
this AsyncEventHandler<TEventArgs> handler,
object sender, TEventArgs args, CancellationToken token)
{
await Task.Run(async () =>
{
Console.WriteLine("InvokeAsync<TEventArgs> is running on ThreadId:{0}",
Thread.CurrentThread.ManagedThreadId);
var delegates = handler?.GetInvocationList();
if (delegates?.Length > 0)
{
var tasks =
delegates
.Cast<AsyncEventHandler<TEventArgs>>()
.Select(e => Task.Run(
async () => await e.Invoke(sender, args, token)));
await Task.WhenAll(tasks);
}
}).ConfigureAwait(false);
}
}
internal class Client
{
public static async Task Main(string[] args)
{
Console.WriteLine("Client is running on ThreadId:{0}",
Thread.CurrentThread.ManagedThreadId);
ManualResetEvent[] mres = new ManualResetEvent[3];
for (int i = 0; i < mres.Length; i++)
mres[i] = new ManualResetEvent(false);
EventWrapper s = new EventWrapper();
s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
s.EventW += (new HandlerWrapper("3", mres[2])).Handler;
// Change subject state and notify observers
s.StateW = "ABC123";
var timer = new Stopwatch();
timer.Start();
await s.Notify(CancellationToken.None);
ManualResetEvent.WaitAll(mres);
timer.Stop();
TimeSpan timeTaken = timer.Elapsed;
string tmp1 = "Client time taken: " +
timeTaken.ToString(@"m\:ss\.fff");
Console.WriteLine(tmp1);
Console.ReadLine();
}
}
这是执行结果
从执行结果可以看出,我们看到 EventHandler
,现在是异步的,在单独的线程上运行,从执行日志中可以看到并发性,总共花费的时间是 10.063 秒。
使用 TAP 的异步事件 – Ver2
虽然这不是本文的主要目的,但我们可以更改代码以更好地演示 TAP 模式。我们只会对上面的项目代码做一些小改动,改变一个方法,其他所有方法与上面相同。
private async Task Worker(object subject, EventArgsW args)
{
Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
"{1}, i:0",
name, Thread.CurrentThread.ManagedThreadId);
StateW = args.StateW;
for (int i = 1; i <= 2; ++i)
{
await Task.Delay(5000);
Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
"{1}, i:{2}",
name, Thread.CurrentThread.ManagedThreadId, i);
}
mrs.Set();
}
现在,我们得到以下执行结果
如果我们关注,例如,Handler1.Worker
,我们可以看到该异步方法在 ThreadPool
的三个不同线程上运行,线程 ID 分别是 5、8、6。由于 TAP 模式,这都是可以的,因为在 await
之后,工作被 ThreadPool
中下一个可用的线程接管。并发性再次显而易见,总时间为 10.101 秒。
结论
事件机制实际上提供对 EventHandler
的同步调用。我们在上面的示例中展示了如何使 EventHandler
的调用异步。代码中提供了两个可重用的扩展方法,它们简化了异步调用的实现。其好处是 EventHandler
的并行调用,这在当今的多核系统中非常重要。
参考
历史
- 2022年9月11日:初始版本