Azure 云服务:角色间通信





5.00/5 (12投票s)
展示了一种 Azure 云服务:角色间通信的方法。
引言
在本文中,我们将探讨在单个 Azure 云服务中,角色实例之间进行通信的一种(方法有很多)方式。本文将利用多种技术,例如 WCF / Azure ServiceBus / Azure Cloud Service / Azure 角色(我使用工作角色以方便使用,但也可以是不同角色类型的组合,例如 WebRole/WorkerRole 等等),最后但同样重要的是 Reactive Extensions (RX)。
代码在哪里?
您可以从我的 GitHub 帐户获取所有代码
https://github.com/sachabarber/AzureInterRole
Azure 云服务究竟是什么?
在开始之前,让我们先思考一下 Azure 云服务究竟是什么。 "Azure 云服务"是平台即服务 (PAAS) 的一个示例,可以看作是托管在 Microsoft 云中的一组虚拟机。这些虚拟机可以安装软件,也可以通过远程连接访问。
我认为在这种情况下,最好直接听官方说法,我们开始吧
更多的控制也意味着更低的易用性;除非您需要额外的控制选项,否则与 云服务 相比,通常能更快、更轻松地在 网站 中启动并运行 Web 应用程序。
该技术提供了两种略有不同的虚拟机选项:Web 角色实例运行 IIS 变体的 Windows Server,而工作角色实例运行不带 IIS 的同一 Windows Server 变体。云服务应用程序依赖于这两种选项的某种组合。
例如,一个简单的应用程序可能只使用一个 Web 角色,而一个更复杂的应用程序可能使用一个 Web 角色来处理来自用户的传入请求,然后将这些请求产生的处理工作传递给一个工作角色进行处理。(这种通信可以使用服务总线或 Azure 队列。)
如图所示,单个应用程序中的所有虚拟机都运行在同一个云服务中。因此,用户通过单个公共 IP 地址访问该应用程序,请求会自动在应用程序的虚拟机之间进行负载均衡。该平台将以避免单点硬件故障的方式在云服务应用程序中部署虚拟机。
尽管应用程序在虚拟机中运行,但重要的是要理解云服务提供的是 PaaS,而不是 IaaS。可以这样理解:使用 IaaS(如 Azure 虚拟机),您首先创建并配置应用程序将运行的环境,然后将应用程序部署到该环境中。您负责管理这个世界的很多部分,例如在每个虚拟机中部署操作系统的新修补程序版本。相比之下,在 PaaS 中,就好像环境已经存在了。您所要做的就是部署您的应用程序。它运行平台的管理(包括操作系统新版本的部署)由您处理。
使用云服务时,您不创建虚拟机。而是提供一个配置文件,告诉 Azure 您需要多少个虚拟机,例如三个 Web 角色实例和两个工作角色实例,平台会为您创建它们。您仍然可以选择虚拟机的大小——选项与 Azure 虚拟机相同——但您不会自己显式创建它们。如果您的应用程序需要处理更大的负载,您可以请求更多的虚拟机,Azure 将创建这些实例。如果负载降低,您可以关闭这些实例并停止付费。
云服务应用程序通常通过一个两步过程提供给用户。开发人员首先将应用程序上传到平台的暂存区域。当开发人员准备好上线应用程序时,她会使用 Azure 管理门户请求将其投入生产。这种在暂存和生产环境之间的切换可以实现零停机,从而允许正在运行的应用程序升级到新版本而不干扰其用户。
云服务还提供监控。与 Azure 虚拟机一样,它会检测到物理服务器故障,并在新机器上重新启动在其上运行的虚拟机。但云服务还会检测到虚拟机和应用程序故障,而不仅仅是硬件故障。与虚拟机不同,它在每个 Web 角色和工作角色中都有一个代理,因此能够启动新的虚拟机和应用程序实例来处理故障。
云服务的 PaaS 本质还有其他含义。其中最重要的一点是,基于此技术构建的应用程序应在任何 Web 角色或工作角色实例发生故障时都能正确运行。要实现这一点,云服务应用程序不应在其自身的虚拟机文件系统中维护状态。与使用 Azure 虚拟机创建的虚拟机不同,写入云服务虚拟机的操作不是持久的;没有像虚拟机数据磁盘这样的东西。相反,云服务应用程序应将所有状态显式写入 SQL 数据库、blob、表或某个其他外部存储。以这种方式构建应用程序可以使其更易于扩展,并且更能抵抗故障,这两者都是云服务的重要目标。
来源
http://azure.microsoft.com/en-gb/documentation/articles/fundamentals-application-models/ 发布于 2015/03/19
角色究竟是什么?
当我们谈论云服务角色时,我们实际上指的是虚拟机。目前只有几种不同的角色类型,其中最常见的是
- WebRole:带 IIS 的 Windows Server
- WorkerRole:不带 IIS 的 Windows Server 变体
Azure 云服务可以由这些角色的混合组成,最多限制为 25 个(当前)。您可以使用以下链接查看 Azure 云服务的限制
单个 Azure 云服务中包含的角色实例之间的通信方法
本节将讨论您可以使用到的各种技术,以及为什么我认为它们都不如本文演示代码中所示的方法有用。
WCF
一种解决方案是使用 WCF 终结点。让我们来谈谈终结点
角色终结点
每个 Azure 角色都可以公开内部/外部终结点,这些终结点可以是 Http/Tcp。您可以为这些终结点设置复杂的规则,规定允许什么。
您可以在 Azure 云服务中通过角色的属性来配置终结点。以下屏幕截图显示了您可以进行此操作的位置。
这将打开该角色的属性。一旦显示了属性,就只需告诉该角色您希望公开哪些终结点。以下屏幕截图进行了演示。
点击查看大图
MSDN 上还有一篇关于角色终结点的非常出色的文章,我强烈建议您阅读
一旦您设置了一些 InternalEndpoint(s) / ExternalEndpoint(s),就可以很容易地使用其中一个端口与您可能托管(使用经典的 ServiceHost
)在特定角色中的 WCF 服务进行通信。此方法的缺点是(在我看来)它扩展性不太好。因为每当您需要在一对角色之间进行通信时,都需要一端有一个托管的 WCF 服务,另一端(客户端)使用 WCF 代理来与 WCF 托管的服务(通信通道的另一端)进行通信,假设您需要大量地在不同角色之间通信,这很快就会变成一场维护噩梦。
Azure 队列/服务总线
另一种方法可能是完全绕过终结点,直接使用一些云端消息传递系统,例如 Azure 队列或服务总线消息。这确实是一个不错的想法,现在这种方法的问题在于您有非常通用的消息,它们是
- Azure 队列:
CloudQueueMessage
,它在内部存储序列化的消息数据 - Azure 服务总线:
BrokeredMessage
,它在内部存储序列化的消息数据
那么您如何知道哪些消息是为哪个角色准备的呢?当然,您可以 Peek,这是您必须做的。好的,您可以使用 Peek(Azure 队列和 Azure 服务总线都支持)来确定队列中是否有新消息可供角色监控,但您如何知道这条消息是该角色感兴趣的呢?
Azure 服务总线 BrokeredMessage(s) 通过使用 Properties 属性支持添加自定义元数据,这允许您执行类似的操作
BrokeredMessage message = new BrokenMessage(....)
message.Properties.Add("Source","Columbia");
message.Properties.Add("Weight","200g");
但据我所知(我可能弄错了),Azure 队列不允许向 CloudQueueMessage
添加元数据。
服务总线相对于 Azure 队列的另一个巨大优势是(至少在我看来)它支持基于主题的订阅,这使其在这场比较中明显胜出。您可以在此处阅读更多关于基于主题的 Azure 服务总线消息传递的内容
毫无疑问,使用服务总线可以实现良好的角色间通信,并且在不同 Azure 云服务中的角色之间通信时也能正常工作。因此,这无疑是一个不错/可行的方案。我喜欢这种方法但不太喜欢的是,它似乎需要大量的底层工作才能实现,所以我寻求了一种替代方法,这正是本文真正要介绍的。我们可以称之为更轻量级的东西。
另一种方法(本文的关键)
我预见了自己可能会利用 Azure 服务总线,同时使用 RX 来提供一些角色间消息传递。
这仅在单个 Azure 云服务实例内有效;如果您需要跨云服务通信,您将必须使用服务总线主题。
这种方法将利用 Azure 服务总线,但会利用它提供的 Azure 服务总线中继功能,将其用作常规 WCF 服务的绑定。事实证明,我不是第一个想到这一点的人,在我想到它之前,至少还有另外两个人做过此事
这两篇文章的共同点是它们都使用了 RX。我不喜欢它们的地方在于,它们创建了一个通用的 IObserver<T>
实现类,该类被传递给每个角色实例的 Observable
。结果是每个角色和处理消息的方式的逻辑都是通用的(相同的)。我不喜欢这样,因为我设想的工作方式是,中心化的消息传递层(使用 Azure 服务总线中继功能 的 WCF)将简单地公开一个 IObservable<T>
,供角色本身监听。
这样,角色本身就可以根据其自身的逻辑/标准来过滤/响应,甚至忽略某些消息。
因此,我开始重构我在这两篇文章中找到的代码,使其按照我想要的方式工作,其结果将在后面的部分中介绍。
消息
一切都始于您想要发送的消息,这是一个非常简单的类,它是一个简单的 DataContract 可序列化类,我们将与 WCF 服务一起使用它
using System.Runtime.Serialization;
namespace InterRoleBroadcast
{
[DataContract(Namespace = BroadcastNamespaces.DataContract)]
public class BroadcastEvent
{
public BroadcastEvent(string senderInstanceId, string message)
{
this.SenderInstanceId = senderInstanceId;
this.Message = message;
}
[DataMember]
public string SenderInstanceId { get; private set; }
[DataMember]
public string Message { get; private set; }
}
}
WCF服务
然后我们有这个令人难以置信的简单 WCF 服务,它允许发布者发布消息(如上所示),并且还公开了一个 IObservable<BroadcastEvent>
,以便订阅者可以使用 RX 收听通知。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.ServiceModel;
namespace InterRoleBroadcast
{
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,
ConcurrencyMode = ConcurrencyMode.Multiple)]
public class BroadcastService : IBroadcastServiceContract
{
private object syncLock = new object();
private Subject<BroadcastEvent> eventStream =
new Subject<BroadcastEvent>();
public IObservable<BroadcastEvent> ObtainStream()
{
return eventStream.AsObservable();
}
public void Publish(BroadcastEvent e)
{
lock (syncLock)
{
try
{
eventStream.OnNext(e);
}
catch (Exception exception)
{
eventStream.OnError(exception);
}
}
}
}
}
客户端代理
由于存在 WCF 服务,因此有一个客户端代理是我们需要的,它如下所示
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.ServiceModel.Description;
using Microsoft.ServiceBus;
namespace InterRoleBroadcast
{
public class ServiceBusClient<T> where T :
class, IClientChannel, IDisposable
{
private ChannelFactory<T> _channelFactory;
private T _channel;
private bool _disposed = false;
public ServiceBusClient()
{
CreateChannel();
}
public void CreateChannel()
{
Uri address = ServiceBusEnvironment.CreateServiceUri("sb",
EndpointInformation.ServiceNamespace, EndpointInformation.ServicePath);
NetTcpRelayBinding binding = new NetTcpRelayBinding(EndToEndSecurityMode.None,
RelayClientAuthenticationType.None);
TransportClientEndpointBehavior credentialsBehaviour =
new TransportClientEndpointBehavior();
credentialsBehaviour.TokenProvider =
TokenProvider.CreateSharedAccessSignatureTokenProvider(
EndpointInformation.KeyName, EndpointInformation.Key);
ServiceEndpoint endpoint = new ServiceEndpoint(
ContractDescription.GetContract(typeof(T)),
binding, new EndpointAddress(address));
endpoint.Behaviors.Add(credentialsBehaviour);
_channelFactory = new ChannelFactory<T>(endpoint);
_channel = _channelFactory.CreateChannel();
_channel.Faulted += Channel_Faulted;
}
void Channel_Faulted(object sender, EventArgs e)
{
ICommunicationObject theChannel = (ICommunicationObject) sender;
theChannel.Faulted -= Channel_Faulted;
KillChannel(theChannel);
KillChannelFactory(_channelFactory);
CreateChannel();
}
public T Client
{
get
{
if (_channel.State == CommunicationState.Opening)
{
return null;
}
if (_channel.State != CommunicationState.Opened)
{
_channel.Open();
}
return _channel;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private void KillChannel(ICommunicationObject theChannel)
{
if (theChannel.State == CommunicationState.Opened)
{
theChannel.Close();
}
else
{
theChannel.Abort();
}
}
private void KillChannelFactory<T>(ChannelFactory<T> theChannelFactory)
{
if (theChannelFactory.State == CommunicationState.Opened)
{
theChannelFactory.Close();
}
else
{
theChannelFactory.Abort();
}
}
public void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
try
{
KillChannel(_channel);
}
catch
{
// Ignore exceptions
}
try
{
KillChannelFactory(_channelFactory);
}
catch
{
// Ignore exceptions
}
_disposed = true;
}
}
}
~ServiceBusClient()
{
Dispose(false);
}
}
}
服务宿主
还需要考虑 WCF 服务的宿主,如下所示
using System;
using System.ServiceModel;
using System.ServiceModel.Description;
using Microsoft.ServiceBus;
namespace InterRoleBroadcast
{
public class ServiceBusHost<T> where T : class
{
private ServiceHost _serviceHost;
private bool _disposed = false;
public ServiceBusHost()
{
CreateHost();
}
private void CreateHost()
{
Uri address = ServiceBusEnvironment.CreateServiceUri("sb",
EndpointInformation.ServiceNamespace, EndpointInformation.ServicePath);
NetTcpRelayBinding binding = new NetTcpRelayBinding(
EndToEndSecurityMode.None, RelayClientAuthenticationType.None);
TransportClientEndpointBehavior credentialsBehaviour =
new TransportClientEndpointBehavior();
credentialsBehaviour.TokenProvider =
TokenProvider.CreateSharedAccessSignatureTokenProvider(
EndpointInformation.KeyName, EndpointInformation.Key);
ServiceEndpoint endpoint = new ServiceEndpoint(
ContractDescription.GetContract(typeof(T)), binding,
new EndpointAddress(address));
endpoint.Behaviors.Add(credentialsBehaviour);
_serviceHost = new ServiceHost(Activator.CreateInstance(typeof(T)));
_serviceHost.Faulted += ServiceHost_Faulted;
_serviceHost.Description.Endpoints.Add(endpoint);
_serviceHost.Open();
}
void ServiceHost_Faulted(object sender, EventArgs e)
{
ServiceHost host = (ServiceHost)sender;
host.Faulted -= ServiceHost_Faulted;
KillHost(host);
CreateHost();
}
public T ServiceInstance
{
get
{
return _serviceHost.SingletonInstance as T;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private void KillHost(ServiceHost theHost)
{
if (theHost.State == CommunicationState.Opened)
{
theHost.Close();
}
else
{
theHost.Abort();
}
}
public void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
try
{
KillHost(_serviceHost);
}
catch
{
// Ignore exceptions
}
finally
{
_disposed = true;
}
}
}
}
~ServiceBusHost()
{
Dispose(false);
}
}
}
客户端代理和服务宿主共同点是它们都使用了 Azure 服务总线中继功能,这本质上允许 Azure 服务总线与 WCF 一起使用。
一些粘合剂
为了使通信更容易处理,还有一个简单的辅助类,它实际上只公开了发布方(客户端通道)和服务本身(订阅数据源)
using System;
namespace InterRoleBroadcast
{
public class BroadcastCommunicator : IDisposable
{
private ServiceBusClient<IBroadcastServiceChannel> _publisher;
private ServiceBusHost<BroadcastService> _subscriber;
private bool _disposed = false;
public void Publish(BroadcastEvent e)
{
if (this.Publisher.Client != null)
{
this.Publisher.Client.Publish(e);
}
}
public IObservable<BroadcastEvent> BroadcastEventsStream
{
get { return this.Subscriber.ServiceInstance.ObtainStream(); }
}
private ServiceBusClient<IBroadcastServiceChannel> Publisher
{
get
{
if (_publisher == null)
{
_publisher = new ServiceBusClient<IBroadcastServiceChannel>();
}
return _publisher;
}
}
private ServiceBusHost<BroadcastService> Subscriber
{
get
{
if (_subscriber == null)
{
_subscriber = new ServiceBusHost<BroadcastService>();
}
return _subscriber;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
public void Dispose(bool disposing)
{
if (!_disposed && disposing)
{
try
{
_subscriber.Dispose();
_subscriber = null;
}
catch
{
// Ignore exceptions
}
try
{
_publisher.Dispose();
_publisher = null;
}
catch
{
// Ignore exceptions
}
_disposed = true;
}
}
~BroadcastCommunicator()
{
Dispose(false);
}
}
}
角色代码
在这些部分到位后,只需要使用它。这就像这个 WorkerRole 代码一样简单
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using InterRoleBroadcast;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Diagnostics;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage;
using System.Reactive.Linq;
namespace WorkerRole1
{
public class WorkerRole : RoleEntryPoint
{
private volatile BroadcastCommunicator _broadcastCommunicator;
private volatile IDisposable _broadcastSubscription;
private volatile bool _keepLooping = true;
public override bool OnStart()
{
_broadcastCommunicator = new BroadcastCommunicator();
//worker1
_broadcastSubscription = _broadcastCommunicator.BroadcastEventsStream
.Where(x => x.SenderInstanceId != RoleEnvironment.CurrentRoleInstance.Id)
.Subscribe(
theEvent =>
{
Logger.AddLogEntry(
String.Format("{0} got message from {1} {2}",
RoleEnvironment.CurrentRoleInstance.Id,
theEvent.SenderInstanceId,
theEvent.Message));
},
ex =>
{
Logger.AddLogEntry(ex);
});
return base.OnStart();
}
public override void Run()
{
// Just keep sending messasges
while (_keepLooping)
{
int secs = 2;
Thread.Sleep(secs * 1000);
try
{
BroadcastEvent broadcastEvent =
new BroadcastEvent(RoleEnvironment.CurrentRoleInstance.Id,
"Hello world from WorkerRole1");
_broadcastCommunicator.Publish(broadcastEvent);
}
catch (Exception ex)
{
Logger.AddLogEntry(ex);
}
}
}
public override void OnStop()
{
_keepLooping = false;
if (_broadcastCommunicator != null)
{
_broadcastCommunicator.Dispose();
}
if (_broadcastSubscription != null)
{
_broadcastSubscription.Dispose();
}
base.OnStop();
}
}
}
如何在您自己的应用程序中使用此方法?
只需确保使用您自己的服务总线密钥更新 EndpointInformation
类。
不同云服务之间的通信方法
正如我之前所说,本文描述的方法不适用于在单独的云服务中进行角色之间的通信。对于这种情况,您需要使用以下方法之一
- WCF
- 使用 Azure 服务总线发送分布式消息(我会使用此方法并使用 基于主题的订阅)
就这些
好了,这次就说到这里,希望您从这次讨论中学到了一些东西,一如既往,欢迎评论/点赞