65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 MSMQ/双工 WCF/SignalR/jQuery 将实时结果流式传输到网站

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.91/5 (81投票s)

2012年2月4日

CPOL

18分钟阅读

viewsIcon

274134

downloadIcon

4473

使用 MSMQ/双工 WCF/SignalR/jQuery 将实时结果流式传输到网站

目录

引言

场景设定……我工作的地方是一家外汇(Foreign Exchange)公司,我们为世界各地的客户进行外汇交易。有一天,我的老板找到我,说他希望能够实时地可视化交易发生地点,但他希望它看起来很酷,像一个闪亮的展示品(我相信你懂我的意思)。他明确表示不要网格。我很高兴。

我的队友 Richard 和我接手了这个任务,我们思考了一下,查看了我们手头有哪些信息,并想知道我们是否能制作一个通用的实时事件观察器,同时也能生成一个闪亮的界面供我们展示。

我们可用的信息并不多,基本上只有以下几项:

  1. Tcpip 地址
  2. 一个描述实时事件类型的任意字符串,例如 "ClientDeal"、"ExchangeFund" 等等
  3. 客户名称

于是我们又思考了一下,得出了类似这样的方案。

  1. 我们可以扩展我们的日志框架(我们使用 Log4Net),创建一个自定义的 MessageQueue (MSMQ) appender,它会将某些事件和一些额外数据(如 Tcpip 地址)记录到 MessageQueue 中。显然,我们不能分享整个应用程序,所以我们提供了一个测试消息发布器,它只是简单地将测试消息写入 MessageQueue。如果你想自己生成实时事件,这部分应该很容易搞定。
  2. 我们可以让一个 WCF 服务实时读取这些 MessageQueue 条目。该服务可以接收订阅者,每个订阅者可以使用事件名称订阅单个事件,或者通过在订阅时传递事件名称数组来订阅多个事件。
  3. 我们可以让 WCF 服务使用回调(callbacks)向订阅者实时推送通知。
  4. 我们还可以使用 Google Earth 来实时显示这些事件(如果我们能获取到事件的地理位置数据)。

这些大部分都是相当标准的东西,真正有趣的部分是将通知从服务器端的 Web 代码实时推送到浏览器。我不知道你们中有多少人见过这个,但这就像谷歌在你打开一个谷歌搜索页面并搜索一些热门内容(比如某个新闻事件)时所做的那样,谷歌会实时地将结果流式传输到你打开的搜索页面中。

这非常酷,通常使用长轮询(long polling)或各种其他使用 Ajax/Comet 技术的技巧来实现,所有这些都很难设置和运行(至少在我们看来是这样)。

我们最终使用的是一个相当新的库,叫做 SignalR,我得说它真是太酷了。

我们将在文章的后面部分详细介绍我们实现这一目标的所有不同组成部分。需要记住的一点是,我们的需求是在 Google Earth 上展示东西,你可能不想这样做。然而,SignalR 的用法可以应用于任何你想要将实时数据直接流式传输到用户浏览器的场景,比如搜索结果、某种流式变化的数据,如实时市场汇率,或者说来也巧,外汇汇率。真有趣。

演示视频

这个演示视频展示了 Web 项目从测试发布者项目实时接收事件。整个过程的完整路径是这样的:

Msmq -> WCF -> Xml 解析 -> 地理位置查询 -> WCF 回调 -> 网站 -> SignalR -> Javascript -> Google Earth API

需要注意的是,测试发布器是从一小组已知的 TcpIp 地址中随机选择的,所以你可能会看到同一个 TcpIp 地址(因此是同一个地理位置)被连续选中。这是在小数据集上随机性的本质。

无论如何,点击下面的图片下载视频(大约 180MB,抱歉,屏幕录制软件生成的文件很大),视频应该很清楚地展示了正在发生的事情,基本上,测试发布者项目正在向一个 MessageQueue 发布事件,然后这些消息通过 WCF 和 SignalR 库的使用实时推送到 Google Earth(流式传输)。

如何运行演示

演示代码实际上包含几个项目,必须按特定顺序运行,所以以下是正确运行演示代码需要做的步骤:

  1. 运行 Codeproject.EventBroker.TestMessagePublisher 项目并选择自动模式(Automatic mode)
  2. 运行 Codeproject.EventBroker.Host 项目(在 DEBUG 模式下运行,这样它就会是一个自托管的控制台 WCF 应用程序)
  3. 运行 Codeproject.EventBroker.WebUI 项目,稍等片刻,你应该会看到 Google Earth 导航到世界的不同地方。

先决条件

有一些先决条件,但大多数都包含在附加的演示代码中。下表列出了所有先决条件,并告诉您它们是否包含在附加的演示代码中,或者您是否**必须**安装它们才能运行代码。

项目已包含
MSMQ

您**必须**开启并运行此项(它是标准的 Windows 组件)
命名的 MSMQ 私有队列

您**必须**有一个名为 "eventbroker" 的队列(或者您在 TestMessagePublisher 和 WebUI 项目配置文件中配置的任何名称)
IIS Express 7.5

您**必须**从微软下载页面安装此程序:http://www.microsoft.com/download/en/details.aspx?id=1038
Castle Windsor

见 Lib\Castle\1.2.0.6623 文件夹
Log4Net

见 Lib\Log4Net\1.2.10.0\log4net.dll
SignalR

见 Lib\Microsoft\SignalR\SignalR.dll

总体设计

我认为最好的入门方式是参考下图,该图试图概述所附演示代码的不同部分。

每个黑色轮廓的框代表演示项目中的一个项目,而橙色轮廓的框代表通过使用 SignalR dll 暴露的功能。

我们将在下面更详细地介绍这些项目以及 SignalR 的使用,但现在,这里是对每个项目功能的简短描述。

  • Codeproject.EventBroker.TestMessagePublisher:这是一段一次性的代码。这个项目的唯一工作就是模拟实时消息的生成。
  • Codeproject.EventBroker.Service:这是一个双工 WCF 服务,它从 TestMessagePublisher 正在写入的 MessageQueue 中读取消息。要运行此服务,您需要启动 WCF 主机项目 Codeproject.EventBroker.Host
  • Codeproject.EventBroker.WebUI:这是一个标准的 ASP.NET 项目,它在一个网页中托管了 Google Earth 的一个实例。该网页还调用了一些服务器端的 SignalR 代码,该代码随后订阅 WCF 服务,并接受提供实时值的回调,这些值然后通过 SignalRGoogle Earth 上实时显示。

测试发布者

正如我们已经说过的,Codeproject.EventBroker.TestMessagePublisher 项目是一段一次性的代码,仅用于模拟实时消息的发生。

当您运行此项目时,它看起来会是这样。

可以看到有两个单选按钮和一个开始按钮。这两个单选按钮用于确定如何将测试消息写入 MessageQueue

  • 自动(Automatic):点击“开始”后,每 x 秒创建一条新消息。
  • 手动(Manual):只有当您点击“开始”时才会创建一条新消息。

这里是写入 MessageQueue 的相关代码

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Configuration;
using System.Data;
using System.Diagnostics;
using System.Drawing;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading;
using System.Windows.Forms;
using System.Threading.Tasks;
using Message = System.Windows.Forms.Message;




namespace Codeproject.EventBroker.TestMessagePublisher
{
    public partial class MainWindow : Form
    {
        private enum RunMode { Automatic = 1, Manual }
        private RunMode CurrentRunMode = RunMode.Automatic;
        private string inputQueueName = ConfigurationManager.AppSettings["eventBrokerQueueName"];
        private List<string> places = new List<string>();
        private List<int> waits = new List<int>();
        private Random rand = new Random();
        private bool listenToSelectionChanges = true;
        private bool stopAuto = false;

        public MainWindow()
        {
            InitializeComponent();
            places.Add("220.233.19.142");
            places.Add("64.233.160.0");
            places.Add("91.135.229.5");

            waits.Add(1000);
            waits.Add(2000);
            waits.Add(4000);
            waits.Add(5000);
            waits.Add(8000);
        }

        public void SendMessages()
        {
            Task.Factory.StartNew(() =>
            {
                while (!stopAuto)
                {
                    SendMessage();
                    Thread.Sleep(10000);
                }
            }, TaskCreationOptions.LongRunning);
        }
        
        public string GetXmlData(string tcpIpAddress)
        {
            return string.Format(
                "<realtimeEvent>" +
                  "<originatingIp>{0}</originatingIp>" +
                  "<eventName>ClientDealEvent</eventName>" +
                  "<entityIdType>ClientDeal</entityIdType>" +
                  "<description>Someone bought something</description>" +
                  "<date>{1}</date>" +
                  "<additionalData></additionalData>" +
                "</realtimeEvent>", tcpIpAddress, DateTime.Now);
        }

        private void btnCreateManual_Click(object sender, EventArgs e)
        {
            if (radAuto.Checked)
            {
                SendMessages();
            }
            else
            {
                SendMessage();
            }
        }

        private void SendMessage()
        {
            using (MessageQueue queue = new MessageQueue(inputQueueName, QueueAccessMode.Send))
            {
                queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
                try
                {
                    System.Messaging.Message message = new System.Messaging.Message(
                        GetXmlData(places[rand.Next(places.Count)]));
                    Debug.WriteLine("Producing message {0}", message.Body.ToString());
                    queue.Send(message);
                }
                catch (MessageQueueException mex)
                {
                    if (mex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                    {
                        Debug.WriteLine("Message queue exception occured", mex);
                    }
                }
                catch (Exception ex)
                {
                    // Write the message details to the Error queue
                    Debug.WriteLine("Exception occured", ex);
                }
            }
        }

        private void CheckedChanged(object sender, EventArgs e)
        {
            if (radAuto.Checked)
            {
                stopAuto = false;
            }
            else
            {
                stopAuto = true;
            }
        }
    }
}

可以看到,我们选择的消息结构是一小段 XML,看起来像这样:

<realtimeEvent>
   <originatingIp>192.168.0.1</originatingIp>
   <eventName>ClientDealEvent</eventName>
   <entityIdType>ClientDeal</entityIdType>
   <description>Someone bought something</description>
   <date>02/01/2012</date>
   <additionalData></additionalData>
</realtimeEvent>

我们在 Codeproject.EventBroker.TestMessagePublisher 项目的 App.Config 文件中配置使用哪个 MessageQueue 队列。

<?xml version="1.0"?>
<configuration>
  <appSettings>
    <add key="eventBrokerQueueName" value="FormatName:Direct=OS:localhost\private$\eventbroker"/>
  </appSettings>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/>
  </startup>
</configuration>

可以看到,默认的队列名称是 "eventbroker",它被期望是本地机器上的一个私有队列。但这个队列可以位于任何地方,这里只是为了向您展示可以在哪里进行配置。

另一个重要的注意事项是,"eventbroker" MessageQueue 队列**不能**是事务性的。因为演示代码假定它不是事务性的,如果您想使队列成为事务性的,您需要修改 Codeproject.EventBroker.TestMessagePublisher MessageQueue 的写入代码和 Codeproject.EventBroker.Service MessageQueue 的读取代码。

如果你想让它成为事务性的,那没问题,但你**必须**修改代码来实现。另外请注意,你还需要为新创建的 "eventbroker" MessageQueue 队列授予用户访问权限。我通常会使用我自己的登录名并授予所有权限。

双工 WCF 服务

Codeproject.EventBroker.Service 是一个双工 WCF 服务,它基本上具有以下客户端可以调用的契约:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using Codeproject.EventBroker.Contracts.Faults;

namespace Codeproject.EventBroker.Contracts.Service
{
    [ServiceContract(Namespace = "http://Codeproject.EventBroker.Contracts", 
                     SessionMode=SessionMode.Required,
                     CallbackContract=typeof(IEventBrokerCallback))]
    public interface IEventBroker
    {
        [OperationContract(IsOneWay = false)]
        [FaultContract(typeof(EventBrokerException))]
        void Subscribe(Guid subscriptionId, string[] eventNames);

        [OperationContract(IsOneWay = true)]
        void EndSubscription(Guid subscriptionId);
    }
}

此服务还期望客户端提供一个满足此接口的回调契约:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using Codeproject.EventBroker.Contracts.Data;

namespace Codeproject.EventBroker.Contracts.Service
{
    public interface IEventBrokerCallback
    {
        [OperationContract(IsOneWay = true)]
        void ReceiveStreamingResult(RealTimeEventMessage streamingResult);
    }
}

该服务托管在 Codeproject.EventBroker.Host 项目中,当在 RELEASE 模式下运行时,它将是 Codeproject.EventBroker.Contracts.Service.EventBroker 的 Windows 服务主机,而在 DEBUG 模式下运行时,它将是一个托管 Codeproject.EventBroker.Contracts.Service.EventBroker WCF 服务的简单控制台应用程序。

Codeproject.EventBroker.Contracts.Service.EventBroker 服务的骨架实现如下所示:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using Codeproject.EventBroker.Contracts.Service;
using Codeproject.EventBroker.Service.Data;
using System.Threading.Tasks;
using System.Configuration;
using Codeproject.EventBroker.Contracts.Faults;
using System.Messaging;
using Codeproject.EventBroker.Common;
using Codeproject.EventBroker.Service.Utils;
using Codeproject.EventBroker.Contracts.Data;
using Codeproject.EventBroker.Service.Extensions;
using Codeproject.EventBroker.Service.Services.Contracts;
using System.Threading;

namespace Codeproject.EventBroker.Service
{
    [ServiceBehavior(    InstanceContextMode = InstanceContextMode.Single, 
            ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class EventBroker : IEventBroker
    {
        ....
        ....
        ....

        public EventBroker()
        {
            inputQueueName = ConfigurationManager.AppSettings["eventBrokerQueueName"].ToString();
            StartCollectingMessage();
            xmlParser = IOCManager.Instance.Container.Resolve<IXmlParser>();
           }

        public void StartCollectingMessage()
        {
         ....
         ....
         ....
        }

        public void Subscribe(Guid subscriptionId, string[] eventNames)
        {
         ....
         ....
         ....
        }

        public void EndSubscription(Guid subscriptionId)
        {
         ....
         ....
         ....
        }
    }
}

接受订阅者

当新的订阅者订阅此 WCF 时,会发生以下情况:

  1. 对于订阅者希望订阅的每个事件名称,执行以下操作:
    1. 如果当前没有该事件名称的订阅,则创建一个空的订阅列表。
    2. 查看是否已存在该事件名称的订阅列表,如果存在,则将订阅者的 ID 和回调上下文(IEventBrokerCallback)添加到该事件名称的全局订阅者字典中。

以下是订阅发生时最相关的代码:

private void CreateSubscription(Guid subscriptionId, string[] eventNames)
{
    //Ensure that a subscription is created for each message type the subscriber wants to receive
    lock (syncObj)
    {
        foreach (string eventName in eventNames)
        {
            if (!eventNameToCallbackLookups.ContainsKey(eventName))
            {
                List<UniqueCallbackHandle> currentCallbacks = new List<UniqueCallbackHandle>();
                eventNameToCallbackLookups[eventName] = currentCallbacks;
            }
            eventNameToCallbackLookups[eventName].Add(
                new UniqueCallbackHandle(subscriptionId, 
            OperationContext.Current.GetCallbackChannel<IEventBrokerCallback>()));
        }
    }
}

移除订阅

一旦订阅者选择结束其订阅,他们可以使用 void EndSubscription(Guid subscriptionId) 操作契约来完成。

当订阅者结束订阅时,会发生以下情况:

  1. 对于全局订阅者字典中每个事件名称:
    1. 获取所有与正在取消订阅的订阅者 subscriptionId 不同的订阅。
    2. 在移除因订阅者结束订阅而不再需要的所有订阅后,用剩余的订阅创建一个新的全局订阅者字典。

以下是当 EndSubscription 发生时最相关的代码:

public void EndSubscription(Guid subscriptionId)
{
    lock (syncObj)
    {
        //create new dictionary that will be populated by those remaining
        Dictionary<string, List<UniqueCallbackHandle>> remainingEventNameToCallbackLookups = 
            new Dictionary<string, List<UniqueCallbackHandle>>();

        foreach (KeyValuePair<string,List<UniqueCallbackHandle>>  kvp in eventNameToCallbackLookups)
        {
            //get all the remaining subscribers whos session id is not the same as the one we wish to remove
            List<UniqueCallbackHandle> remainingMessageSubscriptions = 
                kvp.Value.Where(x => x.CallbackSessionId != subscriptionId).ToList();
            if (remainingMessageSubscriptions.Any())
            {
                remainingEventNameToCallbackLookups.Add(kvp.Key, remainingMessageSubscriptions);
            }
        }
        //now left with only the subscribers that are subscribed
        eventNameToCallbackLookups = remainingEventNameToCallbackLookups;
    }
}

执行回调

WCF 服务中有趣的部分是实际对订阅者的回调。但这个回调应该在什么时候发生呢?

嗯,只有当我们有东西要传递给订阅者时,才应该对他们进行回调,也就是当我们从 MessageQueue 接收到一条消息,并且它符合订阅者的订阅需求时(基本上是传入消息的 EventName 与订阅者在订阅时使用的 EventName 相匹配)。

由于此 WCF 服务旨在被许多客户端使用,因此有多个线程在运行。有主 WCF 线程,还有一个新启动的线程,用于处理从 MessageQueue 读取数据,并在传入事件的 EventName 有活动订阅者时将消息分发回给他们。

这个过程基本上可以在以下两个 WCF 方法中看到:

读取传入的 MessageQueue 消息

private void GetMessageFromQueue()
{
    try
    {
        Task messageQueueReaderTask = Task.Factory.StartNew(() =>
            {
                using (MessageQueue queue = new MessageQueue(inputQueueName, QueueAccessMode.Receive))
                {
                    queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });

                    while (shouldRun)
                    {
                        Message message = null;
                        try
                        {
                            if (!queue.IsEmpty())
                            {
                                LogManager.Log.Debug("Receiving queue message");
                                message = queue.Receive(queueReadTimeOut);
                                ProcessMessage(message);
                            }
                        }
                        catch (MessageQueueException e)
                        {
                            if (e.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                            {
                                LogManager.Log.Warn("Message queue exception occured", e);
                            }
                        }
                        catch (Exception e)
                        {
                            // Write the message details to the Error queue
                            LogManager.Log.Warn("Exception occured", e);
                        }
                    }
                }
            }, TaskCreationOptions.LongRunning);
    }
    catch (AggregateException ex)
    {
        throw;
    }
}

对订阅者进行回调

这段代码做的唯一另一件聪明的事情是,如果在尝试向订阅者发送消息时遇到 CommunicationObjectAbortedException,那么该订阅者将被假定为出现故障并被移除。你会看到订阅者也有处理故障通道的机制,这在发布/订阅模式下并不那么容易。例如,一个订阅者可能出现故障,但其他所有订阅者都可能正常,那么我们是否应该重启 ServiceHost 呢?可能不应该。这就是我们在这里采取的方法,我们试图尽可能地容错,只有在通道完全故障时才诉诸于重启 ServiceHost

private void ProcessMessage(Message msmqMessage)
{
    string messageBody = (string)msmqMessage.Body;
    LogManager.Log.DebugFormat("ProcessMessage : {0}", messageBody);

    RealTimeEventMessage messageToSendToSubscribers = xmlParser.ParseRawMsmqXml(messageBody);
    if (messageToSendToSubscribers != null)
    {
        lock (syncObj)
        {
            List<Guid> deadSubscribers = new List<Guid>();
            if (eventNameToCallbackLookups.ContainsKey(messageToSendToSubscribers.EventName))
            {
                List<UniqueCallbackHandle> uniqueCallbackHandles = 
                    eventNameToCallbackLookups[messageToSendToSubscribers.EventName];
                foreach (UniqueCallbackHandle uniqueCallbackHandle in uniqueCallbackHandles)
                {
                    try
                    {
                        uniqueCallbackHandle.Callback.ReceiveStreamingResult(messageToSendToSubscribers);
                    }
                    catch(CommunicationObjectAbortedException coaex)
                    {
                        deadSubscribers.Add(uniqueCallbackHandle.CallbackSessionId);
                    }
                }
            }

            //end all subcriptions for dead subscribers
            foreach (Guid deadSubscriberId in deadSubscribers)
            {
                EndSubscription(deadSubscriberId);
            }
        }
    }
}

可以看到,处理传入的 MessageQueue 消息的代码在将其发送给订阅者之前,将传输的 xml 消息体解析成一个实际的 RealTimeEventMessage 对象。这个 xml 解析代码如下所示:

public class XmlParser : IXmlParser
{
    private IGeoLocator geoLocator;

    public XmlParser(IGeoLocator geoLocator)
    {
        this.geoLocator = geoLocator;
    }

    public RealTimeEventMessage ParseRawMsmqXml(string messageBody)
    {
        //<realtimeEvent>
        //  <originatingIp></originatingIp>
        //  <eventName>ClientDealEvent</eventName>
        //  <entityIdType>ClientDeal</entityIdType>
        //  <description>Someone bought something</description>
        //  <date>2012-01-16T15:31:31</date>
        //  <additionalData></additionalData>
        //</realtimeEvent>

        try
        {
            RealTimeEventMessage info = new RealTimeEventMessage();
            XElement xelement = XElement.Parse(messageBody);
            string ipAddress = GetSafeString(xelement, "originatingIp"); 
            if (!string.IsNullOrEmpty(ipAddress))
            {
                info.Location = geoLocator.ObtainLocationForIPAddress(ipAddress);
            }
            info.EventName = GetSafeString(xelement, "eventName");
            info.EntityIdType = GetSafeString(xelement, "entityIdType");
            info.Description = GetSafeString(xelement, "description").Replace("\n\n", "\n\r");
            info.Date = GetSafeDate(xelement, "date");
            info.AdditionalData = GetSafeString(xelement, "additionalData");
            return info;
        }
        catch (Exception ex)
        {
            LogManager.Log.Error(ex);
            return null;
        }
    }

    public static Int32 GetSafeInt32(XElement root, string elementName)
    {
        try
        {
            XElement element = root.Elements().Where(node => node.Name.LocalName == elementName).Single();
            return Convert.ToInt32(element.Value);
        }
        catch
        {
            return 0;
        }
    }

    private static DateTime? GetSafeDate(XElement root, string elementName)
    {
        try
        {
            XElement element = root.Elements().Where(node => node.Name.LocalName == elementName).Single();
            return DateTime.Parse(element.Value);
        }
        catch
        {
            return null;
        }
    }

    public static String GetSafeString(XElement root, string elementName)
    {
        try
        {
            XElement element = root.Elements().Where(node => node.Name.LocalName == elementName).Single();
            return element.Value;
        }
        catch
        {
            return String.Empty;
        }
    }       
}

这里的 XML 解析代码还利用了另一段实用代码,该代码从 TcpIp 地址获取地理位置信息。这项服务是免费的,但偶尔会漏掉一些 TcpIp 地址。在工作中,我们实际上使用由 MindMap 提供的 Web 服务,它非常可靠,10,000 次查询收费 20 美元,并且是一个简单的 GET http 请求。然而,对于本文的演示代码,我们为您提供了免费但略不可靠的版本,对此我们深表歉意。

话虽如此,TestMessageQueuePublisher 总是从我们知道可以与本演示代码使用的免费地理位置查询服务配合使用的 TcpIp 地址中随机挑选,所以您应该不会有问题。

无论如何,这里是免费(但有点不可靠)的地理位置查询代码。

public class GeoLocator : IGeoLocator
{
    public GeoLocation ObtainLocationForIPAddress(string ipAddress)
    {
        try
        {
            WebClient client = new WebClient();
            string locationDump = client.DownloadString(
                string.Format("http://api.hostip.info/get_html.php?ip={0}&position=true", 
                    ipAddress));
            string[] locationDumpSplit = locationDump.Split(
                new string[] { @"\r\n", @"\n" }, StringSplitOptions.RemoveEmptyEntries);

            decimal latitude = -1;
            decimal longitude = -1;
            int found=0;

            using (StringReader sr = new StringReader(locationDump)) 
            {
                found = 0;
                while (sr.Peek() >= 0) 
                {
                    string line = sr.ReadLine().ToLower();
                    if (line.StartsWith("latitude:"))
                    {
                        line = line.Replace("latitude:","").Trim();
                        latitude = decimal.Parse(line);
                        found++;
                    }
                    if (line.StartsWith("longitude:"))
                    {
                        line = line.Replace("longitude:", "").Trim();
                        longitude = decimal.Parse(line);
                        found++;
                    }
                }
            }

            if (found == 2)
            {
                return new GeoLocation(latitude, longitude);
            }
            else
                return null;
        }
        catch (Exception ex)
        {
            LogManager.Log.ErrorFormat(
                "Could not obtain Latitude/Longitude data for IpAddress {0}\r\n Exception : {1}",
                    ipAddress, ex);
            return null;
        }
    }
}

Web UI

这个谜题的最后一块是一个简单的网站,用于显示实时(或者说近乎实时,经过各层会有轻微的延迟)数据。事实上,这些是实时事件经过的层级,只是为了让您能看到网站在其中的位置:

Msmq -> WCF -> Xml 解析 -> 地理位置查询 -> WCF 回调 -> 网站 -> SignalR -> Javascript -> Google Earth API

路径相当长,不是吗!

不管怎么说,这个网站非常简单,唯一稍微特别的地方是它使用了一个相当新的免费库,叫做 SignalR,我们将在下面更详细地讨论它。本质上,这个网站所做的是在一个标准的 HTML 页面中托管 Google Earth 插件的实例,并通过一些 jQuery Javascript 来操作它。它还利用了 SignalR 库来实现向浏览器推送通知。

SignalR Hub

SignalR 本质上是一个用于 ASP.NET 的异步信令库,用于帮助构建实时的、多用户的交互式 Web 应用程序。它通过一种非常聪明的方式实现这一点。它基本上允许你编写继承自 SignalR Hub 对象的服务器端代码。然后你还可以创建与服务器端 Hub 对象进行通信的 Javascript,反之亦然。

是的,没错,我们可以通过服务器端代码写入一个 Javascript 方法,这真是太疯狂了。

当然,这里面有些魔法,但一旦你弄清楚了其中的原理,它就不那么神奇了,而是相当的聪明。以下是幕后发生的事情:

  • SignalR 会创建一个轻量级的 Javascript 代理,允许 Javascript 与服务器端代码通信。
  • SignalR 还会在您的 hub 中创建动态的“Clients”和“Caller”对象,以便您可以通过服务器端代码直接调用用 Javascript 编写的客户端方法。
  • SignalR 会检查你浏览器的代理能力,并执行以下操作之一:
    • 将首先尝试使用 WebSockets 来允许 Javascript SignalR 代理与服务器端代码通信。
    • 如果 WebSockets 不可用,SignalR 将会退回到使用长轮询来允许 Javascript SignalR 代理与服务器端代码通信。

所有这些都是相当隐藏的,所以感觉有点神奇。

Hub 快速入门示例中有一个很好的 SignalR 快速入门,我们建议您在继续之前阅读它。一旦您阅读了它,您就会理解下面的代码片段。

因此,对于演示项目,我们有以下 SignalR Hub:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using SignalR.Hubs;




namespace Codeproject.EventBroker.WebUI.GeoLocation
{
    [HubName("eventTicker")]
    public class EventTickerHub : Hub
    {
        private int counter;
        private readonly EventTicker eventTicker;

        public EventTickerHub() : this(EventTicker.Instance) { }

        public EventTickerHub(EventTicker eventTicker)
        {
            this.eventTicker = eventTicker;
            eventTicker.Subscribe();
        }

        public void Register()
        {
            //Do nothing, but is crucially important to establish comms
        }
    }
}

其中,自定义的 SignalR Hub 也使用了一个 EventTicker 对象。我们稍后会更多地了解该对象。目前,这就是创建自定义 SignalR Hub 所需要知道的全部内容。

额外材料

Scott Hanselman 在这个链接上有一篇关于 SignalR 的优秀博客,非常值得一读:http://www.hanselman.com/blog/AsynchronousScalableWebApplicationsWithRealtimePersistentLongrunningConnectionsWithSignalR.aspx

CodeProject 自己的 Anoop Madhusudanan 也刚刚抢先我们一步,撰写了第一篇关于 SignalR 的 CodeProject 文章,也值得一读:https://codeproject.org.cn/Articles/322154/ASP-NET-MVC-SIngalR-and-Knockout-based-Real-time-U

Javascript Hub 通信

与自定义 SignalR Hub 的 JavaScript 通信是其余魔法发生的地方,但在我们看那个之前,让我们看看你需要在宿主页面上做什么,在我们的例子中是 HTML 页面(也可能是 ASP/ASPX/CSHTML 等等)。

可以看到,我们在启用了 SignalR 的页面上有以下脚本标签:

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" 
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <script src="../Scripts/jquery-1.6.4.js"></script>
    <script src="../Scripts/jquery.signalR.js"></script>
    <script type="text/javascript" src="https://www.google.com/jsapi?key=ABCDEFG"> </script>
    <script src="../Scripts/jquery.color.js" type="text/javascript"></script>
    <script src="../signalr/hubs"></script>
    <script src="GeoLocationView.js"></script>
</head>
<body>
    <div id="map3d" style="height: 100%; width: 100%;">
    </div>
</body>
</html>

现在如果你查看演示项目的文件夹,你将不会看到 signalr/hubs 文件夹。这就是魔法,你**必须**接受这一点,并知道 SignalR 会在那里放东西。诚然,需要一定程度的信任。

所以,一旦你接受了编码世界里有独角兽、小精灵和精灵的存在,我们现在就可以专注于现实了,那就是如何让 JavaScript 与 SignalR Hub 通信。在演示代码中,如果你检查文件 "GeoLocationView.js",你会看到以下一段 JavaScript 代码,它负责启动与 SignalR Hub 的通信。

var eventHub = $.connection.eventTicker;




//*************************************************
// Initialise SignalR Hub
//*************************************************
function InitialiseSignalRHub() {


    eventHub = $.connection.eventTicker;

    // Declare a function on the eventHub so the server can invoke it
    eventHub.addMessage = function (message) {
        ProcessGeoLocationCallbackMessage(message);
    }

    //callback that does nothing, simply here to establish link to Hub
    eventHub.registerCallback = function () {
    };


    // Start the connection
    $.connection.hub.start();

    //wait for 3s before we register with Hub 
    window.setTimeout(function () {
        eventHub.register();
    }, 3000)
}

还有一个从 SignalR Hub 到 JavaScript 的回调,但我们稍后会看到这个。

订阅 WCF 服务

订阅双工 WCF 服务是一件相当标准的事情,我们只需要做类似这样的事情:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using SignalR.Hubs;
using System.Timers;
using Codeproject.EventBroker.Contracts.Data;
using Codeproject.EventBroker.Contracts.Service;
using System.ServiceModel;
using Codeproject.EventBroker.WebUI.Wcf;
using System.Threading;
using System.ServiceModel.Channels;

namespace Codeproject.EventBroker.WebUI.GeoLocation
{
    public class EventTicker :  IEventBrokerCallback
    {
        private InstanceContext instanceContext;
        private Guid subscriptionId;
        EventBrokerProxy proxy;

        public EventTicker()
        {
            instanceContext = new InstanceContext(this);
            CreateProxy();
        }

        public void CreateProxy()
        {
            proxy = new EventBrokerProxy(instanceContext);
        }

        public void Subscribe()
        {
          ThreadPool.QueueUserWorkItem(x => 
            {
                try
                {
                    subscriptionId = Guid.NewGuid();
                    proxy.Subscribe(subscriptionId, 
            new string[] { "ClientDealEvent", "PaymentOutEvent" });
                    isSubscribed = true;
                }
                catch
                {
                }
            });
        }
    }
}

其中唯一重要的部分是:

  1. 我们使用一个新的 InstanceContext 来为 WCF 服务提供一个回调对象上下文。
  2. 我们**必须**使用一个新的线程来进行订阅。这**非常**重要,因为我们需要保持 ASP.NET 工作线程的空闲,否则回调将完全无法工作。

如果您好奇,这里是网站代码用来与双工 WCF 服务通信的代理代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;

namespace Codeproject.EventBroker.WebUI.Wcf
{
    public partial class EventBrokerProxy : 
        System.ServiceModel.DuplexClientBase<Codeproject.EventBroker.Contracts.Service.IEventBroker>,
        Codeproject.EventBroker.Contracts.Service.IEventBroker
    {
        public EventBrokerProxy(System.ServiceModel.InstanceContext callbackInstance) :
            base(callbackInstance)
        {
        }

        public EventBrokerProxy(System.ServiceModel.InstanceContext callbackInstance, 
            string endpointConfigurationName) :
            base(callbackInstance, endpointConfigurationName)
        {
        }

        public EventBrokerProxy(System.ServiceModel.InstanceContext callbackInstance, 
            string endpointConfigurationName, string remoteAddress) :
            base(callbackInstance, endpointConfigurationName, remoteAddress)
        {
        }

        public EventBrokerProxy(System.ServiceModel.InstanceContext callbackInstance, 
            string endpointConfigurationName, System.ServiceModel.EndpointAddress remoteAddress) :
            base(callbackInstance, endpointConfigurationName, remoteAddress)
        {
        }

        public EventBrokerProxy(System.ServiceModel.InstanceContext callbackInstance, 
            System.ServiceModel.Channels.Binding binding, System.ServiceModel.EndpointAddress remoteAddress) :
            base(callbackInstance, binding, remoteAddress)
        {
        }

        public void Subscribe(Guid subscriptionId, string[] eventNames)
        {
            base.Channel.Subscribe(subscriptionId, eventNames);
        }

        public void EndSubscription(Guid subscriptionId)
        {
            base.Channel.EndSubscription(subscriptionId);
        }
    }
}

响应回调

在我们看来,这是该解决方案中真正有趣的部分。发生的情况是,当双工 WCF 服务使用订阅者提供的 InstanceContext 调用 EventTicker 时,通过使用 SignalR Hub,我们能够直接调用到一个 Javascript 方法。

以下是相关的 Codeproject.EventBroker.WebUI 服务器端代码(见 EventTicker),这是双工 WCF 回调通过订阅者提供的初始 InstanceContext 调用的内容。

public void ReceiveStreamingResult(RealTimeEventMessage streamingResult)
{
    if (streamingResult.Location != null)
    {
        Hub.GetClients<EventTickerHub>().addMessage(streamingResult);
    }
}

提醒一下,订阅者回调接口如下所示:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using Codeproject.EventBroker.Contracts.Data;

namespace Codeproject.EventBroker.Contracts.Service
{
    public interface IEventBrokerCallback
    {
        [OperationContract(IsOneWay = true)]
        void ReceiveStreamingResult(RealTimeEventMessage streamingResult);
    }
}

这里是相关的 Codeproject.EventBroker.WebUI 客户端 JavaScript 代码:

//*************************************************
// Initialise SignalR Hub
//*************************************************
function InitialiseSignalRHub() {

    ....
    ....
    ....
    // Declare a function on the eventHub so the server can invoke it
    eventHub.addMessage = function (message) {
        ProcessGeoLocationCallbackMessage(message);
    }
    ....
    ....
    ....
}




//*************************************************
// Process SignalR Hub callback
//*************************************************
function ProcessGeoLocationCallbackMessage(message) {

        //now add the items to the earth
        ShowPosition(message.Location.Latitude, message.Location.Longitude);
        CreateMarker(message.Location.Latitude, message.Location.Longitude, message.Description);
}

请特别注意 JavaScript 函数名,并看看服务器端的 SignalR Hub 代码是如何能够直接调用它,并传递 .NET 对象,然后这些对象被客户端 JavaScript 作为 JSON 接收的。我们认为这相当疯狂。确实相当疯狂,向 SignalR 的开发团队致敬。

正如我们之前所说,SignalR 足够聪明,能够检测您浏览器的能力,并将尝试以下操作:

  1. 首先尝试使用 WebSockets(如果支持的话)
  2. 如果不支持 Web Sockets,则退而求其次使用长轮询

自动取消订阅

做发布/订阅的一个问题是,特定订阅者的通道可能会出现故障,该订阅者及其回调的通道基本上就没用了,但订阅者无法知道这一点。那么我们如何解决这个问题呢?

好吧,如果我们查看 Codeproject.EventBroker.WebUI 项目的 Web.Config 文件,你会看到以下 WCF 配置:

<system.serviceModel>

  <client>
    <endpoint name="Codeproject.EventBroker.Service.EventBroker"
              address="net.tcp://:63747/EventBroker"
              binding="netTcpBinding"
              bindingConfiguration="DuplexBinding"
              contract="Codeproject.EventBroker.Contracts.Service.IEventBroker"/>
  </client>

  <bindings>
    <netTcpBinding>
      <binding name="DuplexBinding"
                sendTimeout="00:00:10"
                receiveTimeout="00:00:10">
        <reliableSession enabled="true"/>
        <security mode="None"/>
      </binding>
    </netTcpBinding>
  </bindings>

</system.serviceModel>

我们看到两个超时值 Send 和 Receive,都设置为 10 分钟。所以我们采取的方法是:从 WCF 绑定中获取 Receive 超时值,然后启动一个计时器,当计时器到期时,我们自动取消对 WCF 双工服务的订阅,然后再次订阅。通过这种方法,如果订阅者的通道出现故障,我们最多只会丢失 Receive 超时值设置的数据量。

EventTicker 最相关的代码如下所示。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using SignalR.Hubs;
using System.Timers;
using Codeproject.EventBroker.Contracts.Data;
using Codeproject.EventBroker.Contracts.Service;
using System.ServiceModel;
using Codeproject.EventBroker.WebUI.Wcf;
using System.Threading;
using System.ServiceModel.Channels;

namespace Codeproject.EventBroker.WebUI.GeoLocation
{
    public class EventTicker :  IEventBrokerCallback
    {
        private InstanceContext instanceContext;
        private Guid subscriptionId;
        private bool isSubscribed;
        private TimeSpan receiveTimeout;
        private System.Timers.Timer subscriberLeaseRenewalTimer;
        EventBrokerProxy proxy;

        public EventTicker()
        {
            instanceContext = new InstanceContext(this);
            CreateProxy();
            Binding binding = proxy.Endpoint.Binding;
            receiveTimeout = binding.ReceiveTimeout;
            subscriberLeaseRenewalTimer = new System.Timers.Timer(receiveTimeout.TotalMilliseconds);
            subscriberLeaseRenewalTimer.Enabled = true;
            subscriberLeaseRenewalTimer.Start();
            subscriberLeaseRenewalTimer.Elapsed += SubscriberLeaseRenewalTimer_Elapsed;    
                
        }

        private void SubscriberLeaseRenewalTimer_Elapsed(object sender, ElapsedEventArgs e)
        {
            subscriberLeaseRenewalTimer.Enabled = false;
            subscriberLeaseRenewalTimer.Stop();
            EndSubscription();
            CreateProxy();
            Subscribe();
            subscriberLeaseRenewalTimer.Enabled = true;
            subscriberLeaseRenewalTimer.Start();
        }

        public void CreateProxy()
        {
            proxy = new EventBrokerProxy(instanceContext);
        }

        public void Subscribe()
        {
            ....
            ....
            ....
            ....
        }

        public void EndSubscription()
        {
            ....
            ....
            ....
            ....
        }
    }
}

Google Earth 集成

Google Earth 的集成都是非常标准的东西,你可以通过阅读各种 Google Earth 文档/API 参考页面来学习。然而,为了完整起见,这里是 Google Earth 集成的代码。

正如我们所说,如果你使用 Google Earth API,这些都是非常标准的东西。基本上,我们在 Codeproject.EventBroker.WebUI 项目的 GeoLocationView.js 文件中使用以下代码。

  1. 初始化 Google Earth 插件
  2. 创建 Google Earth 插件导航控件
  3. Google Earth 插件导航到特定的纬度/经度(这是通过订阅者的 WCF 回调上下文回调给 SignalR Hub 提供的)
  4. 为当前纬度/经度显示一个 Google Earth 插件地标(这是通过订阅者的 WCF 回调上下文回调给 SignalR Hub 提供的)
//************************************************
// Global Vars
//************************************************

google.load('earth', '1');
var ge = null;
var placemark;

//************************************************
// Document Ready
//************************************************
$(function () {
    GlobalInit();
});


//*************************************************
// Global initialisation, hooks up Google Earth
// callback
//*************************************************
function GlobalInit() {
    google.setOnLoadCallback(EarthInit);
}

//*************************************************
// Initialise Googe Earth
//*************************************************
function EarthInit() {

    google.earth.createInstance(
        'map3d',
        function InitialisationPassed(instance) {
            ge = instance;
            console.log("InitialisationPassed " + ge);
            InitialiseSignalRHub();
            ge.getWindow().setVisibility(true);
            CreateNavigationControl();
        },
        function InitialisationFailed(errorCode) {
            console.log("InitialisationFailed " + errorCode);
            alert("there was an error initialising Google Earth\r\n" + errorCode);
        });
}


//*************************************************
// Creates Google Earth Navigation Control
//*************************************************
function CreateNavigationControl() {

    console.log("CreateNavigationControl " + ge);
    var geNavigationControl = ge.getNavigationControl();
    geNavigationControl.setVisibility(true);
    geNavigationControl.setStreetViewEnabled(true);
}

//*************************************************
// Navigates Google Earth To Particular Lat/Long
//*************************************************
function ShowPosition(lat, long) {
    // Get the current view
    var lookAt = ge.getView().copyAsLookAt(ge.ALTITUDE_RELATIVE_TO_GROUND);
    lookAt.setRange(lookAt.getRange() * 0.25);

    // Set new latitude and longitude values
    lookAt.setLatitude(lat);
    lookAt.setLongitude(long);

    // Update the view in Google Earth
    ge.getView().setAbstractView(lookAt);

    // Get the current view
    var camera = ge.getView().copyAsCamera(ge.ALTITUDE_RELATIVE_TO_GROUND);

    // Zoom out to twice the current distance
    camera.setAltitude(5000000);
    camera.setLatitude(lat);
    camera.setLongitude(long);

    // Update the view in Google Earth
    ge.getView().setAbstractView(camera);
}


//*************************************************
// Creates Google Earth Marker for Lat/Long 
// with description
//*************************************************
function CreateMarker(lat, long, desc) {

    //remove last placemark, only want to show 1 at a time
    if (placemark != undefined) {
        ge.getFeatures().removeChild(placemark);
    }

    placemark = ge.createPlacemark('');
    placemark.setName(desc);
    placemark.setDescription("Some cool stuff right here");

    // Define a custom icon.
    var icon = ge.createIcon('');

    var imageUrl = window.location.protocol + "//" + 
    window.location.host + "/content/Images/person.png";
    console.log(imageUrl);

    icon.setHref(imageUrl);
    var style = ge.createStyle(''); //create a new style
    style.getIconStyle().setIcon(icon); //apply the icon to the style
    style.getIconStyle().setScale(1.5);
    style.getLabelStyle().setScale(2.0);    
    placemark.setStyleSelector(style); //apply the style to the placemark

    // Set the placemark's location.  
    var point = ge.createPoint('');
    point.setLatitude(lat);
    point.setLongitude(long);
    placemark.setGeometry(point);

    // Add the placemark to Earth.
    ge.getFeatures().appendChild(placemark);
}

就这些

总之,这就是我们这次真正想说的全部内容。Richard 和我可能在某个时候会给你们带来更多的东西,但我现在要回去完成我和 Pete O'Hanlon 正在做的这个开源项目的最后 5%。问题是那最后的 5% 是最难的部分,但我们俩都投入其中,所以期待它很快就会出现在这里。虽然花了一段时间,但我们俩都喜欢它,并觉得它会有用。所以在那之前……噔噔噔。

然而,如果您喜欢这篇文章,并且愿意花时间发表评论/投票,我们将不胜感激。感谢您的阅读。再见!

使用 MSMQ/Duplex WCF/SignalR/jQuery 将实时结果流式传输到网站 - CodeProject - 代码之家
© . All rights reserved.