65.9K
CodeProject 正在变化。 阅读更多。
Home

C# 中的异步事件

starIconstarIconstarIconstarIconstarIcon

5.00/5 (27投票s)

2022年9月11日

CPOL

4分钟阅读

viewsIcon

49875

downloadIcon

542

关于 C# 中异步事件调用的教程文章

引言

像 C# 这样的现代语言集成了事件机制,它实际上将观察者模式融入了语言机制。

事实上,事件机制实际上提供的是同步调用,这一点常常被忽视,也没有得到足够的强调。程序员经常产生并行性的错觉,但这并非现实,在当今多核处理器时代,这是一个重要的问题。接下来我们提供多线程问题的分析和解决方案。

提供的代码是教程、概念验证级别的,为简洁起见,并未处理或展示所有变体/有问题的方面。

事件机制在单线程上提供同步调用

需要强调的是,在调用

if (SubjectEvent != null)
{
    SubjectEvent(this, args);
}

//or

SubjectEvent?.Invoke(this, args);

订阅的 EventHandler 是在单个线程上同步调用的。这会产生一些不那么明显的结果

  • EventHandler 按顺序执行,一个接一个,按照它们订阅事件的顺序执行。
  • 这意味着,在更早订阅的 EventHandler 中的对象/值比在其他 EventHandler 中更早更新,这可能会对程序逻辑产生影响。
  • 调用某个 EventHandler 会阻塞线程,直到该 EventHandler 中的所有工作完成。
  • 如果在某个 EventHandler 中抛出异常,所有在其后订阅的 EventHandler 都将不会执行。

我们将通过一个示例来演示这一点。计划创建三个 EventHandler,每个 EventHandler 需要 10 秒才能完成,并监控每个 EventHandler 运行的线程以及总共花费的时间。我们将输出此示例相关的每个 ThreadId,以查看使用了多少线程。

public class EventArgsW : EventArgs
{
    public string StateW = null;
}

public class EventWrapper
{
    public event EventHandler<EventArgsW> EventW;

    public string StateW;

    public void Notify()
    {
        Console.WriteLine("Notify is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);

        EventArgsW args = new EventArgsW();
        args.StateW = this.StateW;

        EventW?.Invoke(this, args);
    }
}

public class HandlerWrapper
{
    private string name;
    private string StateW;
    private ManualResetEvent mrs;

    public HandlerWrapper(string name, ManualResetEvent mrs)
    {
        this.name = name;
        this.mrs = mrs;
    }

    public void Handler(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0} is running on ThreadId:{1}",
            name, Thread.CurrentThread.ManagedThreadId);

        Worker(subject, args);
    }

    private void Worker(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:0",
            name, Thread.CurrentThread.ManagedThreadId);
        StateW = args.StateW;

        for (int i = 1; i <= 2; ++i)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:{2}",
                name, Thread.CurrentThread.ManagedThreadId, i);
        }
        mrs.Set();
    }
}

internal class Client
{
    public static void Main(string[] args)
    {
        Console.WriteLine("Client is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);

        ManualResetEvent[] mres = new ManualResetEvent[3];
        for (int i = 0; i < mres.Length; i++) mres[i] = new ManualResetEvent(false);

        EventWrapper s = new EventWrapper();
        s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
        s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
        s.EventW += (new HandlerWrapper("3", mres[2])).Handler;

        // Change subject state and notify observers
        s.StateW = "ABC123";

        var timer = new Stopwatch();
        timer.Start();

        s.Notify();

        ManualResetEvent.WaitAll(mres);
        timer.Stop();
        TimeSpan timeTaken = timer.Elapsed;
        string tmp1 = "Client time taken: " + timeTaken.ToString(@"m\:ss\.fff");
        Console.WriteLine(tmp1);

        Console.ReadLine();
    }
}

执行结果是

从执行结果可以看出,EventHandler 一个接一个地运行,所有都在 thread Id=1 上运行,与客户端运行在同一个线程上。所有工作完成花费了 30.059 秒。

使用 TPL 的异步事件

使用任务并行库 (TPL),我们可以让我们的 EventHandler 在单独的线程上异步运行。更进一步,如果我们想将 Client 线程从任何工作中解放出来(假设我们的 Client 是 UI 线程),我们可以让 Event 的触发(分派 EventHandler 的调用)在一个独立于 Client 线程的线程上进行。这是新的实现

新的解决方案代码在这里

public class EventArgsW : EventArgs
{
    public string StateW = null;
}

public class EventWrapper
{
    public event EventHandler<EventArgsW> EventW;

    public string StateW;

    public void Notify()
    {
        Task.Factory.StartNew(
            () => {
                Console.WriteLine("Notify is running on ThreadId:{0}",
                Thread.CurrentThread.ManagedThreadId);

                EventArgsW args = new EventArgsW();
                args.StateW = this.StateW;

                EventW?.Invoke(this, args);
            });
    }
}

public class HandlerWrapper
{
    private string name;
    private string StateW;
    private ManualResetEvent mrs;

    public HandlerWrapper(string name, ManualResetEvent mrs)
    {
        this.name = name;
        this.mrs = mrs;
    }

    public void Handler(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0} is running on ThreadId:{1}",
            name, Thread.CurrentThread.ManagedThreadId);

        Task.Factory.StartNew(
            () => Worker(subject, args)); ;
    }

    private void Worker(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:0",
            name, Thread.CurrentThread.ManagedThreadId);
        StateW = args.StateW;

        for (int i = 1; i <= 2; ++i)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:{2}",
                name, Thread.CurrentThread.ManagedThreadId, i);
        }
        mrs.Set();
    }
}

internal class Client
{
    public static void Main(string[] args)
    {
        Console.WriteLine("Client is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);

        ManualResetEvent[] mres = new ManualResetEvent[3];
        for (int i = 0; i < mres.Length; i++) mres[i] = new ManualResetEvent(false);

        EventWrapper s = new EventWrapper();
        s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
        s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
        s.EventW += (new HandlerWrapper("3", mres[2])).Handler;

        // Change subject state and notify observers
        s.StateW = "ABC123";

        var timer = new Stopwatch();
        timer.Start();

        s.Notify();

        ManualResetEvent.WaitAll(mres);
        timer.Stop();
        TimeSpan timeTaken = timer.Elapsed;
        string tmp1 = "Client time taken: " + timeTaken.ToString(@"m\:ss\.fff");
        Console.WriteLine(tmp1);

        Console.ReadLine();
    }
}

执行结果在这里

从执行结果可以看出,我们看到 EventHandler 在单独的线程上运行,从执行日志中可以看到并发性,总共花费的时间是 10.020 秒。

使用 TPL 的异步事件 – 扩展方法

由于使用 TPL 需要修改现有代码,并且使代码的可读性降低,所以我创建了一个扩展方法来简化 TPL 的使用。而不是写

EventW?.Invoke(this, args);

人们会写

EventW?.InvokeAsync<EventArgsW>(this, args);

所有 TPL 的魔力都将在后台发生。这是新解决方案的全部源代码

public class EventArgsW : EventArgs
{
    public string StateW = null;
}

public class EventWrapper
{
    public event EventHandler<EventArgsW> EventW;

    public string StateW;

    public void Notify()
    {
        Console.WriteLine("Notify is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);

        EventArgsW args = new EventArgsW();
        args.StateW = this.StateW;

        EventW?.InvokeAsync<EventArgsW>(this, args);  //(1)
    }
}

public class HandlerWrapper
{
    private string name;
    private string StateW;
    private ManualResetEvent mrs;

    public HandlerWrapper(string name, ManualResetEvent mrs)
    {
        this.name = name;
        this.mrs = mrs;
    }

    public void Handler(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0} is running on ThreadId:{1}",
            name, Thread.CurrentThread.ManagedThreadId);

        Worker(subject, args);
    }

    private void Worker(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:0",
            name, Thread.CurrentThread.ManagedThreadId);
        StateW = args.StateW;

        for (int i = 1; i <= 2; ++i)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:{2}",
                name, Thread.CurrentThread.ManagedThreadId, i);
        }
        mrs.Set();
    }
}

public static class AsyncEventsUsingTplExtension
{
    public static void InvokeAsync<TEventArgs>   //(2)
        (this EventHandler<TEventArgs> handler, object sender, TEventArgs args)
    {
        Task.Factory.StartNew(() =>
        {
            Console.WriteLine("InvokeAsync<TEventArgs> is running on ThreadId:{0}",
                Thread.CurrentThread.ManagedThreadId);

            var delegates = handler?.GetInvocationList();

            foreach (var delegated in delegates)
            {
                var myEventHandler = delegated as EventHandler<TEventArgs>;
                if (myEventHandler != null)
                {
                    Task.Factory.StartNew(() => myEventHandler(sender, args));
                }
            };
        });
    }
}

internal class Client
{
    public static void Main(string[] args)
    {
        Console.WriteLine("Client is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);

        ManualResetEvent[] mres = new ManualResetEvent[3];
        for (int i = 0; i < mres.Length; i++) mres[i] = new ManualResetEvent(false);

        EventWrapper s = new EventWrapper();
        s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
        s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
        s.EventW += (new HandlerWrapper("3", mres[2])).Handler;

        // Change subject state and notify observers
        s.StateW = "ABC123";

        var timer = new Stopwatch();
        timer.Start();

        s.Notify();

        ManualResetEvent.WaitAll(mres);
        timer.Stop();
        TimeSpan timeTaken = timer.Elapsed;
        string tmp1 = "Client time taken: " + timeTaken.ToString(@"m\:ss\.fff");
        Console.WriteLine(tmp1);

        Console.ReadLine();
    }
}

这是执行结果

从执行结果可以看出,我们看到 EventHandler 在单独的线程上运行,从执行日志中可以看到并发性,总共花费的时间是 10.039 秒。TPL 将工作分派到线程池中的线程,可以看到线程 Id=4 被使用了两次,可能是因为它提前完成了工作,并且再次可用于工作。

使用 TAP 的异步事件

根据它们在 C# 中的定义方式,EventHandler 是同步函数,在任务异步模式 (TAP) 的上下文中。如果你希望 EventHandler 在 TAP 的上下文中是异步的,这样你就可以在其中使用 `await`,你实际上需要自己构建一个事件通知机制,该机制支持你自定义版本的异步 EventHandler。一个很好的例子可以在 [1] 中看到。我修改了该代码以用于我的示例,这是新版本的解决方案

public class EventArgsW : EventArgs
{
    public string StateW = null;
}

public class EventWrapper
{
    public event AsyncEventHandler<EventArgsW> EventW;

    public string StateW;

    public async Task Notify(CancellationToken token)
    {
        Console.WriteLine("Notify is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);

        EventArgsW args = new EventArgsW();
        args.StateW = this.StateW;

        await this.EventW.InvokeAsync(this, args, token);
    }
}

public class HandlerWrapper
{
    private string name;
    private string StateW;
    private ManualResetEvent mrs;

    public HandlerWrapper(string name, ManualResetEvent mrs)
    {
        this.name = name;
        this.mrs = mrs;
    }

    public async Task Handler(object subject, EventArgsW args,
        CancellationToken token)
    {
        Console.WriteLine("Handler{0} is running on ThreadId:{1}",
            name, Thread.CurrentThread.ManagedThreadId);

        await Worker(subject, args);
    }

    private async Task Worker(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
            "{1}, i:0",
            name, Thread.CurrentThread.ManagedThreadId);
        StateW = args.StateW;

        for (int i = 1; i <= 2; ++i)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
                "{1}, i:{2}",
                name, Thread.CurrentThread.ManagedThreadId, i);
        }
        await Task.Delay(0);
        mrs.Set();
    }
}

public delegate Task AsyncEventHandler<TEventArgs>(
        object sender, TEventArgs e, CancellationToken token);

public static class AsynEventHandlerExtensions
{
    // invoke an async event (with null-checking)
    public static async Task InvokeAsync<TEventArgs>(
        this AsyncEventHandler<TEventArgs> handler,
        object sender, TEventArgs args, CancellationToken token)
    {
        await Task.Run(async () =>
        {
            Console.WriteLine("InvokeAsync<TEventArgs> is running on ThreadId:{0}",
                Thread.CurrentThread.ManagedThreadId);

            var delegates = handler?.GetInvocationList();
            if (delegates?.Length > 0)
            {
                var tasks =
                delegates
                .Cast<AsyncEventHandler<TEventArgs>>()
                .Select(e => Task.Run(
                    async () => await e.Invoke(sender, args, token)));
                await Task.WhenAll(tasks);
            }
        }).ConfigureAwait(false);
    }
}

internal class Client
{
    public static async Task Main(string[] args)
    {
        Console.WriteLine("Client is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);

        ManualResetEvent[] mres = new ManualResetEvent[3];
        for (int i = 0; i < mres.Length; i++)
            mres[i] = new ManualResetEvent(false);

        EventWrapper s = new EventWrapper();
        s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
        s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
        s.EventW += (new HandlerWrapper("3", mres[2])).Handler;

        // Change subject state and notify observers
        s.StateW = "ABC123";

        var timer = new Stopwatch();
        timer.Start();

        await s.Notify(CancellationToken.None);

        ManualResetEvent.WaitAll(mres);
        timer.Stop();
        TimeSpan timeTaken = timer.Elapsed;
        string tmp1 = "Client time taken: " +
            timeTaken.ToString(@"m\:ss\.fff");
        Console.WriteLine(tmp1);

        Console.ReadLine();
    }
}

这是执行结果

从执行结果可以看出,我们看到 EventHandler,现在是异步的,在单独的线程上运行,从执行日志中可以看到并发性,总共花费的时间是 10.063 秒。

使用 TAP 的异步事件 – Ver2

虽然这不是本文的主要目的,但我们可以更改代码以更好地演示 TAP 模式。我们只会对上面的项目代码做一些小改动,改变一个方法,其他所有方法与上面相同。

private async Task Worker(object subject, EventArgsW args)
{
    Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
        "{1}, i:0",
        name, Thread.CurrentThread.ManagedThreadId);
    StateW = args.StateW;

    for (int i = 1; i <= 2; ++i)
    {
        await Task.Delay(5000);
        Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
            "{1}, i:{2}",
            name, Thread.CurrentThread.ManagedThreadId, i);
    }
    mrs.Set();
}

现在,我们得到以下执行结果

如果我们关注,例如,Handler1.Worker,我们可以看到该异步方法在 ThreadPool 的三个不同线程上运行,线程 ID 分别是 5、8、6。由于 TAP 模式,这都是可以的,因为在 await 之后,工作被 ThreadPool 中下一个可用的线程接管。并发性再次显而易见,总时间为 10.101 秒。

结论

事件机制实际上提供对 EventHandler 的同步调用。我们在上面的示例中展示了如何使 EventHandler 的调用异步。代码中提供了两个可重用的扩展方法,它们简化了异步调用的实现。其好处是 EventHandler 的并行调用,这在当今的多核系统中非常重要。

参考

历史

  • 2022年9月11日:初始版本
© . All rights reserved.