65.9K
CodeProject 正在变化。 阅读更多。
Home

构建集成原则驱动的通知引擎

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2017年10月24日

CPOL

5分钟阅读

viewsIcon

11120

关于如何通过集成应用程序构建通知引擎的演练。

在我为一家运输公司工作时,我需要让许多 Web 应用程序之间能够双向发送通知。正如任何人所期望的那样,我开始在网上搜索一个好的工具或任何解释如何做到这一点的文章。令我惊讶的是,我什么也没找到,考虑到我的需求非常简单且常见。有几个博客确实提供了一些指导和工具,但没有一个真正完整且可运行。所以我决定自己构建。不幸的是,我不能与你分享我最终的产品,因为它属于我的客户,正如我的合同所规定。但我将尝试通过我走过的一些步骤让你做好准备……

正如标题所示,我所说的通知引擎是指一种允许应用程序之间发送数据的软件。这实际上是集成。所以我要做的是构建集成。因此,最初需求的范围正在扩大,并且对于我当时“意大利面条式”处理的许多其他事情来说,变得更加有用。所以,要进行集成,你需要知道谁将发送什么给谁,以及谁将监听谁。这可以通过绘制应用程序数据流的草图来完成。这张草图帮助我指出了我们许多企业应用程序之间的意大利面条式通信。

考虑到大局,我决定专注于并开始实现通知的发送。这时消息队列代理就派上用场了。消息队列代理,一个通常被称为消息总线(即使存在区别)的工具,是一种让应用程序发送消息而其他应用程序可以监听传入消息的技术。许多产品有助于实现这一点,并且值得花时间去研究它们。我最初选择的产品有 MSMQ、ActiveMQ、ZeroMQ、RabbitMQ。我有很多标准帮助我选择了 RabbitMQ,我认为它是一款出色的开源免费产品。

一旦选择了消息队列代理工具的步骤完成,又出现了一个问题,导致了另一个决定:如何将我的应用程序连接到 RabbitMQ。当然 RabbitMQ 有一个 .net 客户端,但使用它意味着必须为不同的目的多次编写相同的代码。所以我再次上网搜索,并选择了一些好工具:MassTransit、EasynetQ 等。我脑海中有很多标准,但最突出的简单性和文档是 EasynetQ。Masstransit 也是 .net 世界中一个不错的选择。我开始使用 EasynetQ 来实现用于通知目的的发布-订阅设计模式,但很快就对这种模式的实现方式感到失望。所以我又回到了 RabbitMQ 提供的 .net 客户端,我一点也不后悔。因为这个出色而简单的 .net 客户端在实现过程中给了我控制权和更多的自由。是的,我不得不做一些抽象来避免冗余,但一旦正确完成,你最终将对幕后发生的一切拥有细粒度的控制。

一旦选定了正确的工具,我就在我的机器上安装了它们,然后把它们放在一边。我开始写下和绘制我的用例。这一重要步骤有助于弄清楚使用 RabbitMQ 的正确配置和方法。它阐明并定义了与队列通信的模式:主题(topic)、扇出(fanout)、直连(direct)、头(headers)等。我应该使用虚拟主机吗?在什么情况下我需要持久化和过期?失败和死信怎么办?错误、重试和重新排队怎么办?在此过程中出现了许多问题。我花必要的时间来思考我的需求以及如何处理所有这些问题。

现在我已经准备好整合这些东西并构建我的通知引擎了。让我设定技术背景。我需要从一个 MVC 5 应用程序通过 Web API 2 向另外两个 MVC 5 应用程序发送消息。所以基本上我有一个 API 端点,它将通过一个 post http 操作将消息推送到消息队列代理,然后消息队列代理将其传回给订阅者。除了通知之外,这种架构还可以用于应用程序之间的许多其他数据流。有时,应根据用例和需求探索其他模式。对于我的通知,我选择了主题模式。

因此,为了完成通知流,需要在 RabbitMQ 服务器中配置

  1. 两个队列,每个应用程序一个,
  2. 创建一个主题模式的交换机
  3. 创建到队列的绑定。绑定基于目标应用程序名称。所以要发送一条消息,我使用了这个简化的代码版本
   public void Produce(IProducerMessage message)
        {
           SetModul();
           var properties = GetProperties();
 
            //Serialize
           var pm = Serialize(message);
           byte[] messageBuffer = Encoding.Default.GetBytes(pm);
           
           //Send message
           _model.BasicPublish(this._mySetting.ExchangeName, this._mySetting.RoutingKey, properties, messageBuffer);
        }
 
  private string Serialize(IProducerMessage message)
        {
            var settings = new JsonSerializerSettings {ContractResolver = new CamelCasePropertyNamesContractResolver()};
            return JsonConvert.SerializeObject(message, Formatting.Indented, settings);
        }
 
  private IBasicProperties GetProperties()
        {
            //Setup properties
            var properties = _model.CreateBasicProperties();
            properties.DeliveryMode = 2;//2 persistent, 1 non-persistent
            properties.ContentType = _mySetting.ContentType;
            properties.ContentEncoding = "base 64 utf8";
            properties.CorrelationId = "m2a";
            properties.MessageId = Guid.NewGuid().ToString();
            properties.Timestamp = new AmqpTimestamp(GetUnixTime());
            //properties.ReplyTo
            properties.Expiration =_mySetting.Expiration;//ms
         
            return properties;
        }
 
  private void SetModul()
        {
            var myconnector = new myConnector(_mySetting);
            var conn = myconnector.GetConnection();
            _model = conn.CreateModel();
        }
 
 
 public class myConnector
    {
       
        private ConnectionFactory _connectionFactory;
        private IConnection _connection;
        private readonly mySettingProducer _mySetting;
 
        public myConnector(mySettingProducer mySetting)
        {
            _mySetting = mySetting;
        }
 
        public IConnection GetConnection()
        {
            SetupConnection();
            return _connection;
        }
 
      private void SetupConnection()
        {
           // ValidateSettings();
            _connectionFactory = new ConnectionFactory
            {
                HostName = this._mySetting.Host,
                UserName = this._mySetting.Username,
                Password = this._mySetting.Password
            };
 
            if (!string.IsNullOrEmpty(this._mySetting.VirtualHost)) _connectionFactory.VirtualHost = this._mySetting.VirtualHost;
            if (this._mySetting.Port > default(int)) _connectionFactory.Port = this._mySetting.Port;
 
            _connection = _connectionFactory.CreateConnection();
          
        }     
}

要将应用程序订阅到特定队列,我使用了以下简化的代码版本

public class myConsumer : IDisposable
    {
 
        private string _hostName = "my ip address";
        private string _userName = "username";
        private string _password = "password";
        private string _queueName = "queuename";
 
        private const string VirtualHost = "virtualhostname";//if any
        private int Port = 0;//specify the port if any
 
        public delegate void OnReceiveMessage(string message);
 
        public bool Enabled { get; set; }
 
        private ConnectionFactory _connectionFactory;
        private readonly IConnection _connection;
        private readonly IModel _model;
        private Subscription _subscription;
        private readonly myNotificationsender _myNotificationHub;//for signalR
 
        public myConsumer()
        {
                     this._connectionFactory = new ConnectionFactory
                                     {
                                         HostName = this._hostName,
                                         UserName = this._userName,
                                         Password = this._password
                                     };
 
             this._myNotificationHub = new myNotificationsender();//for signalR
 
            if (!string.IsNullOrEmpty(VirtualHost))
                this._connectionFactory.VirtualHost = VirtualHost;
            if (this.Port > 0)
                this._connectionFactory.Port = this.Port;
 
            this._connection = this._connectionFactory.CreateConnection();
            this._model = this._connection.CreateModel();
            this._model.BasicQos(0, 1, false);
 
        }
 
       
       public void Start()
        {
            this._subscription = new Subscription(this._model, this._queueName, false);
 
            var consumer = new ConsumeDelegate(this.Pull);
            consumer.Invoke();
        }
        private delegate void ConsumeDelegate();
 
 
        private void Pull()
        {
 
            var consumer = new EventingBasicConsumer(this._model);
            consumer.Received += (o, e) =>
                {
                    var data = Encoding.UTF8.GetString(e.Body);
                    this._myNotificationHub.PushNotificationsToClients("", data);//for signalR
                  
                };
 
            var consumerTag = this._model.BasicConsume(this._queueName, true, consumer);// true is aknowledge that the message is received
      
        }
   }

最后剩下的是一种在不刷新页面 的情况下将通知显示给用户的方法。为此我需要 Signalr。Signalr 将使用 JavaScript 向视图通知任何传入的消息。请参阅 SignalR 的注释掉的代码。您仍然需要以下服务器端代码

public class myNotificationsender
    {
        public void PushNotificationsToClients(string name, string data)
        {
            try
            {
                var context = GlobalHost.ConnectionManager.GetHubContext<myNotificationHub>();
                context.Clients.All.addNewMessageToPage(data);
            }
            catch (System.Exception)
            {
                // do something with the exception as logging
                return;
            }
        }
    }

最后,在布局视图或任何视图中使用以下 javascript 来连接 SignalR 代码

                var notHub = $.connection.myNotificationHub;
                notHub.client.addNewMessageToPage = function(message) {
                    // Add the message to the page.
                    $('#notifbell').css("display", "");
                    $('#notif').html(message);
                };
                $.connection.hub.start();

当然,这只是一个引言,因为如果您需要的话,您仍然需要配置死信、路由失败和重试策略。序列化和大消息也应该被考虑在内。如果您必须将您的消息队列代理暴露给外部世界,您还应该注意 TLS 以保护数据交换。

我希望这个介绍能为您开始集成实现提供一些感觉和指导。

© . All rights reserved.