65.9K
CodeProject 正在变化。 阅读更多。
Home

Android:如何通过 Websocket 从多个 .NET 应用程序接收通知消息

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.96/5 (17投票s)

2012年8月2日

CPOL

8分钟阅读

viewsIcon

78207

downloadIcon

2539

一个简单的示例,展示了 Android 应用程序如何使用 Websocket 订阅来自多个 .NET 应用程序的通知消息。

相关文章

引言

这是文章 Android:如何通过 TCP 与 .NET 应用程序通信 的免费续篇,该文章描述了 Android 和 .NET 应用程序之间的请求-响应通信。

下面的文章演示了一种不同的场景。它展示了 发布-订阅通信,其中 Android 注册以接收可能来自多个 .NET 应用程序的通知消息
这意味着 Android 应用程序是一个订阅者,异步接收来自发布 .NET 应用程序的指定通知。

在此场景中的主要一点是保持订阅者和发布者解耦,以便它们之间没有假设。订阅应用程序不必了解发送通知消息的应用程序,发布应用程序也不必了解其通知的接收者。
为了保持它们的解耦,示例实现了一个代理服务,该服务能够注册订阅者并将消息从发布者转发给它们。然后,订阅者将使用此代理来订阅它们想要的任何消息,而发布者将仅将通知消息发送给此代理,而不知道它们将转发给谁。

433513/PublishSubcribeBrokerAndroid.gif

示例中的通信基于 WebSocket 协议,该协议在 允许仅 Internet HTTP 连接的环境中 提供真正的 全双工通信(没有轮询或长轮询开销)。
(WebSocket 以普通 HTTP 请求开始,然后 TCP 连接保持打开状态,这允许将消息推送到客户端。)


下面的示例使用 Eneter Messaging Framework,它提供了跨平台进程间通信功能,能够连接 Android 应用程序和 .NET 应用程序。它还支持使用 WebSocket 进行通信。

(该框架是免费的,可以从 http://www.eneter.net/ProductDownload.htm 下载。您需要下载 Eneter for .NET 和 Eneter for Android。
更多技术细节可以在 技术信息 中找到。)

Android 特性

在 Android 设备上实现发布-订阅场景比在独立的个人计算机上要复杂得多。Android 实现必须考虑以下特性:

  • 当配置更改时(例如屏幕方向更改时),Android 可能会重启应用程序。
  • Android 可能会切换到睡眠模式,CPU 会关闭。
  • 根据设备设置,WiFi 可能会在几分钟不活动后关闭(例如在睡眠模式期间)。

当配置更改时,Android 可能会重启应用程序

Android 应用程序的生命周期不像独立的桌面应用程序那样简单。Android 应用程序的 生命周期 经历几个状态,并且应用程序在其生命周期中可能会被重启。
典型的例子是当 Android 设备更改方向时,例如从纵向切换到横向。应用程序将被重启,以便为横向位置提供布局。然后,应用程序有责任处理这种情况并从重启中恢复。

在我们的场景中,我们需要确保在重启过程中(例如,当用户更改设备方向时),网络连接不会丢失,并且应用程序将继续接收已订阅的消息。为了确保此行为,我们将使用名为 Fragments 的功能。Fragment 是一个类,其实例可以设置为在重启过程中“不”被销毁。因此,我们将实现一个非 UI Fragment,其中将包含网络通信逻辑。重启后,应用程序的 Activity 可以获取包含网络连接的 Fragment,从而继续处理通知消息。

Fragment 的一个小小不便之处在于,此功能是从 Android 3.0 (API Level 11) 开始提供的,因此对于市场上大多数 设备 来说,它还不可用。
幸运的是,有一个库可以从 Android 1.6 提供此功能。您可以使用“Android SDK Manager”(请参见下图)进行安装,然后将 android-support-v13.jar 库直接包含到您的项目中。

433513/InstallAndroidSupportLibrary.jpg

Android 可能会切换到睡眠模式,CPU 会关闭

为了节省电池电量,Android 设备可能会切换到睡眠模式,关闭 CPU。在此状态下,应用程序未运行,因此无法处理传入的通知消息。为了解决这种情况,可以考虑以下替代方案:

  1. 忽略睡眠模式。虽然 CPU 已关闭,但 WiFi 天线仍然开启(取决于设置,它可以保持开启几分钟或永远开启),并且传入的消息会被接收和缓冲。然后,当设备被唤醒时,通知消息会被推送到应用程序进行处理。
  2. 使用 AlarmManager 定期唤醒 CPU 并调用代码执行某些活动。然而,在我们的例子中,Android 应用程序是等待代理推送通知消息的订阅者,并且没有明确的活动可以安排来接收它们(在我们的场景中我们不进行轮询)。因此,使用 AlarmManager 可能对我们的通信没有用。
  3. 使用 PowerManager 和 PARTIAL_WAKE_LOCK 来保持 CPU 运行。此替代方案可行,但您应该记住,这会对电池消耗产生重大影响。
  4. 使用 GCM (以前的 C2DM)。这是 Android 平台支持的 Google 服务。它提供了向 Android 应用程序发送简短通知消息的可能性。通常,此通知消息旨在告知 Android 应用程序服务器上有可用数据。然后,Android 应用程序负责连接其服务器并请求数据。优点是,如果在睡眠模式下收到通知消息,CPU 可以被唤醒,应用程序可以获得控制权来处理传入的通知。此替代方案的缺点可能是:
    • 性能 - 获取数据到 Android 可能需要跨越四个进程/网络边界
      服务 --通知--> GCM --通知--> Android --请求数据--> 服务 --响应数据--> Android
    • 依赖第三方服务 - 使用 GCM 会使软件依赖于 Google 服务。
对于下面的示例,我选择了第一个替代方案。这意味着如果 CPU 处于睡眠状态,消息将被接收,但稍后在 CPU 被唤醒时处理。
(说实话,我最初想实现当 WiFi 接收到某些数据时唤醒 CPU。例如,如果设备通过 WiFi 建立 TCP 连接并进入睡眠状态,那么当接收到 TCP 数据包时,CPU 会被唤醒,以便 Android 应用程序可以处理接收到的数据。不幸的是,我没有找到实现它的方法,也不确定是否可能。如果您有任何想法,我将非常感兴趣。)

WiFi 可能会在几分钟不活动后关闭

根据 WiFi 睡眠策略设置,WiFi 可能会在几分钟不活动后关闭(也可以设置为从不睡眠)。如果 WiFi 天线关闭,连接会中断,然后当 CPU 被唤醒时,应用程序必须与代理重新建立连接,并且还必须重新订阅以接收通知消息。

下面的示例显示了如何获得连接变化的通知并在屏幕上显示状态。它还显示了如何重新建立与代理的连接。

其他 Android 特性

  1. 不要忘记为您的 Android 应用程序设置 INTERNET 权限。
  2. 如果 Android 模拟器需要访问在本地计算机上运行并通过 127.0.0.1 (loopback) 暴露的服务,请使用特殊 IP 地址 10.0.2.2。
有关更多详细信息,请参阅我先前的文章 Android:如何通过 TCP 与 .NET 应用程序通信

代理服务

代理是一个简单的 .NET 控制台应用程序(服务),它接收来自发布应用程序的消息并将它们转发给所有已订阅的接收者。在本场景中,已订阅的接收者是 Android 应用程序。

using System;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.WebSocketMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace BrokerApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            // Create the broker.
            IDuplexBrokerFactory aBrokerFactory = new DuplexBrokerFactory();
            IDuplexBroker aBroker = aBrokerFactory.CreateBroker();

            // Create messaging based on Websockets.
            IMessagingSystemFactory aMessaging = new WebSocketMessagingSystemFactory();
            IDuplexInputChannel aBrokerInputChannel = 
              aMessaging.CreateDuplexInputChannel("ws://127.0.0.1:8095/MyBroker/");

            // Attach the input channel to the broker and start listening.
            aBroker.AttachDuplexInputChannel(aBrokerInputChannel);

            // Clients can use the broker now to publish and subscribe messages.
            // Note: When the broker receives a message it forwards it to clients that are
            //       subscribed for this type of message.
            //       Clients that are not subscribed to that type of message will not receive it.
            Console.WriteLine("The broker is running. Press ENTER to stop.");
            Console.ReadLine();

            // Detach the input channel and stop listening.
            aBroker.DetachDuplexInputChannel();
        }
    }
}

发布者 - .NET 应用程序

发布者是一个 .NET 应用程序,提供简单的 UI,用于向代理发送三种类型的通知消息。代理接收消息并将其转发给已订阅的客户端。

using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.WebSocketMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace Publisher
{
    public partial class Form1 : Form
    {
        // Notification message 1
        public class NotifyMsg1
        {
            public string CurrentTime { get; set; }
        }

        // Notification message 2
        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        // Notification message 3
        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }


        // Broker client is used to send messages to the broker,
        // that forwards messages to subscribers.
        private IDuplexBrokerClient myBrokerClient;

        // Connects broker client with the broker.
        private IDuplexOutputChannel myOutputChannel;

        // Serializer used to serialize notification messages.
        private XmlStringSerializer mySerializer = new XmlStringSerializer();


        public Form1()
        {
            InitializeComponent();

            // Create broker client responsible for sending messages to the broker.
            IDuplexBrokerFactory aBrokerFactory = new DuplexBrokerFactory();
            myBrokerClient = aBrokerFactory.CreateBrokerClient();

            // Create output channel to send messages via websockets.
            IMessagingSystemFactory aMessaging = new WebSocketMessagingSystemFactory();
            myOutputChannel = 
              aMessaging.CreateDuplexOutputChannel("ws://127.0.0.1:8095/MyBroker/");

            // Register to be informed about connection status with the broker.
            myOutputChannel.ConnectionOpened += OnConnectionOpened;
            myOutputChannel.ConnectionClosed += OnConnectionClosed;

            try
            {
                // Attach the output channel to the broker client to be able to send messages.
                myBrokerClient.AttachDuplexOutputChannel(myOutputChannel);
            }
            catch
            {
                // if opening connection failed then it can be reopen with
                // the 'Reconnect' button.
            }
        }

        // Correctly close the output channel.
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            // Detach the input channel and stop the listening.
            // Note: It releases the listening thread.
            myBrokerClient.DetachDuplexOutputChannel();
        }

        // Indicates the connection with the broker was open.
        private void OnConnectionOpened(object sender, DuplexChannelEventArgs e)
        {
            DisplayConnectionStatus(true);
        }

        // Indicates the connection with the broker was closed.
        private void OnConnectionClosed(object sender, DuplexChannelEventArgs e)
        {
            DisplayConnectionStatus(false);
        }

        private void DisplayConnectionStatus(bool isConnected)
        {
            string aStatus = isConnected ? "Connected" : "Disconnected";

            // Enforce manipulating UI from the UI thread.
            UI(() => StatusLabel.Text = aStatus);
        }

        // Send NotifyMsg1
        private void Notify1Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg1 aMsg = new NotifyMsg1();
            aMsg.CurrentTime = DateTime.Now.ToString();

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg1>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg1", aSerializedMsg);
        }

        // Send NotifyMsg2
        private void Notify2Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg2 aMsg = new NotifyMsg2();
            aMsg.Number = 12345;

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg2>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg2", aSerializedMsg);
        }

        // Send NotifyMsg3
        private void Notify3Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg3 aMsg = new NotifyMsg3();
            aMsg.TextMessage = "My notifying text message.";

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg3>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg3", aSerializedMsg);
        }

        // Reestablishes the connection with the broker.
        private void ReconnectBtn_Click(object sender, EventArgs e)
        {
            myBrokerClient.DetachDuplexOutputChannel();
           
            try
            {
                // Attach the output channel to the broker client to be able to send messages.
                myBrokerClient.AttachDuplexOutputChannel(myOutputChannel);
            }
            catch
            {
                // if opening connection failed then it can be reopen with
                // the 'Reconnect' button.
            }
        }

        // Helper method to invoke some functionality in UI thread.
        private void UI(Action uiMethod)
        {
            // If we are not in the UI thread then we must synchronize via the invoke mechanism.
            if (InvokeRequired)
            {
                Invoke(uiMethod);
            }
            else
            {
                uiMethod();
            }
        }

    }
}

订阅者 - Android 应用程序

Android 应用程序实现了从代理服务接收推送的通知消息的功能。它提供了一个简单的 UI,允许订阅特定消息。负责网络通信的功能实现在 Fragment 中,从而确保在应用程序重启时(例如,当设备方向从纵向更改为横向时)网络连接不会中断。如果设备进入睡眠模式,消息会被接收(直到 WiFi 处于睡眠状态),但不会被处理。然后,当设备被唤醒时,接收到的消息将被应用程序处理。如果 WiFi 也进入睡眠状态,连接将中断,这将在设备被唤醒时被应用程序检测到。然后用户可以恢复与代理的连接。

这里是 Fragment 的实现,它在重启过程中不会被销毁。

package eneter.android;

import java.util.*;

import android.os.Bundle;
import android.support.v4.app.Fragment;
import eneter.messaging.diagnostic.EneterTrace;
import eneter.messaging.messagingsystems.messagingsystembase.*;
import eneter.messaging.messagingsystems.websocketmessagingsystem.WebSocketMessagingSystemFactory;
import eneter.messaging.nodes.broker.*;
import eneter.net.system.*;

// Implements the communication with the broker as a non-UI fragment.
// It ensures the communication will not be interrupted e.g. in case
// the device changed the screen orientation.
public class BrokerClientFragment extends Fragment
{
    // Eneter communication.
    private IDuplexBrokerClient myBrokerClient;
    private IDuplexOutputChannel myOutputChannel;
    
    // Notification messages subscribed in the broker.
    private Set<String> mySubscribedNotifications = Collections.synchronizedSet(new HashSet<String>());

    // Events that will be pushed to the activity.
    private EventImpl<Boolean> myConnectionStatusChangedEvent = new EventImpl<Boolean>();
    private EventImpl<BrokerMessageReceivedEventArgs> myBrokerMessageReceived =
            new EventImpl<BrokerMessageReceivedEventArgs>();
    
    
    public BrokerClientFragment() throws Exception
    {
        // Instantiate the broker client.
        IDuplexBrokerFactory aBrokerFactory = new DuplexBrokerFactory();
        myBrokerClient = aBrokerFactory.createBrokerClient();
        
        // Handle notification messages from the bropker.
        myBrokerClient.brokerMessageReceived().subscribe(myOnBrokerMessageReceived);
        
        // Let's use Websocket messaging.
        // Note: 10.0.2.2 is a special alias to access the loopback (127.0.0.1)
        //       on the development machine from the emulator.
        IMessagingSystemFactory aMessaging = new WebSocketMessagingSystemFactory();
        myOutputChannel = 
                aMessaging.createDuplexOutputChannel("ws://10.0.2.2:8095/MyBroker/");

        // Observe the connection status.
        myOutputChannel.connectionClosed().subscribe(myOnConnectionClosed);
        myOutputChannel.connectionOpened().subscribe(myOnConnectionOpened);
    }

    // Activity registers here to observe the connection
    // with the broker.
    public Event<Boolean> connectionStatusChanged()
    {
        return myConnectionStatusChangedEvent.getApi();
    }
    
    // Activity registers here to process notification messages
    // received from the broker.
    public Event<BrokerMessageReceivedEventArgs> notifyMessageReceived()
    {
        return myBrokerMessageReceived.getApi();
    }
    
    // Called when the Activity.onCreate() has returned.
    // It opens the connection with the broker.
    @Override
    public void onCreate(Bundle savedInstanceState)
    {
        super.onCreate(savedInstanceState);
        
        // Set to keep this fragment during the restart.
        setRetainInstance(true); 
        
        // Open connection with the broker.
        try
        {
            // Attach the output channel and be able to communicate with broker.
            myBrokerClient.attachDuplexOutputChannel(myOutputChannel);
        }
        catch (Exception err)
        {
            EneterTrace.warning("Opening the connection failed.", err);
        }
    }
    
    // Called when the Activity is destroyed.
    // Note: setRetainInstance(true) was applied, so it is not called
    //       when the Activity is just restarted. 
    //       (e.g. if the device changed the orientation)
    @Override
    public void onDestroy()
    {
        // Close websocket connection.
        // Stop listening to response messages.
        myBrokerClient.detachDuplexOutputChannel();
        
        super.onDestroy();
    }
    
    // Subscribes in the broker for specified message type.
    public void subscribeInBroker(String eventType) throws Exception
    {
        // Subscribe for events in the broker.
        myBrokerClient.subscribe(eventType);
        
        // Store the subscribed event type.
        mySubscribedNotifications.add(eventType);
    }
    
    // Unsubscribes from the broker the specified event type.
    public void unsubscribeFromBroker(String eventType) throws Exception
    {
        // Unsubscribe the given event from the broker.
        myBrokerClient.unsubscribe(eventType);
        
        // Remove the event from the storage.
        mySubscribedNotifications.remove(eventType);
    }
    
    // Unsubscribes from the broker all events.
    public void unsubscribe() throws Exception
    {
        // Unsubscribe all events from the broker.
        myBrokerClient.unsubscribe();
        
        // Clear the storage.
        mySubscribedNotifications.clear();
    }

    // Recovers the connection with the broker.
    public void recoverConnection()
    {
        try
        {
            myBrokerClient.detachDuplexOutputChannel();
            myBrokerClient.attachDuplexOutputChannel(myOutputChannel);
            
            // if messages were subscribed before the connection
            // got broken then recover the subscription too.
            if (!mySubscribedNotifications.isEmpty())
            {
                String[] aSubscribedEvents = mySubscribedNotifications.toArray(new String[0]);
                myBrokerClient.subscribe(aSubscribedEvents);
            }
        }
        catch (Exception err)
        {
            EneterTrace.warning("Recovering the connection failed.", err);
        }
    }
    
    // Returns true if the connection with the broker is established.
    public boolean isConnected()
    {
        return myBrokerClient.isDuplexOutputChannelAttached() &&
                myBrokerClient.getAttachedDuplexOutputChannel().isConnected();
    }
    
 
    private void onConnectionChanged(boolean isConnectionOpen)
    {
        try
        {
            // Push information about broken connection to the subscribed activity.
            myConnectionStatusChangedEvent.raise(this, isConnectionOpen);
        }
        catch (Exception err)
        {
            EneterTrace.error("ConnectionChanged handler detected error.", err);
        }
    }
    
    
    // Processes when open connection is indicated.
    private EventHandler<DuplexChannelEventArgs> myOnConnectionOpened =
            new EventHandler<DuplexChannelEventArgs>()
            {
                @Override
                public void onEvent(Object sender, DuplexChannelEventArgs e)
                {
                    onConnectionChanged(true);
                }
            };
    
    // Processes when close connection is indicated.
    private EventHandler<DuplexChannelEventArgs> myOnConnectionClosed =
            new EventHandler<DuplexChannelEventArgs>()
            {
                @Override
                public void onEvent(Object sender, DuplexChannelEventArgs e)
                {
                    onConnectionChanged(false);
                }
            };
            
    // Processes when a notfication message from the broker is received.
    private EventHandler<BrokerMessageReceivedEventArgs> myOnBrokerMessageReceived =
            new EventHandler<BrokerMessageReceivedEventArgs>()
            {
                @Override
                public void onEvent(Object sender, BrokerMessageReceivedEventArgs e)
                {
                    try
                    {
                        // Push the message to the subscribed activity.
                        myBrokerMessageReceived.raise(this, e);
                    }
                    catch (Exception err)
                    {
                        EneterTrace.error("MessageReceived handler detected error.", err);
                    }
                }
            };
}

这里是使用 Fragment 的 Activity,它在重启后仍然存在。
package eneter.android;

import eneter.messaging.dataprocessing.serializing.*;
import eneter.messaging.diagnostic.EneterTrace;
import eneter.messaging.nodes.broker.*;
import eneter.net.system.*;
import android.os.*;
import android.support.v4.app.*;
import android.view.View;
import android.view.View.OnClickListener;
import android.widget.*;

// Note: The example is based on API Level 7.
//       So if we want to use the Fragment functionality, we need
//       to use 'Android Compatibility Package' and its library
//       android-support-v13.jar.
//       Therefore the Activity is derived from FragmentActivity
//       instead of Activity.
public class AndroidConsumerActivity extends FragmentActivity
{
    // Notification message 1
    public static class NotifyMsg1
    {
        public String CurrentTime;
    }

    // Notification message 2
    public static class NotifyMsg2
    {
        public int Number;
    }

    // Notification message 3
    public static class NotifyMsg3
    {
        public String TextMessage;
    }
    
    
    // For marshaling into the UI thread.
    private Handler myRefresh = new Handler();
    
    // UI control displaying incoming messages.
    private EditText myMessage1EditText;
    private EditText myMessage2EditText;
    private EditText myMessage3EditText;
    private TextView myConnectionStatusTextView;
    
    // Fragment surviving restarts.
    private BrokerClientFragment myBrokerClientFragment;
    
    // For deserializing incoming messages.
    private ISerializer mySerializer = new XmlStringSerializer();
    
    
    // Called when the Activiry is first time created or restarted.
    // (e.g. when the device changed the screen orientation) 
    @Override
    public void onCreate(Bundle savedInstanceState)
    {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.main);
        
        // Get UI controls.
        myMessage1EditText = (EditText) findViewById(R.id.received1EditText);
        myMessage2EditText = (EditText) findViewById(R.id.received2EditText);
        myMessage3EditText = (EditText) findViewById(R.id.received3EditText);
        myConnectionStatusTextView = (TextView) findViewById(R.id.statusTextView);
        Button aSubscribe1Btn = (Button) findViewById(R.id.subscribe1Btn);
        Button anUnsubscribe1Btn = (Button) findViewById(R.id.unsubscribe1Btn);
        Button aSubscribe2Btn = (Button) findViewById(R.id.subscribe2Btn);
        Button anUnsubscribe2Btn = (Button) findViewById(R.id.unsubscribe2Btn);
        Button aSubscribe3Btn = (Button) findViewById(R.id.subscribe3Btn);
        Button anUnsubscribe3Btn = (Button) findViewById(R.id.unsubscribe3Btn);
        Button aReconnectBtn = (Button) findViewById(R.id.reconnectBtn);
        
        try
        {
            // Try to get the broker fragment.
            FragmentManager aFragmentManager = getSupportFragmentManager();
            myBrokerClientFragment = 
                    (BrokerClientFragment) aFragmentManager.findFragmentByTag("MyBroker");
            
            // If the broker fragment is not there then this is the
            // first time start.
            if (myBrokerClientFragment == null)
            {
                // Create and register the broker fragment that will survive
                // restarting this activity.
                myBrokerClientFragment = new BrokerClientFragment();
                
                FragmentTransaction aTransaction = aFragmentManager.beginTransaction();
                aTransaction.add(myBrokerClientFragment, "MyBroker");
                aTransaction.commit();
            }
            else
            {
                // This is not the first time, so it is the restart.
                // Therefore check the connection with the broker.
                if (!myBrokerClientFragment.isConnected())
                {
                    myBrokerClientFragment.recoverConnection();
                }
            }
            
            // Display the connection status.
            displayConnectionStatus(myBrokerClientFragment.isConnected());
            
            // Register to be informed about the connection.
            myBrokerClientFragment.connectionStatusChanged().subscribe(myOnConnectionChanged);
            
            // Register to be informed about the notification messages
            // received from the broker.
            myBrokerClientFragment.notifyMessageReceived().subscribe(myOnBrokerMessageReceived);
                
            // Subscribe to handle clicks on UI buttons.
            aSubscribe1Btn.setOnClickListener(mySubscribe1BtnClick);
            anUnsubscribe1Btn.setOnClickListener(myUnsubscribe1BtnClick);
            aSubscribe2Btn.setOnClickListener(mySubscribe2BtnClick);
            anUnsubscribe2Btn.setOnClickListener(myUnsubscribe2BtnClick);
            aSubscribe3Btn.setOnClickListener(mySubscribe3BtnClick);
            anUnsubscribe3Btn.setOnClickListener(myUnsubscribe3BtnClick);
            aReconnectBtn.setOnClickListener(myReconnectBtnClick);
        }
        catch (Exception err)
        {
            EneterTrace.error("Creating of activity failed.", err);
        }
    }
    
    // It is called when the Activity is destroyed or restarted.
    // e.g. when the device changed the screen orientation.
    @Override
    public void onDestroy()
    {
        // This activity ends so we need to unregister this activity
        // from events pushed from the broker fragment.
        // Note: If not unsubscribed then after restart the fragment would push events also
        //       into this already destroyed activity.
        myBrokerClientFragment.notifyMessageReceived().unsubscribe(myOnBrokerMessageReceived);
        myBrokerClientFragment.connectionStatusChanged().unsubscribe(myOnConnectionChanged);
        
        super.onDestroy();
    } 
    
    // Processes notification messages received from the broker.
    private void onBrokerMessageReceived(Object sender, final BrokerMessageReceivedEventArgs e)
    {
        if (e.getReceivingError() == null)
        {
            // Display the message using the UI thread.
            myRefresh.post(new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        // Process notifications.
                        if (e.getMessageTypeId().equals("MyNotifyMsg1"))
                        {
                            NotifyMsg1 aMessage = mySerializer.deserialize(e.getMessage(), NotifyMsg1.class);
                            myMessage1EditText.setText(aMessage.CurrentTime);
                        }
                        else if (e.getMessageTypeId().equals("MyNotifyMsg2"))
                        {
                            NotifyMsg2 aMessage = mySerializer.deserialize(e.getMessage(), NotifyMsg2.class);
                            myMessage2EditText.setText(Integer.toString(aMessage.Number));
                        }
                        else if (e.getMessageTypeId().equals("MyNotifyMsg3"))
                        {
                            NotifyMsg3 aMessage = mySerializer.deserialize(e.getMessage(), NotifyMsg3.class);
                            myMessage3EditText.setText(aMessage.TextMessage);
                        }
                    }
                    catch (Exception err)
                    {
                        EneterTrace.error("Processing message from the broker failed.", err);
                    }
                }
            });
        }
    }
    
    // Subscribes for specified notification message in the broker.
    private void subscribe(String eventType)
    {
        try
        {
            // Send request to the broker to subscribe.
            myBrokerClientFragment.subscribeInBroker(eventType);
        }
        catch (Exception err)
        {
            EneterTrace.error("Subscribing to broker failed.", err);
        }
    }
    
    // Unsubscribes specified notification message from the broker.
    private void unsubscribe(String eventType)
    {
        try
        {
            // Send request to the broker to unsubscribe.
            myBrokerClientFragment.unsubscribeFromBroker(eventType);
        }
        catch (Exception err)
        {
            EneterTrace.error("Unsubscribing from broker failed.", err);
        }
    }
    
    // Displays the connection status.
    private void displayConnectionStatus(boolean isConnected)
    {
        // Get the connection status.
        final String aStatus = isConnected ? "Connected" : "Disconnected";
        
        // This event can come from various threads, so
        // enforce using the UI thread for displaying.
        myRefresh.post(new Runnable()
            {
                @Override
                public void run()
                {
                    myConnectionStatusTextView.setText(aStatus);
                }
            });
    }
    
    // Handler processing connection changes.
    private EventHandler<Boolean> myOnConnectionChanged =
            new EventHandler<Boolean>()
    {
        @Override
        public void onEvent(Object sender, Boolean e)
        {
            displayConnectionStatus(e);
        }
    };
    
    // Helper processing notifications received from the broker.
    private EventHandler<BrokerMessageReceivedEventArgs> myOnBrokerMessageReceived =
            new EventHandler<BrokerMessageReceivedEventArgs>()
    {
        @Override
        public void onEvent(Object sender, BrokerMessageReceivedEventArgs e)
        {
            onBrokerMessageReceived(sender, e);
        }
    };
    
    private OnClickListener mySubscribe1BtnClick = new OnClickListener()
    {
        @Override
        public void onClick(View v)
        {
            subscribe("MyNotifyMsg1");
        }
    };
    
    private OnClickListener myUnsubscribe1BtnClick = new OnClickListener()
    {
        @Override
        public void onClick(View v)
        {
            myMessage1EditText.setText("");
            unsubscribe("MyNotifyMsg1");
        }
    };
    
    private OnClickListener mySubscribe2BtnClick = new OnClickListener()
    {
        @Override
        public void onClick(View v)
        {
            subscribe("MyNotifyMsg2");
        }
    };
    
    private OnClickListener myUnsubscribe2BtnClick = new OnClickListener()
    {
        @Override
        public void onClick(View v)
        {
            myMessage2EditText.setText("");
            unsubscribe("MyNotifyMsg2");
        }
    };
    
    private OnClickListener mySubscribe3BtnClick = new OnClickListener()
    {
        @Override
        public void onClick(View v)
        {
            subscribe("MyNotifyMsg3");
        }
    };
    
    private OnClickListener myUnsubscribe3BtnClick = new OnClickListener()
    {
        @Override
        public void onClick(View v)
        {
            myMessage3EditText.setText("");
            unsubscribe("MyNotifyMsg3");
        }
    };
    
    private OnClickListener myReconnectBtnClick = new OnClickListener()
    {
        @Override
        public void onClick(View v)
        {
            myBrokerClientFragment.recoverConnection();
        }
    };
}

现在来看看应用程序如何协同工作。三个发布者将通知消息发送给代理,代理将它们转发给已订阅的 Android 应用程序。整个通信通过 WebSockets 实现。

433513/NotificationsToAndroidUI.jpg

© . All rights reserved.