如何使用 TAP 在 C# 中实现和使用可等待的套接字





5.00/5 (3投票s)
探索将 Socket 的异步模型调整为基于任务的模型,并为您的项目添加一些可等待的套接字操作。
引言
本文介绍如何通过 TAP 在 C# 中提供基于 TAP 的可等待 Socket
,以便我们使用 C# 内置的 async
/await
功能进行异步套接字操作。此外,它演示了如何使用可等待功能来提供 HTTP 内容。HTTP 服务器非常简洁,仅提供一个 "Hello World!
" 页面,但如果出于某种原因您想对其进行修改,它可以轻松地扩展以执行更多功能。
在大多数情况下,我建议使用 ASP.NET 甚至 http.sys 来提供 HTTP 内容。ASP.NET 更安全、更具可扩展性,并且经过彻底的测试和文档记录。这个示例项目几乎不具备这些优点。即使您真的想自己构建,也可以考虑使用 HttpListener
类。它只是提供 HTTP 功能,以便套接字能够发送和接收数据。这实际上并不是一篇关于如何构建 Web 服务器的文章。
目标是教授一些使用 TAP 模型的技术。本文假定您之前已经使用过 async
/await
、Task
和 Socket
。
概念化这个混乱的局面
实现 TAP 模式使我们能够以一致的方式公开可等待方法。遵循 TAP 模型可确保您的代码能够被广大 .NET 开发者所接受,因为与 Microsoft 提供的 API 相比,您的 API 更为熟悉。这种公开可等待方法的方式可以降低学习曲线,并使代码更加健壮。我们将在本文中实现这一点。
此外,能够将其他异步模型(如 EAP 或类似的基于事件的模型(例如 Socket
使用的模型))适配到 TAP,将使您的代码更加灵活和现代化。我们将以此 文章 为参考和背景进行介绍。部分源代码来自 Stephen Toub 的代码,在此,但我会解释足够的内容,以便我们普通人能够理解。
从 APM 到 TAP
我们将需要做的是用我们自己的方法包装 Socket
的异步方法,这些方法使用 TAP 模型和可等待方法公开这些成员。需要注意的是,Socket 公开了多种异步模式——既有 BeginXXXX()
/EndXXXX()
调用的 APM 模型,也有其自身的、类似于 EAP 的基于事件的模型。我们关注后者基于事件的模型,因为它在底层更有效率,但我们并非总能在所有情况下都使用它,所以我们也会包装前者。
当我们用 TAP 方法包装 APM 方法时,主要要做的是创建一个称为任务完成源 (Task Completion Source) 的对象,然后根据异步操作的结果对其进行配置。对于旧式的 APM 模型,带有 BeginXXXX()
/EndXXXX()
方法,包装过程很简单——来自 Stephen Toubb 的示例代码
public static Task<int> ReceiveAsync(
this Socket socket, byte[] buffer, int offset, int size,
SocketFlags socketFlags)
{
var tcs = new TaskCompletionSource<int>();
socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
{
try { tcs.TrySetResult(socket.EndReceive(iar)); }
catch (Exception exc) { tcs.TrySetException(exc); }
}, null);
return tcs.Task;
}
我们这里所做的是创建一个新的任务完成源,开始异步操作,然后挂钩它的回调以调用 EndReceive()
,然后根据结果配置任务完成源。最后,我们返回任务完成源关联的任务。请习惯这种模式,因为我们会反复进行非常相似的操作。我们可以做的一件事是,为了提高效率,将 tcs
变量作为异步操作的 asyncState
传递,而不是依赖于匿名方法变量提升来将 tcs
值传递给匿名方法。
public static Task<int> ReceiveAsync(
this Socket socket, byte[] buffer, int offset, int size,
SocketFlags socketFlags)
{
var tcs = new TaskCompletionSource<int>(socket);
socket.BeginReceive(buffer, offset, size, socketFlags, iar =>
{
var t = (TaskCompletionSource<int>)iar.AsyncState;
var s = (Socket)t.Task.AsyncState;
try { t.TrySetResult(s.EndReceive(iar)); }
catch (Exception exc) { t.TrySetException(exc); }
}, tcs);
return tcs.Task;
}
唯一的区别是我们不依赖编译器来提升我们的 tcs
值并使其对匿名方法可用,所以我们不必处理额外的低效率。相反,我们将其作为 APM 的 AsyncState
的一部分传递。根据 Stephen Toub 的说法,这可以避免额外的分配。否则,行为是相同的。
从 Socket 的事件模型到 TAP
我们上面已经介绍了包装 APM 的基本知识,但我们还想包装 Socket
的基于事件的模型,因为它更有效率,这样我们就可以使用它。这要复杂得多,因为我们首先必须创建一个特殊的、可用于 await
关键字的可等待类。如果对此有文档,我不知道在哪里可以找到,但基本上它需要实现 System.Runtime.CompilerServices.INotifyCompletion
以及其他几个方法;GetAwaiter()
和 GetResult()
。没有这些“花哨”的部分,你就无法对其使用 await
。Stephen Toub 实现中的 GetAwaiter()
方法返回其自身的实例。通常,您会有一个单独的可等待类或结构。总之,如果您自己实现一个可等待类,它将需要这个方法,并且它应该是一个可等待类。另一个是 GetResult()
,它检查结果,然后返回 await 的值。在本例中,我们不返回值,因此它是 null。这里的方法不属于任何接口。编译器只是期望它们。** 我们将在下一节介绍可等待类。这里有一些关于此的文档:这里 和 这里。
所有这些的根本原因在于 Socket
的基于事件的异步方法允许您重用其事件参数,而不是每次执行发送或接收时都分配更多的参数。这可能看起来微不足道,但套接字操作的频率可能非常高,我们希望避免垃圾回收器为了清理所有这些额外的东西而周期性地暂停我们的发送或接收。这就是为什么 Socket
除了 APM 模型之外还公开了其 EAP 风格的事件模型,以及为什么它更有效率。我们只需要一种方法将其与 TAP 可等待任务模型进行包装,而这正是 SocketAwaitable
类所要做的。我们可以在连续的读写操作中重用 SocketAwaitable
类,以重用我们套接字的底层事件参数,从而按照其设计方式使用 Socket
的基于事件的异步模型。
** 编译器在找到 await
关键字时会生成代码,该代码使用指定的方法。没有供其使用的二进制接口,因此这是编译器特定的,而不是框架/运行时特定的。
使用异步套接字操作
在我们演示应用程序中,我们将使用两种与套接字通信的方法,该应用程序公开了一个基础的 HTTP 服务器,用于发出一些静态内容。在演示中,我们公开了更多用于执行异步 HTTP 操作的扩展方法,并使用它们来创建我们的 Web 服务器。HTTP 操作本身使用我们上面的包装器来高效、异步地处理请求。最后,主程序只是启动一个监听套接字并开始请求/响应循环,在按下某个键退出之前,它会一直提供内容。基于 HTTP 的 Socket
扩展方法比演示所需的功能要多,但我提供了它们,以防您想用它构建一个小型自制 Web 服务器。核心内容在 HttpSocketUtility
中,它依赖于 HttpRequest
来返回请求信息。没有相应的 HttpResponse
对象,仅仅是因为我还没有实现它。它对演示来说不是必需的,而且我已经添加了比演示功能所需更多的内容。
我们的 HTTP 请求处理与服务器的其余部分一样,完全是异步的。它使用我们的可等待套接字扩展和 SocketAwaiter
来执行工作,从而以最高效的方式(在异步性方面)完成工作。所有操作都异步进行意味着我们不会占用太多实际的系统线程来等待和执行操作。我们从主线程执行所有操作,使用异步 I/O 完成端口(至少在 Windows 中是这样,其他平台可能不同),因此内核基本上会向我们提供驱动程序回调,我们在这些回调中进行处理,而不是在某个线程上。这对于处理大量请求的服务器来说是强大的代码,我们可以通过这种方式在单个线程上高效地处理多个请求!
如果不深入研究代码,很难进一步探讨这些概念,所以我们现在就开始吧。
编写这个混乱的程序
SocketAwaitable 类
可以对该类的实例调用 await
,这样如果从一个函数返回该类,该函数就是可等待的。我们使用它将我们的异步编程包装到 TAP 模型中。该类有三个主要职责:持有 SocketEventArgs
实例以供重用、跟踪我们的continuation,以及提供使 await
工作所需的必要成员。它只做这些。我们来看看。
/// <summary>
/// An awaiter for asynchronous socket operations
/// </summary>
// adapted from Stephen Toub's code at
// https://blogs.msdn.microsoft.com/pfxteam/2011/12/15/awaiting-socket-operations/
public sealed class SocketAwaitable : INotifyCompletion
{
// placeholder for when we don't have an actual continuation. does nothing
readonly static Action _sentinel = () => { };
// the continuation to use
Action _continuation;
/// <summary>
/// Creates a new instance of the class for the specified <paramref name="eventArgs"/>
/// </summary>
/// <param name="eventArgs">The socket event args to use</param>
public SocketAwaitable(SocketAsyncEventArgs eventArgs)
{
if (null == eventArgs) throw new ArgumentNullException("eventArgs");
EventArgs = eventArgs;
eventArgs.Completed += delegate
{
var prev = _continuation ?? Interlocked.CompareExchange(
ref _continuation, _sentinel, null);
if (prev != null) prev();
};
}
/// <summary>
/// Indicates the event args used by the awaiter
/// </summary>
public SocketAsyncEventArgs EventArgs { get; internal set; }
/// <summary>
/// Indicates whether or not the operation is completed
/// </summary>
public bool IsCompleted { get; internal set; }
internal void Reset()
{
_continuation = null;
}
/// <summary>
/// This method supports the async/await framework
/// </summary>
/// <returns>Itself</returns>
public SocketAwaitable GetAwaiter() { return this; }
// for INotifyCompletion
void INotifyCompletion.OnCompleted(Action continuation)
{
if (_continuation == _sentinel ||
Interlocked.CompareExchange(
ref _continuation, continuation, null) == _sentinel)
{
Task.Run(continuation);
}
}
/// <summary>
/// Checks the result of the socket operation, throwing if unsuccessful
/// </summary>
/// <remarks>This is used by the async/await framework</remarks>
public void GetResult()
{
if (EventArgs.SocketError != SocketError.Success)
throw new SocketException((int)EventArgs.SocketError);
}
}
构造函数中包含一些需要解释的“狡猾”代码。由于 Socket
通过事件来信号完成其异步操作,因此我们必须挂钩适当的完成委托。里面有一些看起来很吓人的代码。Interlocked.CompareExchange()
本身就不容易调用。基本上,我们正在检查 _continuation
是否为 null
,如果是,我们就使用线程安全操作将其设置为 _sentinel
,_sentinel
只是一个空方法。无论如何,我们都会将旧值或当前值存储在 prev
中,然后调用它。归根结底,这既是一个屏障,以防止 _continuation
为 null
,又是当 _continuation
不为 null
时调用它。这是为了确保每当异步套接字操作完成时,continuation 都会被触发。
另一个非平凡的方法是 INotifyCompletion.OnCompleted()
,当您 await
此类并且操作完成时,就会调用它。在这里,如果 _continuation
和 _sentinel
匹配,我们就短路并运行提供的 continuation
。否则,我们比较当前的 _continuation
和 null
,如果是,我们就用新的 continuation
替换它,然后退出,不运行任务。这是一种非常高效的处理一些复杂逻辑的方式,而无需使用内置的 if
语句,但我不太确定我会这样写,因为它很难读。至少,也许我应该写效率较低但更清晰的对应代码,并将其放在注释中。我之所以没有这样做,是因为我在这里已经解释了。目前的奇怪之处是 Stephen Toub 的代码,我不想在更改它时可能引入错误,即使它可能使其更清晰。
现在我们需要使用这个类。
ReceiveAsync() 和 SendAsync() 包装方法
我们只介绍其中的两个,因为只有这两个使用了 SocketAwaitable
。其他方法使用了我之前概述的 APM 范例。这些方法使用事件模型来执行异步操作。
/// <summary>
/// Receive data using the specified awaitable class
/// </summary>
/// <param name="socket">The socket</param>
/// <param name="awaitable">An instance of <see cref="SocketAwaitable"/></param>
/// <returns><paramref name="awaitable"/></returns>
public static SocketAwaitable ReceiveAsync(this Socket socket,
SocketAwaitable awaitable)
{
awaitable.Reset();
if (!socket.ReceiveAsync(awaitable.EventArgs))
awaitable.IsCompleted = true;
return awaitable;
}
/// <summary>
/// Sends data using the specified awaitable class
/// </summary>
/// <param name="socket">The socket</param>
/// <param name="awaitable">An instance of <see cref="SocketAwaitable"/></param>
/// <returns><paramref name="awaitable"/></returns>
public static SocketAwaitable SendAsync(this Socket socket,
SocketAwaitable awaitable)
{
awaitable.Reset();
if (!socket.SendAsync(awaitable.EventArgs))
awaitable.IsCompleted = true;
return awaitable;
}
这很好,文档比代码还长。在这里,我们使用 Reset()
清除上面的 continuation,这会将它设置为 null
。在开始异步套接字操作之前,我们必须这样做。然后我们基本上是在转换调用,在必要时设置 IsCompleted
标志,然后返回我们的 SocketAwaitable
以便对其进行 await
。我们接受一个参数的原因也是为了能够回收它们以供多次调用,这使得我们可以在不分配托管堆对象的情况下进行这些调用。
使用它们大致如下所示:
var recv = new byte[1024];
var args = new SocketAsyncEventArgs();
args.SetBuffer(recv, 0, recv.Length);
var saw = new SocketAwaitable(args);
await socket.ReceiveAsync(saw);
var bytesRead = args.BytesTransferred;
if (0 != bytesRead)
{
var reqheaders = new StringBuilder();
var s = Encoding.ASCII.GetString(recv, 0, bytesRead);
reqheaders.Append(s);
var i = reqheaders.ToString().IndexOf("\r\n\r\n");
while (0 > i && 0 != bytesRead)
{
await socket.ReceiveAsync(saw);
bytesRead = args.BytesTransferred;
if (0 != bytesRead)
{
s = Encoding.ASCII.GetString(recv, 0, bytesRead);
reqheaders.Append(s);
i = reqheaders.ToString().IndexOf("\r\n\r\n");
}
}
if (0 > i)
throw new Exception("Bad Request");
}
这是我们实际在 HttpSocketUtility.ReceiveHttpRequestAsync()
中使用的一些代码的一个细微变体。我只是截取了一部分。我们所做的是创建一个 byte[]
缓冲区,一些 SocketEventArgs
,最后是 SocketAwaitable
。这是读取我们的 HTTP 请求标头的代码,我们每次异步读取 1024 个字节,直到遇到两个连续的回车符("\r\n\r\n"),这表示标头的结束。每次我们获得更多数据时,我们都会将其附加到 reqheaders
。正如您所看到的,我们每次都将 saw
传递给 ReadAsync()
调用,然后检查 args.BytesTransferred
来查看我们读取了多少。它之所以有效,是因为我们将 saw
设置为使用 recv
和 args
,因此我们现在可以使用它们来查看结果。SendAsync()
的工作方式相同,只是您应该先用要发送的数据填充 args
。实际上,我发现这个重载的 SendAsync()
的用途不大,只有一个场景是我从类似 Stream
的东西读取然后发送。我通常会使用 SendFileAsync()
或其他 SendAsync()
重载。
演示 Web 服务器
现在让我们看一下 Web 服务器代码,以便我们能够看到其余部分是如何工作的。
// make Main() awaitable so we can use async/await in it
static async Task Main()
{
// create a socket
using (var socket = new Socket(SocketType.Stream, ProtocolType.Tcp))
{
// bind to localhost:8080
socket.Bind(new IPEndPoint(IPAddress.Loopback, 8080));
socket.Listen(16); // listen
// begin our accept task
var s = await socket.AcceptTaskAsync();
// when we're done accepting, process the request
await _ProcessRequest(socket, s);
}
}
static async Task _ProcessRequest(Socket socket,Socket s)
{
// we need to execute this part concurrently
var t = Task.Run(async () =>
{
// spawn another waiter
var sss = await socket.AcceptTaskAsync();
await _ProcessRequest(socket, sss);
});
// read the incoming HTTP data
var req = await s.ReceiveHttpRequestAsync();
// report it
Console.Write("Got Request on thread: " +
Thread.CurrentThread.ManagedThreadId.ToString("X")+", ");
Console.WriteLine(req.Method + " " + req.Url);
// our html to send
var html = "<html><head><title>Hello</title></head><body><h1>Hello World!</h1></body>";
// our headers
var headers = "HTTP/1.1 200 OK\nDate: "
+ DateTime.Now.ToUniversalTime().ToString("r")
+ "\nContent-Type: text/html\nContent-Length: "
+ html.Length.ToString()
+ "\nConnection: Closed\n";
// send them asynchronously
await s.SendAsync(headers + "\n" + html, Encoding.ASCII);
// disconnect (no keep-alive in demo)
await s.DisconnectAsync(false);
s.Close();
// finally wait for our accepting task if it's still running
if(!t.IsCompleted && !t.IsFaulted && !t.IsCanceled)
await t;
}
它实际上并不复杂。基本上,我们运行一个循环,但它不是真正的循环。我们调用 _ProcessRequest()
,它又调用 _ProcessRequest()
,在接受连接后再次调用以使其继续运行。之后,在 _ProcessRequest()
内部,就是我们提供服务的主要部分。它创建了一个并发任务来再次在套接字上进行接受,然后提供它刚刚获得的请求并断开连接。最后,它等待在方法顶部启动的那个并发可等待任务。目前,标头几乎是静态的,内容本身也是静态的,但在现实世界中,它可能会被动态生成或从文件中提供,并根据 req.Url
、req.QueryString
和 req.Body
的内容提供。目前,它不支持 cookie。我不想在这上面花费更多精力,因为再说一遍,这个项目甚至不是关于提供 HTTP 的。这只是我用来演示异步套接字操作的媒介。
关注点
深入研究 Task 框架并非易事。那里有很多东西,很容易在看所有东西时迷失方向,更不用说将它们组合成一个连贯的东西了。我最初大约一年前尝试了上面的方法,但我对此了解不够,无法使其正常工作,更不用说像现在这样在代码中使用 async
/await
了。这个东西实际上不再阻塞,除了等待传入套接字连接。我最近发布了一篇 文章,揭示了 Task 框架的一个晦涩角落,并且我计划发布更多内容。这里有许多有趣的领域值得探索。
历史
- 2020 年 6 月 23 日 - 初始提交