65.9K
CodeProject 正在变化。 阅读更多。
Home

WPF 和 WCF 之间的可伸缩异步双工通信

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.85/5 (12投票s)

2011年9月1日

CPOL

5分钟阅读

viewsIcon

48891

downloadIcon

2016

WPF 和 WCF 之间的可伸缩全双工通信。

引言

本文旨在提供有关如何开发一个可工作的异步(可伸缩)全双工通信 WCF 服务示例的详细信息。

背景

与 JBoss\Jetty\Tomcat 等(使用 Java 的非阻塞 IO 类)相比,IIS 在处理开放连接的可伸缩性方面一直存在问题。

XP、Vista 或 Win7 最多允许 10 个开放连接,这些连接会很快被分配。为绕过此限制,建议使用 Windows 2008 并进行异步调用。要创建面向推送技术的服务,需要一种全双工机制。

但有一个问题,您需要以异步方式调用全双工 WCF 服务。请记住,异步不会占用线程,而全双工会保持通道开放以进行实时更新。

如何使您的全双工调用可伸缩

您需要将全双工 WCF 服务部署到 Windows 2008 服务器上。Windows 2008 服务器包含一个处理异步调用的机制,这样它们就不会占用新的连接(这是关键)。

如果您进行同步调用,一百(左右)个可用连接中的一个将被分配给您的请求;随着时间的推移,这些连接会被占用,您的请求将不得不等待直到连接被释放。

因此,要绕过连接分配,但仍能让服务与您的应用程序保持连接,您必须将您的全双工调用设为异步调用(全双工保持通道开放,异步阻止服务器分配新连接)。

应用程序截图

上图显示了三个打开的应用程序并显示相同的信息。这些数据正由 WCF 服务(使用计时器)推送到所有最初调用了该服务全双工连接的会话。

项目结构

WpfDuplex/project.jpg

上面,您可以看到该解决方案被拆分为多个项目。有一个项目用于托管服务,一个用于客户端,还有一个用于模型类。WpdClient 是启动项目,MainWindow 是启动页面。

代码解释

客户端代码 - MainWindow 类

private void SubscribeToStockServer()
{
    try
    {
        log.Info("Creating services");
        InstanceContext context = new InstanceContext(new CallbackController());
        client = new SessionsControllerClient(context);
        client.SubscribeToNotificationsAsync(SessionGuid.ToString());
        log.Info("Services created");
    }
    catch (Exception ex)
    {                
        log.Error("Services error: " + ex.Message);
    }
}

最初,客户端会调用服务,让服务知道它想要接收更新。这里重要的是名为 `CallbackController` 的类。此类实现了在服务中执行的回调方法(您将在服务中看到一个名为 `IStockSessionCallbackContract` 的接口)。`CallbackController` 代码实现了 `IStockSessionCallbackContract` 接口中的方法。

当客户端订阅全双工服务(以异步方式)时,它会传入一个新的 GUID 对象 - 此 GUID 将用于唯一标识服务器上的静态集合中的会话。

public MainWindow()
{
    try
    {
        InitializeComponent();
        this.InitialiseApplication();
        this.SubscribeToStockServer();
        Messenger.Default.Register<StockQuotes>(this, "ServerUpdateResponse", 
           data => { this.UpdateFromServer(data); }); // create a listener to update UI
    }
    catch (Exception ex)
    {                
        log.Error(ex.Message);
    }
}

这里需要注意的代码行是 MVVMLight Messenger 语句,它会监听来自 `CallbackController` 类的 `ServerUpdateResponse` 令牌键,然后将 `StockQuote` 对象转发给客户端处理方法 `UpdateFromServer`(如下所示)。

private void UpdateFromServer(StockQuotes data)
{
    try
    {
        switch (data.Exchange)
        {
            case StockQuotes.StockExchanges.Cac:
                Dispatcher.Invoke(
                System.Windows.Threading.DispatcherPriority.Normal,
                new Action(
                  delegate()
                  {
                      (this.MyLine.Series[0] as DataPointSeries).ItemsSource = 
                        data.UpdatedStockQuotes;
                  }
              ));
                break;
            case StockQuotes.StockExchanges.Dac:
                Dispatcher.Invoke(
               System.Windows.Threading.DispatcherPriority.Normal,
               new Action(
                 delegate()
                 {
                     (this.MyScatter.Series[0] as DataPointSeries).ItemsSource = 
                       data.UpdatedStockQuotes;
                 }
             ));
                break;
            case StockQuotes.StockExchanges.Dow:
                Dispatcher.Invoke(
               System.Windows.Threading.DispatcherPriority.Normal,
               new Action(
                 delegate()
                 {
                     (this.MyColumn.Series[0] as DataPointSeries).ItemsSource = 
                       data.UpdatedStockQuotes;
                 }
             ));

                break;
            case StockQuotes.StockExchanges.Ftse:
                Dispatcher.Invoke(
              System.Windows.Threading.DispatcherPriority.Normal,
              new Action(
                delegate()
                {
                    (this.MyPie.Series[0] as DataPointSeries).ItemsSource = 
                      data.UpdatedStockQuotes;
                }
            ));
                break;
        }
        this.UpdateNoticeBox(data);
    }
    catch (Exception ex)
    {
        log.Error(ex.Message);
    }
}

UpdateFromServer 方法简单地将新的 StockQuotes List 对象重新绑定到相应的图表(基于 Exchange 属性的值)。

客户端代码 - CallBackController 类

namespace WpfClient.Model
{
    class CallbackController : SessionService.ISessionsControllerCallback
    {
        public void SendNotificationToClients(StockQuotes message)
        {
            Messenger.Default.Send<StockQuotes>(message, "ServerUpdateResponse");
            // move onto the mian thread to perform a UI update
        }

        /*
         * The 'Begin' and 'End' methods associated with 
         * the 'SendNotificationToClients' are here for compiler declaration purposes only.
         * They have no functional purpose, since a channel has been opened
         * and the callback method (declared in the interface 'IGuitarSessionCallbackContract') 
         * will be used to communicate with the client not
         * the 'EndSendNotificationToClients'. They are here for the Async reason only.
         */
        public IAsyncResult BeginSendNotificationToClients(StockQuotes message, 
               AsyncCallback callback, object asyncState) { return null; }
        public void EndSendNotificationToClients(IAsyncResult result){ }
    }
}

在这里,项目告知编译器我们已经声明了与异步调用相关的“Begin”和“End”方法。但是,我们不会使用它们,因为服务将使用全双工回调方法将信息发送到客户端 - 而不是上面的“End”方法。出于编译器原因,我们必须声明它们。注意 `SendNotificationToClients` 方法,它在服务器上的 `IStockSessionCallbackContract` 接口中声明,但在客户端中编写。此外,这里也发生了 MVVMLight Messenger 发送。

添加服务

WpfDuplex/Adding_Service.jpg

请注意,您需要选中“生成异步操作”,以便调用是异步的,并且不会占用连接。

StockQuote 类

namespace Stocks
{
    /// <summary>
    /// 
    /// </summary>
    public class StockQuotes
    {
        /// <summary>
        /// 
        /// </summary>
        private StockExchanges exchange;

        /// <summary>
        /// Gets or sets the exchange.
        /// </summary>
        /// <value>
        /// The exchange.
        /// </value>
        public StockExchanges Exchange
        {
            get { return exchange; }
            set { exchange = value; }
        }

        /// <summary>
        /// 
        /// </summary>
        List<StockValue> updatedStockQuotes;

        /// <summary>
        /// Gets or sets the updated stock quotes.
        /// </summary>
        /// <value>
        /// The updated stock quotes.
        /// </value>
        public List<StockValue> UpdatedStockQuotes
        {
            get { return updatedStockQuotes; }
            set { updatedStockQuotes = value; }
        }

        /// <summary>
        /// 
        /// </summary>
        public enum StockExchanges
        {
            Ftse,
            Dow,
            Dac,
            Cac
        }
    }

    /// <summary>
    /// 
    /// </summary>
    public class StockValue
    {
        /// <summary>
        /// Initializes a new instance of the <see cref="StockValue"/> class.
        /// </summary>
        public StockValue() { }
        /// <summary>
        /// Initializes a new instance of the <see cref="StockValue"/> class.
        /// </summary>
        /// <param name="stockName">Name of the stock.</param>
        /// <param name="stockValue">The stock value.</param>
        public StockValue(string stockName, int stockValue)
        {
            Symbol = stockName;
            Value = stockValue;
        }
        /// <summary>
        /// Gets or sets the symbol.
        /// </summary>
        /// <value>
        /// The symbol.
        /// </value>
        public string Symbol { get; set; }
        /// <summary>
        /// Gets or sets the value.
        /// </summary>
        /// <value>
        /// The value.
        /// </value>
        public int Value { get; set; }
    }

上面的类只是一个普通的 C# 类,用于包含股票报价。此类由服务器传递给客户端,并在客户端代码中进行处理,以绑定到图表。

WCF 服务代码

Session Helper 类

static public class SessionHelper
{
    static private string browserValue;
    static Dictionary<string, IStockSessionCallbackContract> clients;  

    /// <summary>
    /// Class constructor.
    /// </summary>
    static SessionHelper()
    {
        clients = new Dictionary<string, IStockSessionCallbackContract>();
    }

    /// <summary>
    /// Gets or sets the browser value.
    /// </summary>
    /// <value>The browser value.</value>
    public static string BrowserValue
    {
        get { return browserValue; }
        set { browserValue = value; }
    }      

    /// <summary>
    /// Stores given client callback channel
    /// </summary>
    /// <param name="clientId">Id of client's session</param>
    /// <param name="callbackChannel">Callback channel for given client</param>
    public static void AddCallbackChannel(string clientId, 
           IStockSessionCallbackContract callbackChannel)
    {
        if (clients.Where(c => c.Key == clientId).Count() == 0)
        {
            object syncRoot = new Object();
            lock (syncRoot)
            {
                if (clients.Where(c => c.Key == clientId).Count() == 0)
                    clients.Add(clientId, callbackChannel);
            }
        }
    }      

    /// <summary>
    /// Checks whether given client is connected.
    /// </summary>
    /// <param name="clientId">Id of client's session</param>
    /// <returns>Whether client is connected</returns>
    public static bool IsClientConnected(string clientId)
    {            
        return clients.Where(c => c.Key == clientId).Count() > 0;
    }

    /// <summary>
    /// Returns all available callback channels except sender
    /// </summary>
    /// <returns>Callback channels</returns>
    public static IEnumerable<IStockSessionCallbackContract> 
                  GetCallbackChannels(string sessionId)
    {

        var filteredChannels = from channel in clients
                               where channel.Key != sessionId
                               select channel; 
        
        return filteredChannels.Select(c => c.Value);
    }

    /// <summary>
    /// Returns all available callback channels
    /// </summary>
    /// <returns>Callback channels</returns>
    public static IEnumerable<IStockSessionCallbackContract> GetCallbackChannels()
    {            
        return clients.Select(c => c.Value);
    }

    /// <summary>
    /// Returns callback channel for given client
    /// </summary>
    /// <param name="clientId">Id of client's session</param>
    /// <returns>Callback channel for given client</returns>
    public static IStockSessionCallbackContract GetCallbackChannel(string clientId)
    {            
        return clients.Where(c => c.Key == clientId).Single().Value;
    }
  
    /// <summary>
    /// Deletes callback channel for given client 
    /// </summary>
    /// <param name="clientId">Id of client's session</param>
    public static bool DeleteClient(string clientId)
    {
        if (clients.Where(c => c.Key == clientId).Count() > 0)
        {
            object syncRoot = new Object();
            lock (syncRoot)
            {
                if (clients.Where(c => c.Key == clientId).Count() > 0)
                    return clients.Remove(clientId);
            }
        }
        return false;
    }
}

上面的静态类将维护所有会话,对实际会话执行 CRUD 操作。您会注意到它在字典集合中使用 `IStockSessionCallbackContract` 接口,因为该接口包含了回调方法的声明(在客户端中编写)。

ISessionsController 服务接口

namespace StockService.Interfaces
{
    /// <summary>
    /// This interface contains methods that client
    /// can call to subscribe or unsubscrube to notifications from database
    /// </summary>
    [ServiceContract(Namespace = "StockService.Services.Interfaces", 
           CallbackContract = typeof(IStockSessionCallbackContract))]
    public interface ISessionsController
    {
        /// <summary>
        /// This method should be executed to subscribe to notifications from server.
        /// </summary>
        [OperationContract(IsOneWay = true)]
        void SubscribeToNotifications(string browserName);

        /// <summary>
        /// This method should be executed to unsubscribe to notifications from server.
        /// </summary>
        [OperationContract(IsOneWay = true)]
        void UnsubscribeToNotifications();
    }
}

以上操作允许客户端连接和断开会话集合,从而断开与服务器的任何通信。

IStockSessionCallbackContract 服务接口

namespace StockService.Interfaces
{
    /// <summary>
    /// Interface declarations for the notification callback to browser
    /// </summary>
    [ServiceContract]
    public interface IStockSessionCallbackContract
    {
        /// <summary>
        /// Sends the notification to clients.
        /// </summary>
        /// <param name="data">The data.</param>
        [OperationContract(IsOneWay = true)]
        void SendNotificationToClients(StockQuotes data);
    }
}

上述接口签名是在服务器上声明的,但由于全双工编码的回调性质,它是在客户端中编写的。在 `ISessionsController` 接口中,您会看到注解 `[ServiceContract(Namespace = "StockService.Services.Interfaces", CallbackContract = typeof(IStockSessionCallbackContract))]`,并且您会看到 `CallbackContract` 属性,它指定了 `IStockSessionCallbackContract` 接口。这就是我们如何完成通信的闭环。服务方法知道调用在客户端中编写和执行的特定接口方法。

SessionsController 服务方法

public void SubscribeToNotifications(string clientGuid)
{
    IStockSessionCallbackContract ch = 
      OperationContext.Current.GetCallbackChannel<IStockSessionCallbackContract>();
    // string sessionId = OperationContext.Current.Channel.SessionId;

    //Any message from a client we haven't seen before
    //causes the new client to be added to our list
    lock (syncRoot)
    {
        SessionHelper.BrowserValue = clientGuid;

        if (!SessionHelper.IsClientConnected(clientGuid))
        {
            SessionHelper.AddCallbackChannel(clientGuid, ch);
            OperationContext.Current.Channel.Closing += new EventHandler(Channel_Closing);
            OperationContext.Current.Channel.Faulted += new EventHandler(Channel_Faulted);
        }
    }
}

当客户端发出初始异步调用到服务时,它将进入 `SubscribeToNotifications` 方法(传递一个 GUID)。然后,获取其回调合同接口 `IStockSessionCallbackContract` 并将其存储在一个集合中。它还将关联关闭事件处理程序,以便在完成后执行任何维护过程。

private void ProcessUpdate(object sender, EventArgs e)
{
    try
    {
        StockQuotes data = GenerateStockQuotes();
        
        // loop through channels and make a call to their callback method
        if (SessionHelper.GetCallbackChannels().Count() > 0)
        {
            lock (syncRoot)
            {
                IEnumerable<IStockSessionCallbackContract> 
                       allChannels = SessionHelper.GetCallbackChannels();
                allChannels.ToList().ForEach(c => c.SendNotificationToClients(data));
            }
        }
    }
    catch (Exception) { }
}

上面的方法将执行将信息发布到所有客户端的操作。

public SessionsController()
{
    timer.Elapsed += new ElapsedEventHandler(this.ProcessUpdate);
    timer.Interval = 2000;
    timer.Enabled = true;
    timer.Start();
}

服务中会执行一个简单的计时器来模拟数据的生成(例如,数据更新)。每两秒钟,都会调用 `ProcessUpdate` 方法,并将新的股票报价推送到所有已连接的会话。

改进

  • 在客户端序列化数据并在发送到服务之前进行压缩。服务反过来将处理(压缩后的)数据并发送到开放的通道。
© . All rights reserved.