Scalable COMET 结合 ASP.NET






4.88/5 (45投票s)
一篇关于使用持久 AJAX 连接(又名 COMET)结合 ASP.NET 实现可扩展方法的文章。
免责声明
此代码尚未投入生产;它旨在演示使用 ASP.NET 中的 COMET 的理论解决方案。本文介绍了 COMET 的服务器端实现以及如何解决可扩展性问题。为了演示客户端代码,我将很快发布一篇较短的文章,该文章将使用我下面提到的 COMET 线程池机制演示一个小型井字游戏,这应该能让您对在实际应用中使用它有所了解。
引言
在过去的六个月里,我一直在构建一个在线国际象棋应用程序,玩家可以在其中注册、登录并实时下棋。我必须克服的一个障碍是如何在客户端和服务器之间实现实时通信。要实现这一点,需要解决许多因素
- 可扩展性 - 我希望它能在负载均衡的环境中运行,并且不会消耗大量服务器资源。
- 兼容性 - 我希望它能在许多不同的浏览器中运行,最好无需插件。
- 性能 - 我需要尽快将玩家的响应提供给对手,以便我可以管理时间限制并提供更好的体验。
- 简单性 - 我希望在不安装第三方服务器应用程序的情况下实现通信层。主要是为了让它能在托管环境(如www.discountasp.net)上运行。
因此,我评估了所有选项。我的第一个原型使用了轮询服务器的标准 AJAX 机制;这产生了过多的延迟和过多的流量,所以我很快就放弃了。我研究了其他传输机制,例如通过隐藏的 Flash Applet 使用套接字通信;这需要浏览器插件,所以我又放弃了。然后我发现了 COMET 的想法,并想, Bingo,这就是我想要的,于是我做了一些进一步的研究并构建了一个原型。
COMET 的理念
COMET 在客户端(Web 浏览器,使用XmlHttpRequest
)和服务器之间使用持久连接。此持久连接在服务器上保持打开状态一段预定义的时间(例如 5 秒),并且仅在接收到超时消息或服务器应用程序逻辑的某一部分想要发送消息时才响应客户端。一旦客户端收到消息,它将使用客户端实现的任何应用程序逻辑进行处理。然后重新打开持久连接,过程重新开始。
此机制解决了性能要求;这意味着无论何时需要向客户端发送消息,并且如果持久连接已打开,客户端都应该以极低的延迟接收到消息,几乎是即时的。
第二个连接用于将消息发送到服务器;此连接不是持久的,通常在处理完成后立即返回。从国际象棋游戏的角度来看,持久连接将等待对手的移动,而非持久连接将发送我的移动。
COMET 的实际应用实现
到目前为止,一切看起来都很棒;我们有了一种无需插件即可实时向浏览器发送消息的机制,但在实践中,这要困难得多。许多关于使用持久连接功能的文章都提到了这是多么“ hack ”。我倾向于不同意这些说法。
确实,COMET 在某些浏览器上运行时可能会遇到问题(主要是因为 HTTP 强制规定每个浏览器每个主机限制两个连接)。HTTP 协议的此限制是为了提供更好的性能,以便在低带宽连接(例如拨号调制解调器)上进行正常浏览,并且可能导致 COMET 运行时出现性能问题(有解决办法!)。此问题仅在 Internet Explorer 中真正明显(我认为 IE 直到 IE8 之前都严格遵守此标准);Firefox 2 允许更多连接,并且管理得更好,Firefox 3 允许更多,这意味着 COMET 类型应用程序的未来光明。
第二个问题来自该技术的可扩展性,这也是本文主要试图解决的问题。此问题之所以存在,是因为平台缺乏对 COMET 类型协议的支持,并且目前在使用持久连接时扩展性不佳。我认为这不是 COMET 整体思想的失败,而是特定 COMET 服务器实现上的失败。
其他开发人员已经开发了一些服务器,这些服务器位于我们开发平台的前面,允许我们将 COMET 请求机制与 Web 服务器分离,并通过管理它们自己的持久连接来允许解决方案进行扩展。本文将演示不应如何在 ASP.NET 中使用 COMET,以及可能的解决方案。
COMET 的负载测试
持久连接到 ASP.NET 的主要缺点是,每个连接在连接打开的五秒钟内都会占用一个 ASP.NET 工作线程。因此,每个连接的客户端都将占用 ASP.NET 线程池中的一个线程,最终,在负载下,服务器将停止响应。
为了演示这一点,我构建了一个非常简单的应用程序来模拟到 ASP.NET 应用程序的持久连接,该应用程序有一个处理程序,在返回客户端之前将请求保持打开 5 秒钟。
public class CometSyncHandler : IHttpHandler
{
#region IHttpHandler Members
public bool IsReusable
{
get { return true; }
}
public void ProcessRequest(HttpContext context)
{
int workerAvailable = 0;
int completionPortAvailable = 0;
ThreadPool.GetAvailableThreads(out workerAvailable,
out completionPortAvailable);
Debug.WriteLine("CometSyncHandler.ProcessRequest Start");
Debug.WriteLine(string.Format("Worker Threads Available: {0}",
workerAvailable));
Debug.WriteLine(string.Format("Completion Port Threads Available: {0}",
completionPortAvailable));
DateTime now = DateTime.Now;
while (true)
{
Thread.Sleep(50);
if (DateTime.Now.Subtract(now).TotalSeconds >= 5)
break;
}
Debug.WriteLine("CometSyncHandler.ProcessRequest End");
}
#endregion
}
此处理程序非常简单,它将请求的执行延迟最多 5 秒钟,然后返回。这模拟了一个最终超时并返回客户端的 COMET 请求。
我还编写了一个控制台应用程序,该应用程序将WebRequest
调用增加到CometSyncHandler
处理程序。结果基本符合预期;每个客户端都使用了 ASP.NET 工作线程,最终,在连接大约 40 个后,网站开始性能下降,页面请求开始响应非常缓慢。
下面的截图显示了这种情况
我使用了两个 CometClientSimulator 应用程序实例,模拟了大约 50 个连接,随着网站性能下降,模拟器开始掉线。要重复此测试,您可以打开并调试 CometAsync 网站,它会打开default.aspx并启动所有内容,然后打开一个 CometClientSimulator 控制台应用程序实例并键入 addsyncclients;这将模拟 25 个客户端,每两秒钟添加到 ASP.NET 应用程序中。
显然,这对于任何实际应用程序来说都不行,所以我做了一些研究并设计了一个解决方案。
IHttpAsyncHandler
这是解决方案的第一部分。这个小程序允许我们在服务器处理请求到处理程序时异步运行代码。如果您不熟悉IAsyncHttpHandler
,请阅读我下面对其工作原理的简要解释。
IHttpAsyncHandler
公开了两个主要需要实现的方法:BeginProcessRequest
和EndProcessRequest
。其基本思想是在BeginProcessRequest
中处理请求的开始;然后我们将执行交给某种异步方法,例如数据库查询执行,或任何异步 .NET 方法。异步方法完成后,然后在EndProcessRequest
中处理对客户端的响应。
下面的序列图显示了它的工作原理
CometThreadPool
上面的序列图介绍了一个自定义线程池,用于处理服务器上的 COMET 请求。这是必需的,因为我们不希望 ASP.NET 在等待 COMET 请求超时时使用其一个线程。
此池机制的代码位于网站的CometAsync文件夹中。它包含以下文件
- CometAsyncHandler - 这是
IHttpAsyncHandler
实现。 - CometAsyncResult - 这是一个自定义
IAsyncResult
实现,其中包含 COMET 异步操作的状态。 - CometThreadPool - 这是一个静态类,用于管理 COMET 线程池。
- CometWaitRequest - 这是一个代表客户端请求的对象。这些请求被排队以在自定义线程池中进行处理。
- CometWaitThread - 这是一个处理队列中
CometWaitRequest
对象的线程。
此实现的工作方式是首先创建一组后台CometWaitThread
对象,每个对象包含一个线程,该线程处理CometWaitRequest
队列项。在我们的 Web 应用程序中,我们将在global.asax文件中的Application_Start
中初始化线程池。
protected void Application_Start(object sender, EventArgs e)
{
//
// queue 5 threads to run
// the comet requests
CometThreadPool.CreateThreads(5);
}
这会启动五个线程,它们在后台空闲等待CometWaitRequest
实例进行服务。
我们的CometAsyncHandler
然后等待客户端的请求;它的职责是将请求排入线程池。
public IAsyncResult BeginProcessRequest(HttpContext context,
AsyncCallback cb, object extraData)
{
int workerAvailable = 0;
int completionPortAvailable = 0;
ThreadPool.GetAvailableThreads(out workerAvailable,
out completionPortAvailable);
Debug.WriteLine(
string.Format(
"BeginProcessRequest: {0} {1} out of {2}/{3} ({4} Requests Active)",
Thread.CurrentThread.IsThreadPoolThread,
Thread.CurrentThread.ManagedThreadId,
workerAvailable,
completionPortAvailable,
CometWaitRequest.RequestCount));
// get the result here
CometAsyncResult result =
new CometAsyncResult(context, cb, extraData);
result.BeginWaitRequest();
// ok, return it
return result;
}
BeginProcessRequest
输出一些调试信息,以便我们能够确切地了解哪些线程可用,然后创建一个CometAsyncResult
类的实例,该类跟踪HttpContext
并返回给 ASP.NET 以指示它已启动异步进程。在返回之前,它调用BeginWaitRequest
,这将请求添加到线程池中。
public void BeginWaitRequest()
{
CometThreadPool.QueueCometWaitRequest(new CometWaitRequest(this));
}
此代码创建一个CometWaitRequest
对象的实例,并将其排入线程池。
internal static void QueueCometWaitRequest(CometWaitRequest request)
{
CometWaitThread waitThread;
lock (state)
{
// else, get the next wait thread
waitThread = waitThreads[nextWaitThread];
// cycle the thread that we want
nextWaitThread++;
if (nextWaitThread == maxWaitThreads)
nextWaitThread = 0;
CometWaitRequest.RequestCount++;
}
// queue the wait request
waitThread.QueueCometWaitRequest(request);
}
此逻辑根据轮询方法选择一个CometWaitThread
来分配CometWaitRequest
(例如,如果线程 1 接收了前一个请求,则线程 2 将接收第二个)。
CometWaitThread 类
internal void QueueCometWaitRequest(CometWaitRequest request)
{
lock (this.state)
{
waitRequests.Add(request);
}
}
请求被添加到所选线程的CometWaitRequest
对象内部列表。
此时,CometAsyncHandler
已将 ASP.NET 线程返回给池,并等待CometWaitThread
完成异步过程,以便它可以完成客户端的请求。我们的CometWaitThread
代码如下。
private void QueueCometWaitRequest_WaitCallback()
{
// here we are...
// in a loop
while (true)
{
CometWaitRequest[] processRequest;
lock (this.state)
{
processRequest = waitRequests.ToArray();
}
Thread.Sleep(100);
for (int i = 0; i < processRequest.Length; i++)
{
// timed out so remove from the queue
if (DateTime.Now.Subtract
(processRequest[i].DateTimeAdded).TotalSeconds >= 5)
{
//
// queue anotehr wait callback, so
// we tell close handler down
// the endRequest will exist on a
// different thread to this
// one and not tear down this thread
processRequest[i].Result.ResponseObject =
this.CheckForServerPushEvent(processRequest[i], true);
this.QueueCometWaitRequest_Finished(processRequest[i]);
}
else
{
object serverPushEvent =
this.CheckForServerPushEvent(processRequest[i], false);
if (serverPushEvent != null)
{
// we have our event, which is good
// it means we can serialize it back to the client
processRequest[i].Result.ResponseObject =
serverPushEvent;
// queue the response on another
// ASP.NET Worker thread
this.QueueCometWaitRequest_Finished
(processRequest[i]);
// dequeue the request
DequeueCometWaitRequest(processRequest[i]);
}
}
Thread.Sleep(100);
}
}
}
QueueCometWaitRequest_WaitCallback
是此线程的入口点,并且在Application_Start
中早已启动。它一直在循环中等待其队列中出现CometWaitRequest
项;一旦客户端请求CometAsyncHandler
处理程序,就会有一个可用。
它在每个循环迭代中按顺序处理队列中的每个项目,例如,如果有三个请求待处理,它将检查请求 1、2、然后 3,然后继续循环并再次处理请求 1、2 和 3。这确保了每个请求都尽快得到处理,而无需等待前一个请求完成其 5 秒超时。
循环检查CometWaitRequest
是否在队列中停留的时间超过预定义的超时时间(在本例中为 5 秒);否则,它会检查是否有事件等待发送回客户端。如果任一情况为真,它将完成CometWaitRequest
,返回所需的响应对象,然后将其从队列中移除。
private void QueueCometWaitRequest_WaitCallback()
{
.
.
// queue the response on another ASP.NET Worker thread
this.QueueCometWaitRequest_Finished(processRequest[i]);
// dequeue the request
DequeueCometWaitRequest(processRequest[i]);
.
.
}
.
.
private void QueueCometWaitRequest_Finished(object target)
{
CometWaitRequest request = target as CometWaitRequest;
request.Result.SetCompleted();
}
.
.
QueueCometWaitRequest_Finished
方法通过调用CometAsyncResult
对象上的SetCompleted
来完成异步操作,然后调用指向CometAsyncHandler
上的EndProcessRequest
的CometAsyncResult
中的回调委托。然后执行以下代码。
public void EndProcessRequest(IAsyncResult result)
{
int workerAvailable = 0;
int completionPortAvailable = 0;
ThreadPool.GetAvailableThreads(
out workerAvailable, out completionPortAvailable);
Debug.WriteLine(string.Format("EndProcessRequest: {0} {1}" +
" out of {2}/{3} ({4} Requests Active)",
Thread.CurrentThread.IsThreadPoolThread,
Thread.CurrentThread.ManagedThreadId,
workerAvailable,
completionPortAvailable,
CometWaitRequest.RequestCount));
CometAsyncResult cometAsyncResult = result as CometAsyncResult;
if (cometAsyncResult != null &&
cometAsyncResult.ResponseObject != null)
{
DataContractJsonSerializer serializer =
new DataContractJsonSerializer(
cometAsyncResult.ResponseObject.GetType());
serializer.WriteObject(
cometAsyncResult.HttpContext.Response.OutputStream,
cometAsyncResult.ResponseObject);
}
cometAsyncResult.HttpContext.Response.End();
}
此方法通过将我们在完成时设置的任何响应对象序列化到请求的HttpContext
的响应流中来响应客户端。
在这里需要提到的一点是,实际处理请求的线程是什么。当它到达BeginProcessRequest
时,它是一个 ASP.NET 工作进程在执行,并且当CometWaitThread
完成后(无论是超时还是消息),EndProcessRequest
方法都在CometThreadPool
线程之一上执行,这意味着 ASP.NET 只使用了其线程池中的一个线程来初始化 COMET 请求,其余 5 秒是在没有 ASP.NET 线程的情况下处理的。
我们可以在下面的截图看到这一点
此时,还值得一提的是,网站的响应非常好,考虑到有 200 个持久连接,并且该框的 CPU/内存使用率良好(它还运行着客户端)。
为了双重检查一切是否顺利,我为每个请求-响应对记录一个计数器,以确保每个请求都能得到响应。下面的截图显示了运行 200 个客户端五分钟的测试输出。
这表明所有请求都已成功完成(除了 1 个,但我将其归因于关闭代码中的一个小竞争条件,您可以看到它之后完成了!)。
结论
通过实现自定义线程池,我们可以在 ASP.NET 服务器代码中利用 COMET 机制,而无需实现自定义服务器,甚至无需实现任何复杂的通信例程,只需一个简单的线程池来管理多个请求(例如,我们有五个线程管理所有 200 个 COMET 请求)。
使用代码
CometAsync 网站项目将执行并提供一个default.aspx页面,该页面列出了在该请求执行时可用于 ASP.NET 的可用线程数。
CometClientSimulator 应用程序将模拟客户端连接;应如下执行。
CometClientSimulator [websitehost]
websitehost = the host name of the website CometSync app is running e.g. https://:2251
我目前在我的国际象棋网站上使用类似的引擎,但它目前正在接受严格的测试。顺便说一句,如果您有兴趣在那里下国际象棋并想帮助测试,请给我发电子邮件至imebgo@gmail.com。谢谢!
历史
- 2008 年 6 月 19 日 - 创建。