消息总线/代理的几种实现






4.18/5 (5投票s)
重温 C#.NET 中的事件聚合器/消息总线/代理
引言
当项目开始增长时,始终变得越来越重要的一种需求是组件之间的通信方式。 事件聚合器(或消息总线/代理)用于此目的。
几年前,我一直在使用 .NET 事件和 Prism 的事件聚合器,如我的文章所示。 对于委托 EventHandler<TEventArgs>
,它最适合小型项目(由于发布者保持对订阅者处理程序的引用而导致的紧密耦合)。 对于 Prism 的事件聚合器,虽然它很棒,但我希望实现自己的轻量级聚合器,以满足我自己的需求。
虽然有许多基于不同方法/关注点/复杂性的不同实现,但我想介绍几个极简的实现,暂时忽略线程处理。
实现
假设我们有 3 个组件:Publisher
、Subscriber1
和 Subscriber2
。 这三个组件正在使用 Person
对象。 发布者将在每次创建/修改/删除新人员时发布消息,并且两个订阅者都希望订阅以接收这些消息,从而做出相应的反应。
我们将引入一个“消息传递”服务,该服务被注入到发布者和订阅者中以供他们使用。 事情会像这样运作
// Then on Publisher side:
this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
{
// Property setters
});
// Subscribers receive and react:
messageBus.Subscribe<PersonCreatedMessage>(OnPersonCreated);
private void OnPersonCreated(PersonCreatedMessage message)
{
// Access to message's properties
}
实现 #1:通过 Action<TMessage> 委托进行消息传递
并非所有类型的消息订阅者都需要接收,最好明确指定我们要订阅的消息类型。 这些消息是 IMessage
public interface IMessage
{
string Description { get; } // Can remove or add more
}
对于每个订阅者,我们可以以各种形式识别“收据”,例如 GUID、数据库中唯一的递增整数(如 ID),或如下所示的不同方式。 这是为了使订阅者能够取消订阅接收消息。
public interface ISubscription<T> where T : IMessage
{
Action<T> ActionHandler { get; }
}
public class Subscription<T> : ISubscription<T> where T : IMessage
{
public Action<T> ActionHandler { get; private set; }
public Subscription(Action<T> action)
{
ActionHandler = action;
}
}
我们的消息总线提供以下功能
public interface IMessageBus
{
void Publish<T>(T message) where T : IMessage;
ISubscription<T> Subscribe<T>(Action<T> actionCallback) where T : IMessage;
bool UnSubscribe<T>(ISubscription<T> subscription) where T : IMessage;
void ClearAllSubscriptions();
}
其实现如下所述
public class MessageBus : IMessageBus
{
private readonly Dictionary<Type, List<object>> _observers
= new Dictionary<Type, List<object>>();
}
此处的 _observers
具有 IMessage
类型的键,并且关联的值是订阅者的操作处理程序方法列表,这些方法在收到消息后会响应。
Subscribe
方法的实现如下
public ISubscription<T> Subscribe<T>(Action<T> callback) where T : IMessage
{
ISubscription<T> subscription = null;
Type messageType = typeof(T);
var subscriptions = _observers.ContainsKey(messageType) ?
_observers[messageType] : new List<object<();
if (!subscriptions
.Select(s => s as ISubscription<T>)
.Any(s => s.ActionHandler == callback))
{
subscription = new Subscription<T>(callback);
subscriptions.Add(subscription);
}
_observers[messageType] = subscriptions;
return subscription;
}
然后是 Publish
消息
public void Publish<T>(T message) where T : IMessage
{
if (message == null) throw new ArgumentNullException(nameof(message));
Type messageType = typeof(T);
if (_observers.ContainsKey(messageType))
{
var subscriptions = _observers[messageType];
if (subscriptions == null || subscriptions.Count == 0) return;
foreach (var handler in subscriptions
.Select(s => s as ISubscription<T>)
.Select(s => s.ActionHandler))
{
handler?.Invoke(message);
}
}
}
UnSubscribe
的实现如下
public bool UnSubscribe<T>(ISubscription<T> subscription) where T : IMessage
{
bool removed = false;
if (subscription == null) return false;
Type messageType = typeof(T);
if (_observers.ContainsKey(messageType))
{
removed = _observers[messageType].Remove(subscription);
if (_observers[messageType].Count == 0)
_observers.Remove(messageType);
}
return removed;
}
最后,我们可以通过清除 _observers 来删除所有订阅者:_observers.Clear();
。
如何使用
首先定义消息类型,例如
public class PersonCreatedMessage : IMessage
{
public string Description { get; set; }
public Person Person { get; set; }
}
public class PersonDeletedMessage : IMessage
{
public string Description { get; set; }
}
我们的模型 (Person
) 具有属性 GivenName
。
订阅者基本上设置如下
public class Subscriber1
{
private MessageBus messageBus;
private ISubscription<PersonCreatedMessage> personCreatedSubscription;
public Subscriber1(MessageBus messageBus)
{
this.messageBus = messageBus;
personCreatedSubscription =
this.messageBus.Subscribe<PersonCreatedMessage>(OnPersonCreated);
}
private void OnPersonCreated(PersonCreatedMessage message)
{
// Access message.Description, message.Person.GivenName,... here
}
private void Unsubscribe()
{
this.messageBus.UnSubscribe(personCreatedSubscription);
}
}
以及发布者
public class Publisher
{
private MessageBus messageBus;
public Publisher(MessageBus messageBus)
{
this.messageBus = messageBus;
}
public void CreatePerson()
{
this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
{
Person = new Person { GivenName = "John" },
Description = "[Demo 1] A new person has been created."
});
}
}
我们可以这样开始
MessageBus messageBus = new MessageBus();
Publisher publisher = new Publisher(messageBus);
Subscriber1 subscriber1 = new Subscriber1(messageBus);
Subscriber2 subscriber2 = new Subscriber2(messageBus);
publisher.CreatePerson();
实现 #2:通过弱引用进行消息传递
当系统变得如此庞大,并且消息传递系统变得如此复杂,以至于有许多发布者和订阅者时,记住取消订阅每个订阅很不方便。 就我个人而言,我想明确地这样做。 但是,这是我使用上述操作委托的弱引用版本的实现。 目的是让订阅者在没有对其的强引用时,有资格被垃圾回收。
在此实现中,我们将操作处理程序包装在 WeakSubscription
中
public class WeakSubscription<T> where T : IMessage
{
private WeakReference _weakAction;
public WeakSubscription(Action<T> action)
{
_weakAction = new WeakReference(action);
}
public bool IsAlive
{
get { return _weakAction.IsAlive; }
}
public object Target
{
get { return _weakAction.Target; }
}
public void OnMessageReceived(T message)
{
var action = _weakAction.Target as Action<T>;
action?.Invoke(message);
}
}
由于使用了弱引用,因此我们不会在此处实现 UnSubscribe
方法。 因此,我们的 MessageBus
具有 Publish
和 Subscribe
作为其两个主要功能
public class MessageBus
{
private readonly Dictionary<Type, List<object>> _observers
= new Dictionary<Type, List<object>>();
}
public void Subscribe<T>(Action<T> callback) where T : IMessage
{
Type messageType = typeof(T);
var subscriptions = _observers.ContainsKey(messageType) ?
_observers[messageType] : new List<object>();
if (!subscriptions
.Select(s => s as WeakSubscription<T>)
.Any(s => s.Target == new WeakReference(callback).Target))
subscriptions.Add(new WeakSubscription<T>(callback));
_observers[messageType] = subscriptions;
var deadSubscriptionsRemoved = CleanupSubscriptions<T>();
}
public void Publish<T>(T message) where T : IMessage
{
if (message == null) throw new ArgumentNullException(nameof(message));
Type messageType = typeof(T);
if (_observers.ContainsKey(messageType))
{
var subscriptions = _observers[messageType];
List<WeakSubscription<T>> deadSubscriptions = new List<WeakSubscription<T>>();
foreach (var subscription in subscriptions
.Select(s => s as WeakSubscription<T>))
{
if (subscription.IsAlive)
subscription?.OnMessageReceived(message);
else
deadSubscriptions.Add(subscription);
subscriptions.RemoveAll(s => deadSubscriptions.Contains(s));
if (subscriptions.Count == 0)
_observers.Remove(messageType);
}
}
}
最后,我们可以像这样清除所有订阅
private int CleanupSubscriptions<T>() where T : IMessage
{
return _observers[typeof(T)].RemoveAll((s) =>
{
WeakSubscription<T> subRef = s as WeakSubscription<T>;
if (subRef != null)
return !subRef.IsAlive;
return true;
});
}
如何使用
与实现 #1 相同,只是它更简单! 无需跟踪订阅者以取消订阅。
实现 #3:通过接口进行消息传递
这是 Glenn Block 在 PluralSight 上对事件聚合器模式的实现的修改,该实现没有使用委托,而是使用接口作为接收消息时响应的方式。 每个订阅者可以实现尽可能多的不同类型消息的 ISubscribe
接口,以此来订阅这些消息。 该接口仅声明一个操作方法,供感兴趣的订阅者在收到消息后响应
public interface ISubscribe<T> where T: IMessage
{
void OnMessageReceived(T message);
}
MessageBus
被设计为具有以下功能
public interface IMessageBus
{
void Publish<T>(T message) where T : IMessage;
// Returns number of messages that this subscriber wants to listen to.
int Subscribe(object subscriber, bool reSubscribable = false);
// Returns number of messages that this subscriber has topped listening to.
int UnSubscribe(object subscriber);
// Return if successfully un-subscribed all subscribers from specified kind of message T.
bool UnSubscribeTo<T>() where T:IMessage;
}
其实现如下
public class MessageBus : IMessageBus
{
private readonly Dictionary<Type,
List<WeakReference>> _observers = new Dictionary<Type, List<WeakReference>>();
...
private IEnumerable<Type> GetSubscriberTypes(object subscriber)
{
// Returns: ISubscribe<T1>, ISubscribe<T2>, etc.
// where Ti is message type to be sent/received.
// In other words, the kinds of message that THIS subscriber is subscribed to.
// That is the interface(s) that this subscriber class implements.
// For example: Subscriber: ISubscribe<T1>, ISubscribe<T2>, etc.
return subscriber
.GetType()
.GetInterfaces()
.Where(i => i.IsGenericType &&
i.GetGenericTypeDefinition() == typeof(ISubscribe<>));
}
}
public int Subscribe(object subscriber, bool reSubscribable = false)
{
if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));
WeakReference subscriberRef = new WeakReference(subscriber);
var subscriberTypes = GetSubscriberTypes(subscriber);
foreach (var subscriberType in subscriberTypes)
{
if (_observers.ContainsKey(subscriberType))
{
_observers[subscriberType].RemoveAll(s => !s.IsAlive);
if (!_observers[subscriberType].Any
(s => s.Target == subscriberRef.Target) || reSubscribable)
_observers[subscriberType].Add(subscriberRef);
}
else
_observers.Add(subscriberType, new List<WeakReference> { subscriberRef });
}
return subscriberTypes.ToList().Count;
}
然后是 Publish
方法
public void Publish<T>(T message) where T: IMessage
{
if (message == null) throw new ArgumentNullException(nameof(message));
var subscriberType = typeof(ISubscribe<>).MakeGenericType(typeof(T)); // --> ISubscribe<T>
if (_observers.ContainsKey(subscriberType))
{
List<WeakReference> subscriberRefs = _observers[subscriberType];
List<WeakReference> deadSubscriberRefs=new List<WeakReference>();
foreach (var subscriberRef in subscriberRefs)
{
if (subscriberRef.IsAlive)
{
var subscriber = subscriberRef.Target as ISubscribe<T>;
subscriber?.OnMessageReceived(message);
}
else
deadSubscriberRefs.Add(subscriberRef); // Remove this reference
}
subscriberRefs.RemoveAll(s => deadSubscriberRefs.Contains(s));
if (subscriberRefs.Count == 0)
_observers.Remove(subscriberType);
}
}
我们可以取消订阅
某种消息,或一个对象订阅者
public int UnSubscribe(object subscriber)
{
if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));
var subscriberRef = new WeakReference(subscriber);
var subscriberTypes = GetSubscriberTypes(subscriber);
var emptyKeys = new List<Type>();
int unSubscribedTypeCount = 0;
foreach (var subscriberType in subscriberTypes)
{
if (_observers.ContainsKey(subscriberType))
{
List<WeakReference> subscriberRefs = _observers[subscriberType];
unSubscribedTypeCount += subscriberRefs.RemoveAll(s => s.Target == subscriber);
if (subscriberRefs.Count == 0)
emptyKeys.Add(subscriberType);
}
}
foreach (var key in emptyKeys)
_observers.Remove(key);
return unSubscribedTypeCount;
}
public bool UnSubscribeTo<T>() where T: IMessage
{
var subscriberType = typeof(ISubscribe<>).MakeGenericType(typeof(T)); // --> ISubscribe<T>
if (_observers.ContainsKey(subscriberType))
return _observers.Remove(subscriberType);
else
throw new KeyNotFoundException(subscriberType.ToString());
}
如何使用
订阅者需要通过接口订阅他们想要接收的消息类型,这些接口承诺在收到消息时执行的操作
public class Subscriber1 : ISubscribe<PersonCreatedMessage>, ISubscribe<PersonDeletedMessage>
{
private MessageBus messageBus;
public Subscriber1(MessageBus messageBus)
{
this.messageBus = messageBus;
var subscription = this.messageBus.Subscribe(this);
}
public void OnMessageReceived(PersonCreatedMessage message)
{
// Access message.Description, message.Person.GivenName,... here
}
public void OnMessageReceived(PersonDeletedMessage message)
{
// Access message properties here
}
public int UnSubscribe()
{
return this.messageBus.UnSubscribe(this);
}
...
// Un-subscribe to a certain kind of message as followed:
messageBus.UnSubscribeTo<PersonCreatedMessage<Person>>();
}
发布者与实现 #1 和 #2 非常相似
public class Publisher
{
private MessageBus messageBus;
public Publisher(MessageBus messageBus)
{
this.messageBus = messageBus;
}
public void CreatePerson()
{
this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
{
Person = new Person { GivenName = "Mike" },
Description = "[Demo 3] A new person has been created."
});
}
public void DeletePerson()
{
this.messageBus.Publish<PersonDeletedMessage>(new PersonDeletedMessage
{
Description = "Person has been deleted."
});
}
}
这是开始
MessageBus messageBus = new MessageBus();
Publisher publisher = new Publisher(messageBus);
Subscriber1 subscriber1 = new Subscriber1(messageBus);
Subscriber2 subscriber2 = new Subscriber2(messageBus);
publisher.CreatePerson();
publisher.DeletePerson();
历史
- 2019 年 9 月 16 日:将 IMessage 应用于所有适用的消息类型
- 2019 年 9 月 4 日:初始版本