双工 Web 服务






4.92/5 (25投票s)
使用多线程技术创建一个双工(双向)Web 服务,该服务可以将事件/消息推送到客户端。
引言
在我为一位朋友的代码审查时,我偶然发现了一种有趣的技术,可以让 Web 服务在发生某些事情时将事件或消息“推送”回客户端。我们都知道 HTTP 是一个请求/响应协议。客户端发起请求,打开一个套接字,服务器为请求分配一个线程,处理请求,返回结果,然后关闭套接字。在我看到的代码(在本示例中已重构/改进)中,Web 服务被用来在请求和响应之间通信事件或消息。
想象一个完全由 Web 服务处理的聊天室应用程序。必须在用户之间传递的各种“事件”或消息包括“用户已登录”、“用户已注销”以及发送的实际文本消息。
背景
通常,这需要通过编写一些基于 TCP 套接字的应用程序来处理,这些应用程序在服务器和客户端之间维护套接字。使用 Web 服务的优点是您不必自己管理套接字或线程池。您也不必担心配置每个客户端机器上的每个防火墙为您打开一个端口;HTTP 在端口 80 上运行,并且不会被阻止。这被称为 HTTP 隧道。您的客户端可以不通过防火墙和防病毒应用程序将它们视为特洛伊木马,而是通过 HTTP 进行通信。
此技术的主要缺点(我们将在文章中进一步看到)是它需要在服务器上占用一个线程池线程。尽管此问题的调整和性能方面超出了本文的范围,但可以说,此技术的可扩展性可能不佳。不过,如果应用程序由具有预定数量客户端的专用服务器提供服务,并且服务器上的线程池配置已正确完成,那么此解决方案将具有吸引力。
除了实际应用之外,本文还展示了编写多线程应用程序的一些挑战和技术。示例中使用的一些 .NET 线程类可以用其他类替换。我相信所选的类在功能和性能方面最适合示例,但您可以随意尝试各种构造。在本文的最后,我包含了一些关于多线程的网站链接。
Using the Code
好的,那么我如何让 Web 服务在服务器本身或其他客户端上发生某些事情时通知客户端?如前所述,Web 服务使用请求/响应协议。我们如何让它推送回消息?答案是,我们不能。我们通过在请求返回到客户端之前在服务器上停止请求来模拟推送。此过程模拟了“监听”。当发生事件时,例如另一个客户端通过另一个 Web 方法发送消息,我们的监听 Web 方法将被释放,返回一个“事件”对象,该对象表示发生的事件/消息类型,并包含任何相关数据。
这之所以可能,是因为 Web 服务可以从窗体异步调用,而不会锁定窗体的主线程。回调事件处理程序用于处理来自服务器的传入事件。
在 .NET 2.0 中,调用 Web 服务非常容易,因为您创建的每个 Web 方法都会在客户端代理上生成两个方法和一个事件。
例如,如果您创建一个名为 HelloWorld
的 Web 方法,您会看到 Web 服务的客户端代理有一个 HelloWorld
方法,一个 HelloWorldAsynch
方法和一个 HelloWorldCompleted
事件。第一个方法,正如您可能知道的那样,将执行请求中继到服务器,并阻止调用客户端线程直到发生响应。它被认为是同步的。HelloWorldAsynch
充当“即发即弃”方法。您可以从客户端代码调用它,但它永远不会阻塞线程。您的窗体可以继续处理,而无需等待 Web 方法返回值或响应。当响应到达时,它会触发 HelloWorldCompleted
事件,该事件可以由您的客户端代码处理。
上面提到的 Listen
方法就是这样工作的。当窗体加载时,它会使用该方法的异步版本调用 Listen
。当服务器上发生需要报告给客户端的事件时,会触发 ListenCompleted
事件。此事件由一个处理“事件”的方法处理。然后它再次调用 Listen
以准备下一个事件。
ListenCompleted
事件附带一个由 IDE 生成的 ListenCompletedEventArgs
对象。它包含一个与 Listen
Web 方法返回的类型相同的对象。
由于 Listen
Web 方法返回了我定义的“EventObject
”数组,因此事件参数的 Result
属性将包含该类型的对象。EventObject
是我的客户端应用程序感兴趣的监听和处理的三个事件类的祖先。它们是:LoginEvent
、LoggedOutEvent
和 MessageEvent
。
由于我的示例应用程序是一个简单的聊天室,我希望实时显示所有当前登录的用户,以及新用户登录、已登录用户注销以及有人在聊天室发送消息时的通知。
我让 Listen
Web 方法返回一个事件数组,因为在我们监听期间可能会发生各种事件。也就是说,在我们处理客户端上的事件,并在调用 ListenAsync
Web 方法之前,服务器上可能会发生各种其他事件,我们可能会错过(我将在后面的内容中介绍这些事件的缓存机制)。当我们再次监听时,我们希望一次性接收所有错过的事件。因此,是 EventObject
数组。
ListenCompleted
事件处理程序的代码如下:
/// <summary>
/// When Listen ws request returns then an event was registered and needs to be processed.
/// Process the event and call the Listen webmethod again to listen for further events
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void _ws_ListenCompleted(object sender, ListenCompletedEventArgs e) {
bool getAllUsers = false;
ChatClient.TwoWayWS.EventObject[] events = e.Result;
System.Diagnostics.Trace.WriteLine("_ws_ListenCompleted - " +
"Made it in - _formClosing=" + _formClosing);
System.Diagnostics.Trace.WriteLine("_ws_ListenCompleted - " + events.Length);
if (_formClosing)
return;
foreach (EventObject eventObj in events) {
System.Diagnostics.Trace.WriteLine("_ws_ListenCompleted - " +
"event Object type " + eventObj.ToString());
_myLastEventID = eventObj.EventID;
_myLastEventsListResetId = eventObj.EventsListResetID;
switch (eventObj.ToString()) {
case "ChatClient.TwoWayWS.LoginEvent":
// If list of users on UI is empty and we
// got a loggin event then call another ws to
// get a list of all the logged in users.
// This condition would be true first we log in
// and we want a list of the current users.
// Otherwise just add the new user(someone else logging in)
// to the bottom of our list.
if (lvLoggedInUsers.Items.Count == 0)
getAllUsers = true;
else {
LoginEvent le = eventObj as LoginEvent;
// Login user may exist in the list due to bulk
// load of all logged on users. And the cached event
// of the login can be now processed as well.
try {
if (!lvLoggedInUsers.Items.ContainsKey(le.UserName))
lvLoggedInUsers.Items.Add(le.UserName, le.UserName, 0);
}
catch { }
}
break;
case "ChatClient.TwoWayWS.LoggedOutEvent":
LoggedOutEvent lo = eventObj as LoggedOutEvent;
lvLoggedInUsers.Items.RemoveByKey(lo.UserName);
break;
case "ChatClient.TwoWayWS.MessageEvent":
MessageEvent msg = eventObj as MessageEvent;
string fromPart = "From " + msg.From + ":";
// Add the line - Note SelectedText acts as an append text function.
rtChatRoom.SelectionBackColor = Color.LightGray;
rtChatRoom.SelectionColor = Color.Crimson;
rtChatRoom.SelectedText = fromPart; // Append the text
rtChatRoom.SelectionBackColor = Color.White;
rtChatRoom.SelectionColor = Color.Black;
rtChatRoom.SelectedText = " " + msg.Message; // Append the text
rtChatRoom.SelectedText = "\n";
break;
default:
// Non-event processing - increment the non-event count
int nonEventCnt = int.Parse(lblNonEventCnt.Text);
nonEventCnt++;
lblNonEventCnt.Text = nonEventCnt.ToString();
break;
}
// Get a list of all the currently logged in users - see comments above.
if (getAllUsers) {
LoginEvent[] allLoginEvents = _ws.GetAllUsers();
foreach (LoginEvent le in allLoginEvents) {
try {
lvLoggedInUsers.Items.Add(le.UserName, le.UserName, 0);
}
catch { }
}
getAllUsers = false;
}
}
// Sleep the current UI thread to simulate network latency
Application.DoEvents();
int milliSecdelay = string.IsNullOrEmpty(txtListenDelay.Text) ? 0 :
int.Parse(txtListenDelay.Text);
System.Threading.Thread.Sleep(milliSecdelay * 1000);
txtListenDelay.Text = "0"; // reset to no delay.
// re-establish a listen request.
System.Diagnostics.Trace.WriteLine("_ws_ListenCompleted -" +
" Calling _ws.ListenAsync() for " + txtUsername.Text);
_ws.ListenAsync(_myLastEventID, _myLastEventsListResetId);
}
服务器回推和线程管理
让我们来分析一下 Web 服务是如何通过处理上述事件并将它们推送到所有正在监听的客户端来充当聊天应用程序的服务器的。
核心目标机制
Listen
方法中的主要机制是使用 AutoResetEvent
类型的 WaitHandle
来使 Listen
事件阻塞,并将该 WaitHandle
注册到全局列表中。
.
.
.
waithandle = new AutoResetEvent(false);
Global.WaitingListenerList.Add(waithandle);
Global.EventBeingDelivered = false;
}
// Block the response until an "event" occurs.
System.Diagnostics.Trace.WriteLine("Listen - Aquired waithandle and blocked IIS thread");
//****************************************************************************
// Wait for an event, but continue before the request times out.
listenTimedOut = !waithandle.WaitOne(Global.EventWaitTimeout, true);
事件到达时将遍历列表,并使用 WaitHandle
上的 Set
方法释放每个 WaitHandle
。因此,如果我们查看 SendObjectEvent
方法的代码片段(每个事件都会发生),我们会看到如下内容:
// Release the waiting response threads
foreach (AutoResetEvent waitingListner in Global.WaitingListenerList) {
waitingListner.Set();
}
SendObjectEvent
然后进入自己的等待状态,该状态仅在所有 Listen 线程都被释放后才会释放,以便它可以执行一些事件释放后的维护工作。这是通过 Listen
方法在返回事件对象到客户端之前递减监听器计数来实现的。当计数为零时,它通过一个名为 AllListenersReleased
的全局等待句柄向 SendObjectEvent
方法发出信号。
这就是概念的简要概述。当然,没有多线程解决方案会如此简单。但是,在我深入研究编写此示例应用程序时出现的各种问题和解决方案之前,理解核心目标机制很重要。
- 让
Listen
方法自行阻塞线程,这样它就不会在发生某些事件并准备好返回与该事件相关的数据给客户端之前返回响应。 - 保留每个运行
Listen
Web 方法的客户端应用程序的阻塞WaitHandle
对象在 Web 服务器线程上。WaitHandle
列表保留在一个全局列表中。 - 当发生需要推送到所有正在监听的客户端的事件时,
SendObjectEvent
通过对我们在全局列表中累积的每个WaitHandle
发出Set
来释放每个监听线程。然后,它将阻塞/等待直到所有监听器都已释放。 Listen
方法向等待的SendObjectEvent
发出信号,表明所有监听线程都已释放,以便它可以执行一些维护工作,例如取消注册监听器WaitHandle
(客户端将在后续调用Listen
时重新注册WaitHandle
)。
深入研究
辅助问题
在核心目标完成之前,必须解决以下问题和挑战:
- 同步 -
Listen
Web 方法和SendEventObject
方法(所有事件都调用该方法来释放监听器并传递事件特定数据)必须同步且线程安全。 - 在发送事件时无法开始
Listen
。 - 在
Listen
方法注册其等待句柄时无法发送事件。 - 一次只能处理一个事件。
- 不想同时注册两个监听器等待句柄。
- 事件缓存 - Web 服务必须缓存事件,直到它确定所有客户端都已收到它们。当事件在一名或多名客户端因忙于处理先前事件而未监听时被推送回时,这会成为一个问题。
- 如果客户端有未完成的事件,应在随后的
Listen
调用中立即返回它们。 - Web 服务必须有一种合理的方法来确定所有事件是否已发送到所有客户端。一旦确定,就可以清除事件缓存。此时也可以清除等待句柄列表。
- 请求超时 - 必须解决请求超时问题。在超时发生之前,我们必须向客户端推送一个虚拟事件。客户端反过来会识别出它收到了一个“非事件”事件,并简单地重新执行
Listen
。
同步
代码中有各种锁,以确保多个线程不会同时修改全局变量。我将重点关注同步 Listen
Web 方法和所有“应用程序事件”用于推送回事件的 SendEventObject
方法访问的锁。
概念上讲,监听和发送事件需要同步。我不想在 Listen
方法尝试将其等待句柄注册到全局列表时发送事件。这可能会导致竞态条件,即 Listen
方法尚未注册接收事件,但事件已发布给所有正在监听的线程。由于假定所有已注册的监听线程都已收到事件,因此可以清除事件缓存,并且客户端可能会丢失事件。
此外,我也不想同时处理两个事件。虽然我可以向客户端推送多个事件,但我想一次管理“一个事件/一次释放”到所有正在监听的客户端。
以下是 Listen
方法的前几行。一个通用锁对象 (Global.LockObj
) 用于代码块,以保护正在被更改的各种全局变量——这必须原子地完成。在该代码块内,我进入一个循环,检查是否正在传递事件,并阻塞当前线程(如果正在传递)。
[WebMethod]
[XmlInclude(typeof(EventObject))]
[XmlInclude(typeof(LoginEvent))]
[XmlInclude(typeof(LoggedOutEvent))]
[XmlInclude(typeof(MessageEvent))]
public EventObject[] Listen(int myLastEventID, int myLastEventsListResetId) {
AutoResetEvent waithandle;
bool listenTimedOut = false;
EventObject[] returnedEvents;
lock (Global.LockObj) {
// Don't register a listener while event is being delivered
while (Global.EventBeingDelivered)
Monitor.Wait(Global.LockObj);
//Release the lock and wait for a pulse
//to re-check message delivery state again
Global.EventBeingDelivered = true;
// Immediatly return any outstanding events
// that have accumulated while I wasn't listening.
returnedEvents = EventUtils.PrepareEventsForSending(myLastEventID,
myLastEventsListResetId);
if(returnedEvents.Length > 0) {
Global.EventBeingDelivered = false;
return returnedEvents;
}
waithandle = new AutoResetEvent(false);
Global.WaitingListenerList.Add(waithandle);
Global.EventBeingDelivered = false;
}
我使用一个名为 EventBeingDelivered
的全局状态变量来确定客户端是否正在注册 Listen
,或者事件是否正在发送给所有客户端。由于我使用的是“阻塞条件标志”,如 Joseph Albahar 出色的 C# 线程电子书中提到的,我选择 Monitor
类作为 Listen
和 SendEventObject
方法之间的同步机制。
这很重要,因为 Monitor.Pulse
可能已执行,表明应用程序事件已成功处理,但另一条运行 SendEventObject
的线程可能在 Listen
代码执行之前获得了执行控制。因此,我们在继续注册之前检查阻塞条件标志的状态。如果标志仍然为 true,我们将循环到另一个等待。
SendEventObject
方法中存在相同的代码,以确保另一条线程在处理当前线程的事件之前不会发送事件。
public class EventUtils {
public static void SendEventObject(EventObject eo) {
lock (Global.LockObj) {
// Don't register a listener or process a new
// event/message while an event/message is being delivered
while (Global.EventBeingDelivered)
Monitor.Wait(Global.LockObj); //
// Cause New Listeners and events to wait until
// message is delivered to all existing listeners
Global.EventBeingDelivered = true;
System.Diagnostics.Trace.WriteLine("Event processing commenced");
Global.NewEventID++;
// Add the current event to the list.
Global.EventsList.Add(eo);
// Register the amount of listeners waiting to get events
Global.ListenerCount = Global.WaitingListenerList.Count;
// Release the waiting response threads
foreach (AutoResetEvent waitingListner in Global.WaitingListenerList) {
waitingListner.Set();
}
}
// Before clearing the event and the listener
// waithandles, wait for all the listeners to release.
Global.AllListenersReleased.WaitOne();
lock (Global.LockObj) {
Global.LastEventID = Global.NewEventID;
// If all logged-on users got the last event, clear the events list
if (Global.loggedInUsers.Count == Global.WaitingListenerList.Count) {
// TODO: Need to clear the Global.NewEventID,
// but also figure a way to notify the threads to reset their event ids
Global.EventsList.Clear();
Global.NewEventID = 0;
Global.EventsListResetId++;
}
// Clear all the listeners
Global.WaitingListenerList.Clear();
// Once an event has been delivered to all waiting listeners,
// allow new listeners waiting to register to attempt registration,
// or process a new event/messages.
Global.EventBeingDelivered = false;
Monitor.PulseAll(Global.LockObj);
}
System.Diagnostics.Trace.WriteLine("SendEventObject - " +
"Released listeners and cleared lists");
}
如果您想知道为什么 Global.LockOb
j 不足以同步这两个方法,那是因为这两个方法都会退出锁块并进入等待状态。Listen
方法等待应用程序事件,而 SendEventObject
等待所有 Listen
方法完成后再继续进行一些维护。
事件缓存
如上所述,我们需要一种机制来缓存事件,直到我们可以确保所有客户端都已收到它们。这是通过逻辑实现的,而不是通过技术实现的。
所有事件都缓存在一个名为 EventList
的应用程序事件对象全局列表中。通过为每个事件保留一个增量 EventId
,以及我称之为 EventsListResetID
的东西(全局地和在被推回的事件对象上),我可以确定自上次 Listen
以来是否发生了事件。
PrepareEventForSending
方法将事件从客户端上次接收事件的点返回给客户端。
// <summary>
/// Each waiting response thread will call this method
/// to return the current and all outstanding events.
/// It's assummed that all events missing by the client
/// are retained in the events list since the list
/// is not cleared until all logged on users a fully notified.
/// </summary>
/// <param name="fromEventIndex"></param>
/// <returns></returns>
internal static EventObject[] PrepareEventsForSending(int myLastEventID,
int myLastEventsListResetId) {
// If the eventlist hasnt been reset
// since the last time the client received an event,
// return the next event in the list.
if (myLastEventsListResetId == Global.EventsListResetId) {
if (myLastEventID > Global.NewEventID)
myLastEventID = 0;
}
else
// List has been reset since client last
// got an event, so return all events from
// the top of the list.
myLastEventID = 0;
// Determine how many events the client is missing.
int numberOfEventsImMissing = Global.NewEventID - myLastEventID;
// Determine where in the list the missing events start from.
int startIndex = Global.EventsList.Count - numberOfEventsImMissing;
int returnIndex = 0;
// Assemble an array of missing events for this client.
EventObject[] eventToReturn = new EventObject[numberOfEventsImMissing];
for (int missingEventIndex = startIndex;
missingEventIndex < Global.EventsList.Count;
missingEventIndex++) {
eventToReturn[returnIndex] = Global.EventsList[missingEventIndex];
eventToReturn[returnIndex].EventID = missingEventIndex + 1;
eventToReturn[returnIndex].EventsListResetID = Global.EventsListResetId;
returnIndex++;
}
return eventToReturn;
每次 EventList
被清除时,EventsListResetId
都会递增。EventList
在所有注册的客户端接收到事件后被清除。
您会注意到,当客户端的最后一个事件列表 ID 与服务器上的全局 ID 不匹配时,我们会将整个列表返回给客户端。这是因为不匹配表明客户端的最后一个 EventID
(充当列表上的索引)不再有效,因为自客户端上次监听事件以来列表已被清除。因此,您的 EventID
不再有效,现在可以从零开始,这意味着,请给我新的列表开头的事件。
换句话说,客户端的最后一个 EventsListResetId
和服务器的 EventsListResetId
之间的不匹配意味着您上次获取事件时,所有人都收到了事件,我们清除了 EventList
。因此,您的 EventID
不再有效,现在可以从零开始,这意味着,请给我从新列表开始的事件。
列表的清除在 SendEventObject
方法的末尾完成,方法是将登录用户数与阻塞监听器的注册等待句柄数进行比较。如果两者相等,则假定所有登录用户都收到了最后一个事件以及任何未完成的事件,因此我们可以清除列表并递增 EventsListResetID
。
/// Before clearing the event and the listener
/// waithandles, wait for all the listeners to release.
// ***********************************************************
Global.AllListenersReleased.WaitOne();
// ***********************************************************
lock (Global.LockObj) {
Global.LastEventID = Global.NewEventID;
// If all logged-on users got the last event, clear the events list
if (Global.loggedInUsers.Count == Global.WaitingListenerList.Count) {
Global.EventsList.Clear();
Global.NewEventID = 0;
Global.EventsListResetId++;
}
// Clear all the listeners
Global.WaitingListenerList.Clear();
请求超时
这部分很简单。在 Global.asa 中,我确定请求时间,然后取其 90% 作为我的 EventWaitTimeout
。
protected void Application_Start(object sender, EventArgs e)
{
// Set the event timeout to be less than the page timeout.
Global.EventWaitTimeout =
(int)(HttpContext.Current.Server.ScriptTimeout * .90) * 1000;
}
在 Listen
Web 方法中,使用此值作为时间限制设置了一个阻塞。当达到限制时,方法停止阻塞,并使用我们用于发送真实事件的相同机制向所有人发送一个虚拟事件(这就是为什么每个人都会收到通知)。用于虚拟事件的事件类是我所有事件类的父类。客户端知道忽略这些事件,并重新执行 Listen
。
//********************************************************************
// Wait for an event, but continue before the request times out.
listenTimedOut = !waithandle.WaitOne(Global.EventWaitTimeout, true);
//********************************************************************
// When the code continues from here it's either because
// an event was recieved or the wait for one has timed out.
//********************************************************************
System.Diagnostics.Trace.WriteLine("Listen - Waithandle released");
if (listenTimedOut) {
// No events to return? Return an empty non-event.
if (Global.EventsList.Count == 0) {
EventObject dummyEvent = new EventObject();
lock (Global.LockObj) {
dummyEvent.EventID = myLastEventID;
dummyEvent.EventsListResetID = myLastEventsListResetId;
}
// Fire off the dummy event asynchrously
// and wait for it (short wait) so as to stay
// within the wait/event paradigm in this example.
SendDummyEventDelegate sendEventDel =
new SendDummyEventDelegate(EventUtils.SendEventObject);
sendEventDel.BeginInvoke(dummyEvent, null, null);
waithandle.WaitOne();
}
}
结论
只需少量代码,您就可以创建为客户端应用程序提供实时事件的服务器。虽然我没有尝试过,但我相信相同的技术可以用于使用 AJAX 和页面方法的 Web 客户端,或者至少是 Web 服务方法。
注意:如果您使用 ASP.NET 开发服务器运行示例应用程序,您需要在每次运行时停止它。