65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 SignalR 2 进行广播和通知,并结合自托管 Windows 服务

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.86/5 (21投票s)

2015年4月2日

Apache

5分钟阅读

viewsIcon

96861

downloadIcon

550

本文介绍如何使用 SignalR 在自托管的 Windows 服务上进行广播和通知。

引言

本文是系列文章的第二部分,介绍了如何使用 SignalR 在自托管的 Windows 服务上进行广播和通知。请参阅我之前关于 SignalR 与 Web 应用程序(第一部分)的提示 此处,其中也包含有价值的入门信息,以及关于 SignalR 与服务器广播 ASP 的文章 此处

要使用 SignalR 2,您的应用程序必须使用 .NET Framework 4.5。如果您使用 .NET Framework 4.0,则必须使用 SignalR 1.x。

为了说清楚,上一篇文章中的 SignalR 应用程序允许点对点通信,而这个 Windows 服务应用程序将允许广播通知。

SignalR 在 Web 应用程序环境中 certainly 非常有用,但如果您在工业环境中需要在 Windows 桌面实现实时通信,它也同样非常有用。

您可以从 此处 下载示例项目。

创建服务

首先,在 Visual Studio 中创建一个 Windows 服务,确保您以管理员权限运行,并且您的项目使用 .NET 4.5 或更高版本。服务创建后,将其重命名为 CurrencyExchangeService

然后,在程序包管理器控制台中输入此命令,确保默认项目是您的服务项目。

      PM> Install-Package Microsoft.AspNet.SignalR.SelfHost
      PM> Install-Package ServiceProcess.Helpers
      PM> Install-Package Microsoft.Owin.Cors

后者对于跨域支持是必需的,适用于应用程序在不同域中托管 SignalR 和网页的情况——在本例中,SignalR 服务器和客户端将位于不同的端口上。

确保您的Program.cs 包含以下代码,这允许您在 Visual Studio 中调试服务,或者在安装后像普通服务一样运行它。

using ServiceProcess.Helpers;
using System;
using System.Collections.Generic;
using System.Data;
using System.ServiceProcess;

namespace SignalRBroadcastServiceSample
{
    static class Program
    {
        private static readonly List<ServiceBase> _servicesToRun = new List<ServiceBase>();

        /// <summary>
        /// The main entry point for the application.
        /// </summary>
        static void Main()
        {
            _servicesToRun.Add(CurrencyExchangeService.Instance);

            if (Environment.UserInteractive)
            {
                _servicesToRun.ToArray().LoadServices();
            }
            else
            {
                ServiceBase.Run(_servicesToRun.ToArray());
            }
        }
    }
}

注册 SignalR 中间件

将以下类添加到您的服务项目中。

using System;
using System.Threading.Tasks;
using Microsoft.Owin;
using Owin;

[assembly: OwinStartup(typeof(SignalRBroadcastServiceSample.Startup))]

namespace SignalRBroadcastServiceSample
{
    public class Startup
    {
        public void Configuration(IAppBuilder app)
        {
            // Add configuration code or hub wire up here if needed
            app.MapSignalR();
        }
    }
}

上面的 MapSignalR 代码定义了客户端用于连接到您的 Hub 的路由。

客户端用于连接到您的 Hub 的默认路由 URL 是“/signalr”。假设您的项目中有一个名为 signalr 的文件夹,因此您不想使用此 URL,那么您可以通过在服务器上执行此操作来创建自定义 URL。

 	app.MapSignalR("/mycustomurl", new HubConfiguration()); 	

然后,您将使用此命令在客户端上指定自定义 URL。

 	var hubConnection = new HubConnection("http://contoso.com/mycustomurl", useDefaultUrl: false); 	

将 SignalR 代码添加到服务

将此 Currency.cs 类添加到单独的库项目中。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SignalrDomain
{
    public class Currency
    {
        private decimal _usdValue;
        public string CurrencySign { get; set; }
        public decimal Open { get; private set; }
        public decimal Low { get; private set; }
        public decimal High { get; private set; }
        public decimal LastChange { get; private set; }

        public decimal RateChange
        {
            get
            {
                return USDValue - Open;
            }
        }

        public double PercentChange
        {
            get
            {
                return (double)Math.Round(RateChange / USDValue, 4);
            }
        }

        public decimal USDValue
        {
            get
            {
                return _usdValue;
            }
            set
            {
                if (_usdValue == value)
                {
                    return;
                }

                LastChange = value - _usdValue;
                _usdValue = value;

                if (Open == 0)
                {
                    Open = _usdValue;
                }
                if (_usdValue < Low || Low == 0)
                {
                    Low = _usdValue;
                }
                if (_usdValue > High)
                {
                    High = _usdValue;
                }
            }
        }
    }
}

Hub 对象

Hub 对象由 SignalR Hubs 管道为您实例化,因此您无需在服务器上从自己的代码中实例化 Hub 类或调用其方法。

Hub 类实例是瞬时的,因此您不能使用它们来维护从一个方法调用到另一个方法的状态。如果需要,您可以在数据库、static 变量或另一个类中维护状态。

如果您想将消息广播到特定的命名组,那么这些命名组将在您的 Hub 类中定义。

您的 Hub 类中的 public 方法可以由客户端调用。

您可以在应用程序中定义多个 Hub 类,其中连接是共享的。另一方面,组对于每个 Hub 类都是分开的,并且应该在 Hub 中定义。

如果您想使用与您的 Hub 类名称不同的 Hub 名称,则使用此属性。

[HubName("PascalCaseMyChatHub")]

将此 CurrencyExchangeHub.cs 文件添加到您的服务项目中,其中 public 方法是您的客户端可以调用的方法。数据使用 JSON 在服务器和客户端之间进行通信,SignalR 会自动处理复杂对象的绑定。

CurrencyExchangeHub 类继承自 SignalR Hub 类,将处理接收来自客户端的连接和方法调用。

using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
using Microsoft.Owin;
using SignalrDomain;
using System;
using System.Collections.Generic;
using System.Linq;

namespace SignalRBroadcastServiceSample
{
    public class CurrencyExchangeHub : Hub
    {
        private readonly CurrencyExchangeService _currencyExchangeHub;

        public CurrencyExchangeHub() :
            this(CurrencyExchangeService.Instance)
        {

        }

        public CurrencyExchangeHub(CurrencyExchangeService currencyExchange)
        {
            _currencyExchangeHub = currencyExchange;
        }

        public IEnumerable<Currency> GetAllCurrencies()
        {
            return _currencyExchangeHub.GetAllCurrencies();
        }

        public string GetMarketState()
        {
            return _currencyExchangeHub.MarketState.ToString();
        }

        public bool OpenMarket()
        {
            _currencyExchangeHub.OpenMarket();
            return true;
        }

        public bool CloseMarket()
        {
            _currencyExchangeHub.CloseMarket();
            return true;
        }

        public bool Reset()
        {
            _currencyExchangeHub.Reset();
            return true;
        }
    }
}

请注意,如果您预计某些调用需要相当长的时间才能完成,那么您可以执行异步调用,以确保应用程序保持响应。

        public async IEnumerable<Currency> GetAllCurrencies()
        {  
            IEnumerable<Currency> currencies = new IEnumerable<Currency>();
            Task loadCurrenciesTask = Task.Factory.StartNew(() => LoadCurrencies(currencies));
            await loadCurrenciesTask;
            
            return currencies;
        }
        
        private static void LoadCurrencies(IEnumerable<Currency> currencies)
        {
	       currencies = _currencyExchangeHub.GetAllCurrencies();
        }

将此添加到您的 CurrencyExchangeService.cs 文件中。

using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
using Microsoft.Owin.Hosting;
using SignalrDomain;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ServiceProcess;
using System.Threading;

namespace SignalRBroadcastServiceSample
{
    public partial class CurrencyExchangeService : ServiceBase
    {
        private Thread mainThread;
        private bool isRunning = true;
        private Random random = new Random();

        protected override void OnStart(string[] args)
        {
            WebApp.Start("https://:8083"); // Must be 
            	//@"http://+:8083" if you want to connect from other computers
            LoadDefaultCurrencies();

            // Start main thread
            mainThread = new Thread(new ParameterizedThreadStart(this.RunService));
            mainThread.Start(DateTime.MaxValue);
        }

        protected override void OnStop()
        {
            mainThread.Join();
        }

        public void RunService(object timeToComplete)
        {
            DateTime dtTimeToComplete = timeToComplete != null ? 
            	Convert.ToDateTime(timeToComplete) : DateTime.MaxValue;

            while (isRunning && DateTime.UtcNow < dtTimeToComplete)
            {
                Thread.Sleep(15000);
                NotifyAllClients();
            }
        }

        // This line is necessary to perform the broadcasting to all clients
        private void NotifyAllClients()
        {
            Currency currency = new Currency();
            currency.CurrencySign = "CAD";
            currency.USDValue = random.Next();
            BroadcastCurrencyRate(currency);
            Clients.All.NotifyChange(currency);
        }

        #region "SignalR code"

        // Singleton instance
        private readonly static Lazy<CurrencyExchangeService> 
        	_instance = new Lazy<CurrencyExchangeService>(
            () => new CurrencyExchangeService
            (GlobalHost.ConnectionManager.GetHubContext<CurrencyExchangeHub>().Clients));

        private readonly object _marketStateLock = new object();
        private readonly object _updateCurrencyRatesLock = new object();

        private readonly ConcurrentDictionary<string, 
        	Currency> _currencies = new ConcurrentDictionary<string, Currency>();

        // Currency can go up or down by a percentage of this factor on each change
        private readonly double _rangePercent = 0.002;

        private readonly TimeSpan _updateInterval = TimeSpan.FromMilliseconds(250);

        public TimeSpan UpdateInterval
        {
            get { return _updateInterval; }
        } 

        private readonly Random _updateOrNotRandom = new Random();

        private Timer _timer;
        private volatile bool _updatingCurrencyRates;
        private volatile MarketState _marketState;

        public CurrencyExchangeService(IHubConnectionContext<dynamic> clients)
        {
            InitializeComponent();

            Clients = clients;
        }

        public static CurrencyExchangeService Instance
        {
            get
            {
                return _instance.Value;
            }
        }

        private IHubConnectionContext<dynamic> Clients
        {
            get;
            set;
        }

        public MarketState MarketState
        {
            get { return _marketState; }
            private set { _marketState = value; }
        }

        public IEnumerable<Currency> GetAllCurrencies()
        {
            return _currencies.Values;
        }

        public bool OpenMarket()
        {
            bool returnCode = false;

            lock (_marketStateLock)
            {
                if (MarketState != MarketState.Open)
                {
                    _timer = new Timer(UpdateCurrencyRates, null, _updateInterval, _updateInterval);

                    MarketState = MarketState.Open;

                    BroadcastMarketStateChange(MarketState.Open);
                }
            }
            returnCode = true;

            return returnCode;
        }

        public bool CloseMarket()
        {
            bool returnCode = false;

            lock (_marketStateLock)
            {
                if (MarketState == MarketState.Open)
                {
                    if (_timer != null)
                    {
                        _timer.Dispose();
                    }

                    MarketState = MarketState.Closed;

                    BroadcastMarketStateChange(MarketState.Closed);
                }
            }
            returnCode = true;

            return returnCode;
        }

        public bool Reset()
        {
            bool returnCode = false;

            lock (_marketStateLock)
            {
                if (MarketState != MarketState.Closed)
                {
                    throw new InvalidOperationException
                    	("Market must be closed before it can be reset.");
                }
                
                LoadDefaultCurrencies();
                BroadcastMarketReset();
            }
            returnCode = true;

            return returnCode;
        }

        private void LoadDefaultCurrencies()
        {
            _currencies.Clear();

            var currencies = new List<Currency>
            {
                new Currency { CurrencySign = "USD", USDValue = 1.00m },
                new Currency { CurrencySign = "CAD", USDValue = 0.85m },
                new Currency { CurrencySign = "EUR", USDValue = 1.25m }
            };

            currencies.ForEach(currency => _currencies.TryAdd(currency.CurrencySign, currency));
        }

        private void UpdateCurrencyRates(object state)
        {
            // This function must be re-entrant as it's running as a timer interval handler
            lock (_updateCurrencyRatesLock)
            {
                if (!_updatingCurrencyRates)
                {
                    _updatingCurrencyRates = true;

                    foreach (var currency in _currencies.Values)
                    {
                        if (TryUpdateCurrencyRate(currency))
                        {
                            BroadcastCurrencyRate(currency);
                        }
                    }

                    _updatingCurrencyRates = false;
                }
            }
        }

        private bool TryUpdateCurrencyRate(Currency currency)
        {
            // Randomly choose whether to update this currency or not
            var r = _updateOrNotRandom.NextDouble();
            if (r > 0.1)
            {
                return false;
            }

            // Update the currency price by a random factor of the range percent
            var random = new Random((int)Math.Floor(currency.USDValue));
            var percentChange = random.NextDouble() * _rangePercent;
            var pos = random.NextDouble() > 0.51;
            var change = Math.Round(currency.USDValue * (decimal)percentChange, 2);
            change = pos ? change : -change;

            currency.USDValue += change;
            return true;
        }

        private void BroadcastMarketStateChange(MarketState marketState)
        {
            switch (marketState)
            {
                case MarketState.Open:
                    Clients.All.marketOpened();
                    break;
                case MarketState.Closed:
                    Clients.All.marketClosed();
                    break;
                default:
                    break;
            }
        }

        private void BroadcastMarketReset()
        {
            Clients.All.marketReset();
        }

        private void BroadcastCurrencyRate(Currency currency)
        {
            Clients.All.updateCurrencyRate(currency);
        }
    }

    public enum MarketState
    {
        Closed,
        Open
    }

    #endregion
}

Clients.All 表示广播给所有客户端。您也可以通过调用 Clients.AllExcept(connectionId1, connectionId2).updateCurrencyRate(currency) 来向除指定连接 ID 之外的所有人发送消息。要了解如何指定哪些客户端或客户端组,请参阅 此处此处

如果您希望允许 SignalR 客户端从其他计算机连接,请将 WebApp.Start 中传递的 URL 中的 localhost 更改为 +。

接下来,向您的解决方案添加一个单元测试库,首先从程序包管理器控制台添加 SignalRPackage

      PM> Install-Package Microsoft.AspNet.SignalR.SelfHost      
      PM> Install-Package  Microsoft.AspNet.SignalR.Client      

现在添加以下代码。

using System;
using System.ServiceProcess;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using SignalRBroadcastServiceSample;

namespace UnitTests
{
    [TestClass]
    public class TestCurrencyExchangeService
    {
        #region Additional test attributes
        // 
        //You can use the following additional attributes as you write your 
        //tests:
        //Use ClassInitialize to run code before running the first test in the
        //class
        [ClassInitialize()]
        public static void MyClassInitialize(TestContext testContext)
        {
        }

        //Use ClassCleanup to run code after all tests in a class have run
        [ClassCleanup()]
        public static void MyClassCleanup()
        {
        }

        //Use TestInitialize to run code before running each test
        [TestInitialize()]
        public void MyTestInitialize()
        {

        }

        //Use TestCleanup to run code after each test has run
        [TestCleanup()]
        public void MyTestCleanup()
        {

        }

        #endregion

        [TestMethod]
        public void TestClientGetMarketStateFromHub()
        {
            // Make sure to call WebApp.Start:
            PrivateObject privateObject = new PrivateObject(_service);
            privateObject.Invoke("OnStart", new object[] { null });

            // Create client proxy and call hub method
            using (HubConnection hub = new HubConnection(String.Format("http://{0}:8084", "localhost")))
            {
                IHubProxy proxy = hub.CreateHubProxy("CurrencyExchangeHub");
                hub.Start().Wait();

                var state = proxy.Invoke("GetMarketState").Result;
                Assert.IsNotNull(state);
                Assert.IsTrue(state.Length > 0);
            }
        }

        [TestMethod]
        public void TestClientGetAllCurrenciesFromHub()
        {
            // Make sure to call WebApp.Start:
            PrivateObject privateObject = new PrivateObject(_service);
            privateObject.Invoke("OnStart", new object[] { null });

            // Create client proxy and call hub method
            using (HubConnection hub = new HubConnection(String.Format("http://{0}:8084", "localhost")))
            {
                IHubProxy proxy = hub.CreateHubProxy("CurrencyExchangeHub");
                hub.Start().Wait();

                var currencies = proxy.Invoke>("GetAllCurrencies").Result;
                Assert.IsNotNull(currencies);
                Assert.IsTrue(currencies.ToString().Length > 0);
            }
        }

        [TestMethod]
        public void TestClientOpenCloseMarketFromHub()
        {
            // Make sure to call WebApp.Start:
            PrivateObject privateObject = new PrivateObject(_service);
            privateObject.Invoke("OnStart", new object[] { null });

            // Create client proxy and call hub method
            using (HubConnection hub = new HubConnection(String.Format("http://{0}:8084", "localhost")))
            {
                IHubProxy proxy = hub.CreateHubProxy("CurrencyExchangeHub");
                hub.Start().Wait();

                var state = proxy.Invoke("OpenMarket").Result;
                Assert.IsNotNull(state);
                Assert.IsTrue(state == true);

                state = proxy.Invoke("CloseMarket").Result;
                Assert.IsNotNull(state);
                Assert.IsTrue(state == true);
            }
        }

        [TestMethod]
        public void TestGetMarketStateFromHub()
        {
            CurrencyExchangeHub hub = new CurrencyExchangeHub(CurrencyExchangeService.Instance);
            var state = hub.GetMarketState();
            Assert.IsNotNull(state);
        }

        [TestMethod]
        public void TestOpenCloseMarket()
        {
            var currencies = CurrencyExchangeService.Instance.GetAllCurrencies();
            Assert.IsNotNull(currencies);
            bool expected = true;
            bool actual = CurrencyExchangeService.Instance.OpenMarket();
            Assert.AreEqual(expected, actual);
            actual = CurrencyExchangeService.Instance.OpenMarket();
            Assert.AreEqual(expected, actual);
        }

        [TestMethod]
        public void TestOpenCloseMarketFromHub()
        {
            var hub = new CurrencyExchangeHub(CurrencyExchangeService.Instance);
            var currencies = hub.GetAllCurrencies();
            Assert.IsNotNull(currencies);
            bool expected = true;
            bool actual = hub.OpenMarket();
            Assert.AreEqual(expected, actual);
            actual = hub.OpenMarket();
            Assert.AreEqual(expected, actual);
        }
    }
}

如果构建并运行上述测试,它们应该会通过。

接下来,向您的解决方案添加一个控制台项目,并将其命名为 Client。然后打开 NuGet 程序包管理器控制台并键入此命令。

     PM> Install-Package  Microsoft.AspNet.SignalR.Client

现在将以下 CommunicationHandler 类添加到您的项目中。

using Microsoft.AspNet.SignalR.Client;
using SignalrDomain;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Client
{
    public static class CommunicationHandler
    {
        public static string ExecuteMethod(string method, string args, string serverUri, string hubName)
        {
            var hubConnection = new HubConnection("https://:8083");
            IHubProxy currencyExchangeHubProxy = hubConnection.CreateHubProxy("CurrencyExchangeHub");

            // This line is necessary to subscribe for broadcasting messages
            currencyExchangeHubProxy.On<Currency>("NotifyChange", HandleNotify);

            // Start the connection
            hubConnection.Start().Wait();

            var result = currencyExchangeHubProxy.Invoke<string>(method).Result;

            return result;
        }

        private static void HandleNotify(Currency currency)
        {
            Console.WriteLine("Currency " + currency.CurrencySign + ", Rate = " + currency.USDValue);
        }
    }
}

另外,更新您的控制台项目中的 Program 类,如下所示。

using System;
using System.Diagnostics;
using System.Net;

namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            var state = CommunicationHandler.ExecuteMethod("GetMarketState", 
            	"", IPAddress.Any.ToString(), "CurrencyExchangeHub");
            Console.WriteLine("Market State is " + state);

            if (state == "Closed")
            {
                var returnCode = CommunicationHandler.ExecuteMethod
                	("OpenMarket", "", IPAddress.Any.ToString(), "CurrencyExchangeHub");
                Debug.Assert(returnCode == "True");
                Console.WriteLine("Market State is Open");
            }

            Console.ReadLine();
        }
    }
}

要查看应用程序运行情况,首先在一个 Visual Studio 实例中将 SignalRBroadcastServiceSample 项目设置为启动项目,然后按下出现的运行按钮。然后打开另一个 Visual Studio 实例,这次将 Client 项目设置为启动项目。现在按 F5 测试应用程序。每十五秒,控制台将收到一次广播的货币汇率更新。

历史

  • 2015.04.02:添加了关于在 Hub 中使用异步 Task 的详细信息。

  • 2015.04.14:添加了更多单元测试以验证客户端代理。

致谢

请注意,本文的许多想法都来源于 Patrick Fletcher 的文章 此处 以及 Tom Dykstra 和 Tom FitzMacken 的文章 此处

参考文献

© . All rights reserved.