65.9K
CodeProject 正在变化。 阅读更多。
Home

消息总线/代理的几种实现

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.18/5 (5投票s)

2019年9月4日

CPOL

3分钟阅读

viewsIcon

16845

downloadIcon

281

重温 C#.NET 中的事件聚合器/消息总线/代理

引言

当项目开始增长时,始终变得越来越重要的一种需求是组件之间的通信方式。 事件聚合器(或消息总线/代理)用于此目的。

几年前,我一直在使用 .NET 事件和 Prism 的事件聚合器,如我的文章所示。 对于委托 EventHandler<TEventArgs>,它最适合小型项目(由于发布者保持对订阅者处理程序的引用而导致的紧密耦合)。 对于 Prism 的事件聚合器,虽然它很棒,但我希望实现自己的轻量级聚合器,以满足我自己的需求。

虽然有许多基于不同方法/关注点/复杂性的不同实现,但我想介绍几个极简的实现,暂时忽略线程处理。

实现

假设我们有 3 个组件:PublisherSubscriber1Subscriber2。 这三个组件正在使用 Person 对象。 发布者将在每次创建/修改/删除新人员时发布消息,并且两个订阅者都希望订阅以接收这些消息,从而做出相应的反应。

我们将引入一个“消息传递”服务,该服务被注入到发布者和订阅者中以供他们使用。 事情会像这样运作

// Then on Publisher side: 
this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
{
    // Property setters
});

// Subscribers receive and react:
messageBus.Subscribe<PersonCreatedMessage>(OnPersonCreated);

private void OnPersonCreated(PersonCreatedMessage message)
{
    // Access to message's properties
}

实现 #1:通过 Action<TMessage> 委托进行消息传递

并非所有类型的消息订阅者都需要接收,最好明确指定我们要订阅的消息类型。 这些消息是 IMessage

public interface IMessage
{
    string Description { get; } // Can remove or add more
}

对于每个订阅者,我们可以以各种形式识别“收据”,例如 GUID、数据库中唯一的递增整数(如 ID),或如下所示的不同方式。 这是为了使订阅者能够取消订阅接收消息。

public interface ISubscription<T> where T : IMessage
{
    Action<T> ActionHandler { get; }
}

public class Subscription<T> : ISubscription<T> where T : IMessage
{
    public Action<T> ActionHandler { get; private set; }

    public Subscription(Action<T> action)
    {
        ActionHandler = action;
    }
}

我们的消息总线提供以下功能

public interface IMessageBus
{
    void Publish<T>(T message) where T : IMessage;

    ISubscription<T> Subscribe<T>(Action<T> actionCallback) where T : IMessage;

    bool UnSubscribe<T>(ISubscription<T> subscription) where T : IMessage;

    void ClearAllSubscriptions();
}

其实现如下所述

public class MessageBus : IMessageBus
{
    private readonly Dictionary<Type, List<object>> _observers
        = new Dictionary<Type, List<object>>();
}

此处的 _observers 具有 IMessage 类型的键,并且关联的值是订阅者的操作处理程序方法列表,这些方法在收到消息后会响应。

Subscribe 方法的实现如下

public ISubscription<T> Subscribe<T>(Action<T> callback) where T : IMessage
{
    ISubscription<T> subscription = null;

    Type messageType = typeof(T);
    var subscriptions = _observers.ContainsKey(messageType) ?
        _observers[messageType] : new List<object<();

    if (!subscriptions
        .Select(s => s as ISubscription<T>)
        .Any(s => s.ActionHandler == callback))
    {
        subscription = new Subscription<T>(callback);
        subscriptions.Add(subscription);
    }

    _observers[messageType] = subscriptions;

    return subscription;
}

然后是 Publish 消息

public void Publish<T>(T message) where T : IMessage
{
    if (message == null) throw new ArgumentNullException(nameof(message));

    Type messageType = typeof(T);
    if (_observers.ContainsKey(messageType))
    {
        var subscriptions = _observers[messageType];
        if (subscriptions == null || subscriptions.Count == 0) return;
        foreach (var handler in subscriptions
            .Select(s => s as ISubscription<T>)
            .Select(s => s.ActionHandler))
        {
            handler?.Invoke(message);
        }
    }
}

UnSubscribe 的实现如下

public bool UnSubscribe<T>(ISubscription<T> subscription) where T : IMessage
{
    bool removed = false;

    if (subscription == null) return false;

    Type messageType = typeof(T);
    if (_observers.ContainsKey(messageType))
    {
        removed = _observers[messageType].Remove(subscription);

        if (_observers[messageType].Count == 0)
            _observers.Remove(messageType);
    }
    return removed;
}

最后,我们可以通过清除 _observers 来删除所有订阅者:_observers.Clear();

如何使用

首先定义消息类型,例如

public class PersonCreatedMessage : IMessage
{
    public string Description { get; set; }

    public Person Person { get; set; }
}

public class PersonDeletedMessage : IMessage
{
    public string Description { get; set; }
}

我们的模型 (Person) 具有属性 GivenName

订阅者基本上设置如下

public class Subscriber1
{
    private MessageBus messageBus;
    private ISubscription<PersonCreatedMessage> personCreatedSubscription;

    public Subscriber1(MessageBus messageBus)
    {
        this.messageBus = messageBus;

        personCreatedSubscription = 
              this.messageBus.Subscribe<PersonCreatedMessage>(OnPersonCreated);
    }

    private void OnPersonCreated(PersonCreatedMessage message)
    {
        // Access message.Description, message.Person.GivenName,... here
    }

    private void Unsubscribe()
    {
        this.messageBus.UnSubscribe(personCreatedSubscription);
    }
}

以及发布者

public class Publisher
{
    private MessageBus messageBus;

    public Publisher(MessageBus messageBus)
    {
        this.messageBus = messageBus;
    }

    public void CreatePerson()
    {
        this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
        {
            Person = new Person { GivenName = "John" },
            Description = "[Demo 1] A new person has been created."
        });
    }
}

我们可以这样开始

MessageBus messageBus = new MessageBus();

Publisher publisher = new Publisher(messageBus);
Subscriber1 subscriber1 = new Subscriber1(messageBus);
Subscriber2 subscriber2 = new Subscriber2(messageBus);

publisher.CreatePerson();

实现 #2:通过弱引用进行消息传递

当系统变得如此庞大,并且消息传递系统变得如此复杂,以至于有许多发布者和订阅者时,记住取消订阅每个订阅很不方便。 就我个人而言,我想明确地这样做。 但是,这是我使用上述操作委托的弱引用版本的实现。 目的是让订阅者在没有对其的强引用时,有资格被垃圾回收。

在此实现中,我们将操作处理程序包装在 WeakSubscription

public class WeakSubscription<T> where T : IMessage
{
    private WeakReference _weakAction;

    public WeakSubscription(Action<T> action)
    {
        _weakAction = new WeakReference(action);
    }

    public bool IsAlive
    {
        get { return _weakAction.IsAlive; }
    }

    public object Target
    {
        get { return _weakAction.Target; }
    }

    public void OnMessageReceived(T message)
    {
        var action = _weakAction.Target as Action<T>;
        action?.Invoke(message);
    }
}

由于使用了弱引用,因此我们不会在此处实现 UnSubscribe 方法。 因此,我们的 MessageBus 具有 PublishSubscribe 作为其两个主要功能

public class MessageBus
{
    private readonly Dictionary<Type, List<object>> _observers
            = new Dictionary<Type, List<object>>();
}
public void Subscribe<T>(Action<T> callback) where T : IMessage
{
    Type messageType = typeof(T);

    var subscriptions = _observers.ContainsKey(messageType) ?
        _observers[messageType] : new List<object>();

    if (!subscriptions
        .Select(s => s as WeakSubscription<T>)
        .Any(s => s.Target == new WeakReference(callback).Target))
        subscriptions.Add(new WeakSubscription<T>(callback));

    _observers[messageType] = subscriptions;

    var deadSubscriptionsRemoved = CleanupSubscriptions<T>();
}
public void Publish<T>(T message) where T : IMessage
{
    if (message == null) throw new ArgumentNullException(nameof(message));

    Type messageType = typeof(T);
    if (_observers.ContainsKey(messageType))
    {
        var subscriptions = _observers[messageType];
        List<WeakSubscription<T>> deadSubscriptions = new List<WeakSubscription<T>>();

        foreach (var subscription in subscriptions
            .Select(s => s as WeakSubscription<T>))
        {
            if (subscription.IsAlive)
                subscription?.OnMessageReceived(message);
            else
                deadSubscriptions.Add(subscription);

            subscriptions.RemoveAll(s => deadSubscriptions.Contains(s));
            if (subscriptions.Count == 0)
                _observers.Remove(messageType);
        }
    }
}

最后,我们可以像这样清除所有订阅

private int CleanupSubscriptions<T>() where T : IMessage
{
    return _observers[typeof(T)].RemoveAll((s) =>
    {
        WeakSubscription<T> subRef = s as WeakSubscription<T>;
        if (subRef != null)
            return !subRef.IsAlive;
        return true;
    });
}

如何使用

与实现 #1 相同,只是它更简单! 无需跟踪订阅者以取消订阅。

实现 #3:通过接口进行消息传递

这是 Glenn Block 在 PluralSight 上对事件聚合器模式的实现的修改,该实现没有使用委托,而是使用接口作为接收消息时响应的方式。 每个订阅者可以实现尽可能多的不同类型消息的 ISubscribe 接口,以此来订阅这些消息。 该接口仅声明一个操作方法,供感兴趣的订阅者在收到消息后响应

public interface ISubscribe<T> where T: IMessage
    {
        void OnMessageReceived(T message);
    }

MessageBus 被设计为具有以下功能

public interface IMessageBus
{
    void Publish<T>(T message) where T : IMessage;

    // Returns number of messages that this subscriber wants to listen to.
    int Subscribe(object subscriber, bool reSubscribable = false);

    // Returns number of messages that this subscriber has topped listening to.
    int UnSubscribe(object subscriber);

    // Return if successfully un-subscribed all subscribers from specified kind of message T.
    bool UnSubscribeTo<T>() where T:IMessage;
}

其实现如下

public class MessageBus : IMessageBus 
{
    private readonly Dictionary<Type, 
        List<WeakReference>> _observers = new Dictionary<Type, List<WeakReference>>();

    ...

    private IEnumerable<Type> GetSubscriberTypes(object subscriber)
    {
        // Returns: ISubscribe<T1>, ISubscribe<T2>, etc. 
        // where Ti is message type to be sent/received.
        // In other words, the kinds of message that THIS subscriber is subscribed to.
        // That is the interface(s) that this subscriber class implements.
        // For example: Subscriber: ISubscribe<T1>, ISubscribe<T2>, etc.
        return subscriber
            .GetType()
            .GetInterfaces()
            .Where(i => i.IsGenericType && 
                   i.GetGenericTypeDefinition() == typeof(ISubscribe<>));
    }
}
public int Subscribe(object subscriber, bool reSubscribable = false)
{
    if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));

    WeakReference subscriberRef = new WeakReference(subscriber);
    var subscriberTypes = GetSubscriberTypes(subscriber);

    foreach (var subscriberType in subscriberTypes)
    {
        if (_observers.ContainsKey(subscriberType))
        {
            _observers[subscriberType].RemoveAll(s => !s.IsAlive);

            if (!_observers[subscriberType].Any
               (s => s.Target == subscriberRef.Target) || reSubscribable)
                _observers[subscriberType].Add(subscriberRef);
        }
        else
            _observers.Add(subscriberType, new List<WeakReference> { subscriberRef });
    }

    return subscriberTypes.ToList().Count;
}

然后是 Publish 方法

public void Publish<T>(T message) where T: IMessage
{
    if (message == null) throw new ArgumentNullException(nameof(message));

    var subscriberType = typeof(ISubscribe<>).MakeGenericType(typeof(T)); // --> ISubscribe<T>

    if (_observers.ContainsKey(subscriberType))
    {
        List<WeakReference> subscriberRefs = _observers[subscriberType];
        List<WeakReference> deadSubscriberRefs=new List<WeakReference>();

        foreach (var subscriberRef in subscriberRefs)
        {
            if (subscriberRef.IsAlive)
            {
                var subscriber = subscriberRef.Target as ISubscribe<T>;
                subscriber?.OnMessageReceived(message);
            }
            else
                deadSubscriberRefs.Add(subscriberRef); // Remove this reference
        }

        subscriberRefs.RemoveAll(s => deadSubscriberRefs.Contains(s));
        if (subscriberRefs.Count == 0)
            _observers.Remove(subscriberType);
    }
}

我们可以取消订阅某种消息,或一个对象订阅者

public int UnSubscribe(object subscriber)
{
    if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));

    var subscriberRef = new WeakReference(subscriber);
    var subscriberTypes = GetSubscriberTypes(subscriber);
    var emptyKeys = new List<Type>();

    int unSubscribedTypeCount = 0;

    foreach (var subscriberType in subscriberTypes)
    {
        if (_observers.ContainsKey(subscriberType))
        {
            List<WeakReference> subscriberRefs = _observers[subscriberType];
            unSubscribedTypeCount += subscriberRefs.RemoveAll(s => s.Target == subscriber);

            if (subscriberRefs.Count == 0)
                emptyKeys.Add(subscriberType);
        }
    }

    foreach (var key in emptyKeys)
        _observers.Remove(key);

    return unSubscribedTypeCount;
}
public bool UnSubscribeTo<T>() where T: IMessage
{
    var subscriberType = typeof(ISubscribe<>).MakeGenericType(typeof(T)); // --> ISubscribe<T>
    if (_observers.ContainsKey(subscriberType))
        return _observers.Remove(subscriberType);
    else
        throw new KeyNotFoundException(subscriberType.ToString());
}

如何使用

订阅者需要通过接口订阅他们想要接收的消息类型,这些接口承诺在收到消息时执行的操作

public class Subscriber1 : ISubscribe<PersonCreatedMessage>, ISubscribe<PersonDeletedMessage>
{
    private MessageBus messageBus;

    public Subscriber1(MessageBus messageBus)
    {
        this.messageBus = messageBus;

        var subscription = this.messageBus.Subscribe(this);
    }

    public void OnMessageReceived(PersonCreatedMessage message)
    {
        // Access message.Description, message.Person.GivenName,... here
    }

    public void OnMessageReceived(PersonDeletedMessage message)
    {
        // Access message properties here
    }

    public int UnSubscribe()
    {
        return this.messageBus.UnSubscribe(this);
    }

    ...
    // Un-subscribe to a certain kind of message as followed:
    messageBus.UnSubscribeTo<PersonCreatedMessage<Person>>();
}

发布者与实现 #1 和 #2 非常相似

public class Publisher
{
    private MessageBus messageBus;

    public Publisher(MessageBus messageBus)
    {
        this.messageBus = messageBus;
    }

    public void CreatePerson()
    {
        this.messageBus.Publish<PersonCreatedMessage>(new PersonCreatedMessage
        {
            Person = new Person { GivenName = "Mike" },
            Description = "[Demo 3] A new person has been created."
        });
    }

    public void DeletePerson()
    {
        this.messageBus.Publish<PersonDeletedMessage>(new PersonDeletedMessage
        {
            Description = "Person has been deleted."
        });
    }
}

这是开始

MessageBus messageBus = new MessageBus();

Publisher publisher = new Publisher(messageBus);
Subscriber1 subscriber1 = new Subscriber1(messageBus);
Subscriber2 subscriber2 = new Subscriber2(messageBus);

publisher.CreatePerson();
publisher.DeletePerson();

历史

  • 2019 年 9 月 16 日:将 IMessage 应用于所有适用的消息类型
  • 2019 年 9 月 4 日:初始版本
© . All rights reserved.