65.9K
CodeProject 正在变化。 阅读更多。
Home

SW 消息总线

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.74/5 (21投票s)

2014 年 2 月 10 日

CPOL

3分钟阅读

viewsIcon

60507

downloadIcon

875

SW 消息总线代表复杂的多线程软件系统中消息流的抽象机制。

引言

SW 消息总线代表复杂的多线程软件系统中消息流的抽象机制。

之所以选择“总线”这个术语,是因为其主要思想与硬件数据总线非常相似。一旦硬件单元放入数据,另一个对此数据感兴趣的硬件单元就会从总线上获取它。

在 SW 消息总线的情况下,其工作方式相同。一个 SW 模块将消息放到总线上,另一个对此消息感兴趣的模块从总线上获取它。发布者(放置消息的人)和订阅者(获取消息的人)之间的绑定仅通过消息类型完成。无需硬编码,无需运行时,无需订阅者和发布者之间的配置注册。任何发布者和任何订阅者之间唯一共享的信息是消息的类型。只有类型。没有其他任何东西。

另一个巨大的优势是从发布者订阅者的整个过程都进行了强类型检查。没有类型转换。绝对没有。

想法

此基础设施背后的主要思想是为任何已发布的消息使用运行时类型唯一标识,并调用订阅了此消息类型的任何订阅者。在 C++ 中,可以使用来自 typeinfotypeid 来完成,但我更喜欢更简单、更有效的方法,该方法不涉及 RTTI。

using TypeId = uintptr_t; 

template < typename T >
static TypeId GetTypeId()
{
  static uint32_t placeHolder;
  return (reinterpret_cast<TypeId>(&placeHolder));
} 

一旦我们知道如何获取消息类型的唯一标识 - 其余的就很简单了。

发布者将消息放到总线上,使用消息的唯一 ID,我们知道(在内部消息总线存储库中)找到所有注册到此消息类型的订阅者。

就是这样。

订阅者

谁可以订阅消息?

任何可调用目标。这可以是函数、lambda 表达式、仿函数或绑定表达式。订阅者类型定义为

template < typename MSG_TYPE > using Subscriber = std::function<void(MSG_TYPE)>;  

出版社

谁可以发布消息?

所有人。没有限制。可以发布任何消息类型的消息。如果没有人订阅某个类型,则没有人会收到此消息,但在任何情况下都可以发布消息。

消息总线 API

消息总线的 API 非常简单

template < int BUS_ID = 0 > class MsgBus
{
public:

  /*!***************************************************************************
  * @brief   Subscribe for receiving messages of the specific Message Type.
  *
  * @tparam  MSG_TYPE     Type for which new subscriber will be added.
  *
  * @param   subscriber   Callable target.
  *
  * @return  Handle associated with a registered subscriber. Use IsValidHandle()
  *          for operation success checking.
  *
  *****************************************************************************/

  template < typename MSG_TYPE >
  static SubscriberHandle Subscribe( Subscriber< MSG_TYPE > subscriber );

  /*!***************************************************************************
  * @brief   UnSubscribe from receiving messages of the specific Message Type.
  *
  * @param   handle      Subscriber handle.
  *
  *****************************************************************************/

  static void UnSubscribe( SubscriberHandle& handle );

  /*!***************************************************************************
  * @brief   Publish message by blocking call. The method will return only
  *          when all subscribers  will receive published message.
  *
  * @tparam  MSG_TYPE    Message type - optional, will be deducted by compiler.
  *
  * @param   msg         Message to be published.
  *
  *****************************************************************************/

  template < typename MSG_TYPE >
  static void PublishBlocking( const MSG_TYPE& msg );

  /*!***************************************************************************
  * @brief   Publish message by asynchronous call. The method will return
  *          immediately, the message will be delivered asynchronously.
  *
  * @tparam  MSG_TYPE    Message type - optional, will be deducted by compiler.
  *
  * @param   msg         Message to be published.
  *
  *****************************************************************************/

  template < typename MSG_TYPE >
  static void PublishAsync( const MSG_TYPE& msg );

  /*!***************************************************************************
  * @brief   Check Subscriber handle validity.
  *
  * @param   handle      Subscriber handle.
  *
  * @return  true - valid handle, false else.
  *
  *****************************************************************************/

  static bool IsValidHandle( SubscriberHandle& handle );

private:

 static MsgBusRepository msgBusRepository;

private:

  /// Instantiation, coping, moving and deleting of MsgBus class is prohibited.

  MsgBus() = delete;
  ~MsgBus() = delete;
  MsgBus( MsgBus& ) = delete;
  MsgBus( MsgBus&& ) = delete;
  MsgBus& operator= ( MsgBus& ) = delete;
  MsgBus& operator= ( MsgBus&& ) = delete;
};

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
template < typename MSG_TYPE >
SubscriberHandle MsgBus< BUS_ID >::Subscribe( Subscriber< MSG_TYPE > subscriber)
{
  return msgBusRepository.Subscribe< MSG_TYPE >( subscriber );
}

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
void MsgBus< BUS_ID >::UnSubscribe( SubscriberHandle& handle )
{
  msgBusRepository.UnSubscribe( handle );
}

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
template < typename MSG_TYPE >
void MsgBus< BUS_ID >::PublishBlocking( const MSG_TYPE& msg )
{
  msgBusRepository.Publish( msg );
}

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
template < typename MSG_TYPE >
void MsgBus< BUS_ID >::PublishAsync( const MSG_TYPE& msg )
{
  std::async( std::launch::async,
              MsgBus< BUS_ID >::PublishBlocking< MSG_TYPE >,
              msg
             );
}

////////////////////////////////////////////////////////////////////////////////

template < int BUS_ID >
bool MsgBus< BUS_ID >::IsValidHandle( SubscriberHandle& handle )
{
  return handle.IsValid();
}

////////////////////////////////////////////////////////////////////////////////

template < int MSG_BUS_NUM >
MsgBusRepository MsgBus< MSG_BUS_NUM >::msgBusRepository;

示例

以下是消息总线使用的简单示例。让我们定义消息类型和一些订阅者

using namespace std;

struct MSG_TYPE_1
{
    int i;
};

void RegularFunctionSubscriber( MSG_TYPE_1 msg )
{
    cout<< "FunctionSubscriber " << msg.i << endl;
}

class FunctorSubscriber
{
public:
    void operator()( MSG_TYPE_1 msg ) 
    { cout<< "FunctorSubscriber " << msg.i << endl; }
}; 

现在我们准备好使用我们的消息总线了

MSG_TYPE_1 msg1 = { 10 };

FunctorSubscriber functorSubscriber;

// Regular Function Subscriber
SubscriberHandle handle1 = MsgBus<>::Subscribe< MSG_TYPE_1 >( RegularFunctionSubscriber );

// Functor Subscriber
SubscriberHandle handle2 = MsgBus<>::Subscribe< MSG_TYPE_1 >( functorSubscriber );

// Lambda Function Subscriber
SubscriberHandle handle3 = MsgBus<>::Subscribe< MSG_TYPE_1 >( [](MSG_TYPE_1 msg) 
{ cout<< "Lambda Subscriber " << msg.i << endl; } );

MsgBus<>::PublishBlocking( msg1 );

MsgBus<>::PublishAsync( msg1 );

MsgBus<>::UnSubscribe( handle1 );

MsgBus<>::UnSubscribe( handle2 );

MsgBus<>::UnSubscribe( handle3 );

实现

正如我上面描述的,消息总线将所有注册的订阅者保存在内部存储库中,并且知道在发布消息时根据消息类型调用它们。

MsgBus 接收的非类型模板参数(默认为零)使我们能够创建多个消息总线。

消息总线存储库实现为 map,其中索引是消息 ID,内容是另一个 map,其中包含特定消息类型的所有订阅者。

/*!*****************************************************************************
* @file   MsgBusRepository.h
*
* @brief  Repository of all callable targets for specific bus.
*
* @author Evgeny Zavalkovsky
*
* @date   February 2014
*******************************************************************************/

#ifndef MSGBUSREPOSITORY_H_
#define MSGBUSREPOSITORY_H_

#include <map>

#include "../API/MessageBusDefs.h"
#include "Infra/TypeId.h"
#include "Infra/SharedMutex.h"
#include "Infra/SubscriberHandle.h"
#include "MsgTypeContainer.h"

/*!*****************************************************************************
* @class MsgBusRepository
*
* @brief Repository of all callable targets for specific bus.
*
*******************************************************************************/

class MsgBusRepository
{
public:

  /*!***************************************************************************
  *
  * @brief Constructor.
  *
  *****************************************************************************/

  MsgBusRepository()  : operational( true ) {}

  /*!***************************************************************************
  *
  * @brief Destructor.
  *
  *****************************************************************************/
  ~MsgBusRepository()
  {
    mutex.LockExclusive();

    for (auto iter: repositoryMap )
    {
      delete iter.second;
    }

    operational = false;

    mutex.UnlockExclusive();
  }

  /*!***************************************************************************
  *
  * @brief Subscribe.
  *        Add new Subscriber to the repository.
  *
  *****************************************************************************/

  template < typename MSG_TYPE >
  SubscriberHandle Subscribe( Subscriber< MSG_TYPE > subscriber )
  {
    TypeId typeId = GetTypeId< MSG_TYPE >();

    mutex.LockExclusive();

    SubscriberHandleTyped< MSG_TYPE > handle;

    if ( operational )
    {
      auto ret = repositoryMap.insert(
                              MsgBusRepositoryMapPair( typeId, nullptr ) );

      /// Check if this is the first subscriber for the MSG_TYPE.
      if ( ret.second == true )
      {
        ret.first->second = new MsgTypeContainer< MSG_TYPE >;
      }

      MsgTypeContainer< MSG_TYPE >*
      container = static_cast<MsgTypeContainer< MSG_TYPE >*>(ret.first->second);

      /// Add subscriber to the container.
      container->Add( handle, subscriber);
    }

    else
    {
      handle.SetInvalid();
    }

    mutex.UnlockExclusive();

    return handle;
  }

  /*!***************************************************************************
  *
  * @brief UnSubscribe.
  *        Remove subscriber from repository.
  *
  *****************************************************************************/

  void UnSubscribe( SubscriberHandle& handle )
  {
    mutex.LockExclusive();

    if( operational && handle.IsValid() )
    {
      TypeId typeId = handle.GetTypeid();

      auto iter = repositoryMap.find( typeId );

      if ( iter != repositoryMap.end() )
      {
        MsgTypeContainerBase* container =iter->second;

        container->Remove( handle );

        /// Check if this is the last subscriber in the container
        if( container->Empty() )
        {
          repositoryMap.erase( iter );

          delete container;
        }
      }
    }

    handle.SetInvalid();

    mutex.UnlockExclusive();
  }

  /*!***************************************************************************
  *
  * @brief Publish.
  *        Publish message for all subscribers for MSG_TYPE.
  *
  *****************************************************************************/

  template < typename MSG_TYPE > void Publish( const MSG_TYPE& msg )
  {
    TypeId typeId = GetTypeId< MSG_TYPE >();

    mutex.LockShared();

    if( operational )
    {
      auto iter = repositoryMap.find( typeId );

      if ( iter != repositoryMap.end() )
      {
        MsgTypeContainer< MSG_TYPE >*
        container = static_cast< MsgTypeContainer< MSG_TYPE >* >(iter->second);

        container->Publish( msg );
      }
    }

    mutex.UnlockShared();
  }

  /// Disable coping and moving.

  MsgBusRepository( MsgBusRepository& )  = delete;
  MsgBusRepository( MsgBusRepository&& ) = delete;
  MsgBusRepository& operator= ( MsgBusRepository& ) = delete;
  MsgBusRepository& operator= ( MsgBusRepository&& ) = delete;

private:

  using MsgBusRepositoryMap     = std::map< TypeId, MsgTypeContainerBase* >;
  using MsgBusRepositoryMapPair = std::pair< TypeId, MsgTypeContainerBase* >;

  bool operational;
  MsgBusRepositoryMap repositoryMap;

  /// Multiple Readers - Single Writer Lock.
  SharedMutex         mutex;
};

#endif /* MSGBUSREPOSITORY_H_ */

最后一个代码片段是每个特定消息类型的订阅者容器的实现。

/*!*****************************************************************************
* @file   MsgTypeContainer.h
*
* @brief  Holds all callable targets of the specific MSG_TYPE.
*
* @author Evgeny Zavalkovsky
*
* @date   February 2014
*******************************************************************************/

#ifndef MSGTYPECONTAINER_H_
#define MSGTYPECONTAINER_H_

#include <map>

#include "../API/MessageBusDefs.h"
#include "Infra/SubscriberHandle.h"

/*!*****************************************************************************
* @class MsgTypeContainerBase
*
* @brief Non template base of MsgTypeContainer class
*        Required for omitting template parameter dependency
*        in MsgTypeContainer class
*
*******************************************************************************/

class MsgTypeContainerBase
{
public:

  MsgTypeContainerBase() = default;
  virtual ~MsgTypeContainerBase() = default;
  MsgTypeContainerBase( MsgTypeContainerBase& ) = delete;
  MsgTypeContainerBase( MsgTypeContainerBase&& ) = delete;
  MsgTypeContainerBase& operator= ( MsgTypeContainerBase& ) = delete;
  MsgTypeContainerBase& operator= ( MsgTypeContainerBase&& ) = delete;

  virtual void Remove( SubscriberHandle handle ) = 0;
  virtual bool Empty() = 0;
};

/*!*****************************************************************************
* @class MsgTypeContainer
*
* @brief Holds all callable targets of the specific MSG_TYPE
*
*******************************************************************************/

template < typename MSG_TYPE >
class MsgTypeContainer : public MsgTypeContainerBase
{
public:

  /*!***************************************************************************
  *
  * @brief Add.
  *        Add new callable target.
  *
  *****************************************************************************/

  void Add( SubscriberHandle handle, Subscriber< MSG_TYPE > subscriber  )
  {
    containerMap.insert( MsgBusContainerMapPair( handle, subscriber ) );
  }

  /*!***************************************************************************
  *
  * @brief Remove.
  *        Remove callable target.
  *
  *****************************************************************************/

  void Remove( SubscriberHandle handle )
  {
    containerMap.erase( handle );
  }

  /*!***************************************************************************
  *
  * @brief Empty.
  *        Check if container is empty.
  *
  *****************************************************************************/

  bool Empty()
  {
    return containerMap.empty();
  }

  /*!***************************************************************************
  *
  * @brief Publish.
  *        Publish message to all targets in conatiner.
  *
  *****************************************************************************/
  void Publish( const MSG_TYPE& msg )
  {
    for (auto& iter: containerMap )
    {
      iter.second( msg );
    }
  }

  /// Default Constructor and Destructor
  //  Deleted Move and Copy Constructors and Assign Operators

  MsgTypeContainer() = default;
  virtual ~MsgTypeContainer() noexcept = default;
  MsgTypeContainer( MsgTypeContainer& ) = delete;
  MsgTypeContainer( MsgTypeContainer&& ) = delete;
  MsgTypeContainer& operator= ( MsgTypeContainer& ) = delete;
  MsgTypeContainer& operator= ( MsgTypeContainer&& ) = delete;

private:

  using MsgBusContainerMap = std::map< SubscriberHandle,
                                       Subscriber< MSG_TYPE >
                                     >;

  using MsgBusContainerMapPair = std::pair< SubscriberHandle,
                                            Subscriber< MSG_TYPE >
                                          >;

  MsgBusContainerMap containerMap;
};

#endif /* MSGTYPECONTAINER_H_ */

隐藏的细节

在本文中,我没有概述两个通用基础设施实现

  • 计数信号量,由于某些原因它不是 C++11 的一部分
  • 多个读者 - 单个写者信号量,计划作为 std::shared_lock 出现在 C++14 中

有关详细信息,请查看源代码。

操作系统依赖性

消息总线是绝对与操作系统无关的。

文档

消息总线具有完整的 DoxyGen 生成的文档。

请查看Documentation.html

编译器支持

消息总线已使用 gcc 4.8.1VS2013 编译器进行测试。

对于 gcc 4.8.1 编译器:必须添加编译器标志 -std=c++11 和链接器标志 -pthread

© . All rights reserved.