65.9K
CodeProject 正在变化。 阅读更多。
Home

带异步多播委托的 C++ 状态机

starIconstarIconstarIconstarIconstarIcon

5.00/5 (12投票s)

2017 年 1 月 11 日

CPOL

10分钟阅读

viewsIcon

33485

downloadIcon

908

结合状态机和异步多播委托的框架。

引言

基于软件的有限状态机(FSM)是一种用于将设计分解为状态和事件的实现方法。没有操作系统的简单嵌入式设备使用单线程,使得状态机运行在单个“线程”上。更复杂的系统使用多线程来分配处理工作。

存在许多 FSM 实现,包括我在这里 Code Project 上写的一篇题为“C++ 状态机设计”的文章。该文章介绍了如何使用 StateMachine 基类创建 C++ 状态机。然而,缺少的是如何在多线程环境中集成多个状态机。

C++ 中的异步多播委托”是我在 Code Project 上写的另一篇文章。该设计提供了一个 C++ 委托库,能够对任何可调用函数进行同步和异步调用。

本文将前面描述的两种技术——状态机和异步多播委托——结合到一个项目中。在之前的文章中,可能不容易通过简单的例子看出多个状态机如何相互协调活动并相互发送事件。本文的目标是提供一个完整的、可工作的项目,其中包含线程、计时器、事件和状态机,它们协同工作。为了说明这个概念,示例项目实现了一个基于状态的自检引擎,利用线程之间的异步通信。

我不会重新解释 StateMachineDelegate<> 的实现,因为之前的文章已经涵盖了这些。主要重点是如何将状态机和委托组合到一个单一的框架中。

本文在概念上类似于我的文章“带线程的 C++ 状态机”。那篇文章使用了 AsyncCallback<> 进行线程间消息传递,而本文则使用了 Delegate<>AsyncCallback<> 是一个简单、紧凑的实现,其回调函数类型和签名有限。AsyncCallback<> 的优点是实现更紧凑、更简单,但代价是只能指向具有单个函数参数的自由函数或静态函数。本文中演示的 Delegate<> 实现允许对任何函数类型(包括成员函数)和任何函数签名进行异步回调,从而简化了与 StateMachine 的集成。

使用 CMake 创建构建文件。CMake 是免费开源软件。支持 Windows、Linux 和其他工具链。有关更多信息,请参阅 **CMakeLists.txt** 文件。

查看 GitHub 以获取最新源代码

相关的 GitHub 仓库

异步委托回调

如果您不熟悉委托,它的概念非常简单。委托可以被认为是超级函数指针。在 C++ 中,没有哪种指针类型能够指向所有可能的函数变体:实例成员、虚函数、const 函数、静态函数和自由(全局)函数。函数指针无法指向实例成员函数,指向成员函数的指针也有各种限制。然而,委托类可以以类型安全的方式指向任何函数,前提是函数签名匹配。简而言之,委托指向具有匹配签名的任何函数,以支持匿名函数调用。

异步委托更进一步,允许在客户端指定的控制线程上匿名调用任何函数。函数及其所有参数都安全地从目标线程调用,从而简化了线程间通信并消除了跨线程错误。

Delegate<> 框架贯穿始终,用于提供异步回调,形成有效的发布者-订阅者机制。发布者公开委托容器接口,一个或多个订阅者将委托实例添加到容器以接收匿名回调。

首次使用是在 SelfTest 类中,其中 SelfTest::CompletedCallback 委托容器允许订阅者添加委托。每当自检完成时,都会调用 SelfTest::CompletedCallback 回调,通知已注册的客户端。SelfTestEngine 注册到 CentrifugeTestPressureTest,以便在测试完成时获得异步通知。

第二次使用是用户界面注册到 SelfTestEngine::StatusCallback。这允许在另一个线程上运行的客户端注册并接收执行期间的状态回调。MulticastDelegateSafe1<> 允许客户端指定确切的回调线程,从而易于避免跨线程错误。

最后一次使用是在 Timer 类中,该类在已注册的回调函数上触发周期性回调。一个通用的、低速的计时器,能够在一个客户端指定的线程上调用一个函数,对于事件驱动的状态机非常有用,您可以轮询某个条件是否发生。在这种情况下,Timer 类用于将轮询事件注入状态机实例。

自检子系统

自检对硬件和机械系统执行一系列测试,以确保正确运行。在此示例中,有四个状态机类实现了我们的自检子系统,如下图所示的继承图:

图 1:自检子系统继承图

SelfTestEngine

SelfTestEngine 是线程安全的,也是客户端使用自检子系统的主要接触点。CentrifugeTestPressureTestSelfTestEngine 的成员。SelfTestEngine 负责按正确顺序对各个自检进行排序,如下图所示的状态图:

图 2:SelfTestEngine 状态机

Start 事件启动自检引擎。SelfTestEngine::Start() 是一个异步函数,如果调用者不在正确的执行线程上,它会重新调用 Start() 函数。执行一个简单的检查,看调用者是否在期望的控制线程上。如果不是,则在栈上创建一个临时的异步委托并调用它。该委托和调用者的所有原始函数参数都会在堆上复制,然后函数会在 m_thread 上重新调用。这是创建异步 API 的一种优雅方式,只需最少的努力。由于 Start() 是异步的,因此任何在任何线程上运行的客户端都可以安全地调用它。

void SelfTestEngine::Start(const StartData* data)
{
    // Is the caller executing on m_thread?
    if (m_thread.GetThreadId() != WorkerThread::GetCurrentThreadId())
    {
        // Create an asynchronous delegate and reinvoke the function call on m_thread
        Delegate1<const StartData*>& delegate = 
                  MakeDelegate(this, &SelfTestEngine::Start, &m_thread);
        delegate(data);
        return;
    }

    BEGIN_TRANSITION_MAP                                    // - Current State -
        TRANSITION_MAP_ENTRY (ST_START_CENTRIFUGE_TEST)     // ST_IDLE
        TRANSITION_MAP_ENTRY (CANNOT_HAPPEN)                // ST_COMPLETED
        TRANSITION_MAP_ENTRY (CANNOT_HAPPEN)                // ST_FAILED
        TRANSITION_MAP_ENTRY (EVENT_IGNORED)                // ST_START_CENTRIFUGE_TEST
        TRANSITION_MAP_ENTRY (EVENT_IGNORED)                // ST_START_PRESSURE_TEST
    END_TRANSITION_MAP(data)
}

当每个自检完成时,Complete 事件触发,导致下一个自检启动。所有测试完成后,状态机转换到 Completed 并返回到 Idle。如果在执行过程中任何时候生成了 Cancel 事件,就会发生到 Failed 状态的转换。

SelfTest 基类提供了所有 SelfTest 派生状态机共有的三个状态:IdleCompletedFailedSelfTestEngine 然后添加了两个状态:StartCentrifugeTestStartPressureTest

SelfTestEngine 有一个 public 事件函数 Start(),用于启动自检。SelfTestEngine::StatusCallback 是一个异步回调,允许客户端注册以在测试期间获取状态更新。类中还包含一个 WorkerThread 实例。所有自检状态机的执行都发生在此线程上。

class SelfTestEngine : public SelfTest
{
public:
    // Clients register for asynchronous self-test status callbacks
    static MulticastDelegateSafe1<const SelfTestStatus&> StatusCallback;

    // Singleton instance of SelfTestEngine
    static SelfTestEngine& GetInstance();

    // Start the self-tests. This is a thread-safe asychronous function. 
    void Start(const StartData* data);

    WorkerThread& GetThread() { return m_thread; }
    static void InvokeStatusCallback(std::string msg);

private:
    SelfTestEngine();
    void Complete();

    // Sub self-test state machines 
    CentrifugeTest m_centrifugeTest;
    PressureTest m_pressureTest;

    // Worker thread used by all self-tests
    WorkerThread m_thread;

    StartData m_startData;

    // State enumeration order must match the order of state method entries
    // in the state map.
    enum States
    {
        ST_START_CENTRIFUGE_TEST = SelfTest::ST_MAX_STATES,
        ST_START_PRESSURE_TEST,
        ST_MAX_STATES
    };

    // Define the state machine state functions with event data type
    STATE_DECLARE(SelfTestEngine,     StartCentrifugeTest,    StartData)
    STATE_DECLARE(SelfTestEngine,     StartPressureTest,      NoEventData)

    // State map to define state object order. Each state map entry defines a
    // state object.
    BEGIN_STATE_MAP
        STATE_MAP_ENTRY(&Idle)
        STATE_MAP_ENTRY(&Completed)
        STATE_MAP_ENTRY(&Failed)
        STATE_MAP_ENTRY(&StartCentrifugeTest)
        STATE_MAP_ENTRY(&StartPressureTest)
    END_STATE_MAP    
};

如前所述,SelfTestEngine 注册以从每个子自检(例如 CentrifugeTestPressureTest)接收异步回调,如下所示。当子自检状态机完成时,将调用 SelfTestEngine::Complete() 函数。当子自检状态机失败时,将调用 SelfTestEngine::Cancel() 函数。

SelfTestEngine::SelfTestEngine() :
    SelfTest(ST_MAX_STATES),
    m_thread("SelfTestEngine")
{
    // Register for callbacks when sub self-test state machines complete or fail
    m_centrifugeTest.CompletedCallback += 
                 MakeDelegate(this, &SelfTestEngine::Complete, &m_thread);
    m_centrifugeTest.FailedCallback += 
                 MakeDelegate<SelfTest>(this, &SelfTest::Cancel, &m_thread);
    m_pressureTest.CompletedCallback += 
                 MakeDelegate(this, &SelfTestEngine::Complete, &m_thread);
    m_pressureTest.FailedCallback += 
                 MakeDelegate<SelfTest>(this, &SelfTest::Cancel, &m_thread);
}

SelfTest 基类在 CompletedFailed 状态下分别生成 CompletedCallbackFailedCallback,如下所示。

STATE_DEFINE(SelfTest, Completed, NoEventData)
{
    SelfTestEngine::InvokeStatusCallback("SelfTest::ST_Completed");

    if (CompletedCallback)
        CompletedCallback();

    InternalEvent(ST_IDLE);
}

STATE_DEFINE(SelfTest, Failed, NoEventData)
{
    SelfTestEngine::InvokeStatusCallback("SelfTest::ST_Failed");

    if (FailedCallback)
        FailedCallback();

    InternalEvent(ST_IDLE);
}

有人可能会问,为什么状态机使用异步委托回调?如果状态机在同一个线程上,为什么不使用普通、同步的回调呢?要防止的是回调到正在执行的状态机,也就是说,调用栈绕回到了同一个类实例。例如,应防止以下调用序列:SelfTestEngine 调用 CentrifugeTest,然后回调 SelfTestEngine。异步回调允许栈展开并防止这种不希望的行为。

CentrifugeTest

下面显示的 CentrifugeTest 状态机图实现了在“C++ 状态机设计”中描述的离心机自检。CentrifugeTest 通过继承 SelfTest 类的 IdleCompletedFailed 状态来使用状态机继承。这里的区别在于 Timer 类用于通过异步委托回调提供 Poll 事件。

图 3:CentrifugeTest 状态机

定时器

Timer 类提供了一种常见的接收函数回调的机制,通过向 Expired 注册。Start() 以特定间隔启动回调。Stop() 停止回调。

class Timer 
{
public:
    static const DWORD MS_PER_TICK;

    /// Client's register with Expired to get timer callbacks
    SinglecastDelegate0<> Expired;

    /// Constructor
    Timer(void);

    /// Destructor
    ~Timer(void);

    /// Starts a timer for callbacks on the specified timeout interval.
    /// @param[in]    timeout - the timeout in milliseconds.
    void Start(DWORD timeout);

    /// Stops a timer.
    void Stop();
...

所有 Timer 实例都存储在一个 private static 列表中。WorkerThread::Process() 循环通过调用 Timer::ProcessTimers() 定期服务列表中的所有计时器。当计时器过期时,会调用向 Expired 注册的客户端。

     case WM_USER_TIMER:
         Timer::ProcessTimers();
         break;

Win32 和 std::thread 工作线程

源代码提供了两个备用的 WorkerThread 实现。Win32 版本包含在 WorkerThreadWin.cpp/h 中,并依赖于 Windows API。std::thread 版本位于 WorkerThreadStd.cpp/h,并使用 C++11 的线程功能。通过在 DelegateOpt.h 中定义 USE_WIN32_THREADSUSE_STD_THREADS 来选择这两个实现之一。

有关底层线程类实现的更多信息,请参阅 带同步启动的 Win32 线程包装器带消息队列和计时器的 C++ std::thread 事件循环

堆与池

在某些项目中,不希望使用堆来检索动态存储。也许项目是任务关键的,并且由于堆碎片化而导致内存故障的风险是不可接受的。或者,可能认为堆开销和非确定性执行的代价太大。无论哪种情况,项目都包含一个固定块内存分配器,用于将所有内存分配重定向到固定块分配器。通过在 DelegateOpt.h 中定义 USE_XALLOCATOR 来启用委托库上的固定块分配器。要启用状态机上的分配器,请在 StateMachine.h 中取消注释 XALLOCATOR

有关 xallocator 的更多信息,请参阅 使用快速固定块内存分配器替换 malloc/free

轮询事件

CentrifugeTest 有一个 Timer 实例并注册回调。回调函数、线程实例和 this 指针被提供给 Register(),以促进异步回调机制。

// Register for timer callbacks
m_pollTimer.Expired = MakeDelegate(this, &CentrifugeTest::Poll, 
                                   &SelfTestEngine::GetInstance().GetThread());

当使用 Start() 启动计时器时,Poll() 事件函数会以指定的间隔周期性地调用。请注意,当调用 Poll() 外部事件函数时,会根据状态机的当前状态转换到 WaitForAccelerationWaitForDeceleration。如果 Poll() 在错误的时间被调用,事件将被静默忽略。

void CentrifugeTest::Poll()
{
    BEGIN_TRANSITION_MAP                                   // - Current State -
        TRANSITION_MAP_ENTRY (EVENT_IGNORED)               // ST_IDLE
        TRANSITION_MAP_ENTRY (EVENT_IGNORED)               // ST_COMPLETED
        TRANSITION_MAP_ENTRY (EVENT_IGNORED)               // ST_FAILED
        TRANSITION_MAP_ENTRY (EVENT_IGNORED)               // ST_START_TEST
        TRANSITION_MAP_ENTRY (ST_WAIT_FOR_ACCELERATION)    // ST_ACCELERATION
        TRANSITION_MAP_ENTRY (ST_WAIT_FOR_ACCELERATION)    // ST_WAIT_FOR_ACCELERATION
        TRANSITION_MAP_ENTRY (ST_WAIT_FOR_DECELERATION)    // ST_DECELERATION
        TRANSITION_MAP_ENTRY (ST_WAIT_FOR_DECELERATION)    // ST_WAIT_FOR_DECELERATION
    END_TRANSITION_MAP(NULL)
}

STATE_DEFINE(CentrifugeTest, Acceleration, NoEventData)
{
    SelfTestEngine::InvokeStatusCallback("CentrifugeTest::ST_Acceleration");

    // Start polling while waiting for centrifuge to ramp up to speed
    m_pollTimer.Start(10);
}

用户界面

该项目没有用户界面,只有一个文本控制台输出。在这个例子中,“用户界面”只是通过 SelfTestEngineStatusCallback() 函数在用户界面线程上输出自检状态消息。

WorkerThread userInterfaceThread("UserInterface");

void SelfTestEngineStatusCallback(const SelfTestStatus& status)
{
    // Output status message to the console "user interface"
    cout << status.message.c_str() << endl;
}

在自检开始之前,用户界面会注册到 SelfTestEngine::StatusCallback 回调。

SelfTestEngine::StatusCallback += 
      MakeDelegate(&SelfTestEngineStatusCallback, &userInterfaceThread);

这里的用户界面线程仅用于模拟通常在单独控制线程中运行的 GUI 库的回调。

运行时

程序的主函数(main())如下所示。它创建两个线程,注册以从 SelfTestEngine 获取回调,然后调用 Start() 来启动自检。

int main(void)
{    
    // Create the worker threads
    userInterfaceThread.CreateThread();
    SelfTestEngine::GetInstance().GetThread().CreateThread();

    // Register for self-test engine callbacks
    SelfTestEngine::StatusCallback += 
             MakeDelegate(&SelfTestEngineStatusCallback, &userInterfaceThread);
    SelfTestEngine::GetInstance().CompletedCallback += 
         MakeDelegate(&SelfTestEngineCompleteCallback, &userInterfaceThread);
    
    // Start the worker threads
    ThreadWin::StartAllThreads();

    // Start self-test engine
    StartData startData;
    startData.shortSelfTest = TRUE;
    SelfTestEngine::GetInstance().Start(&startData);

    // Wait for self-test engine to complete 
    while (!selfTestEngineCompleted)
        Sleep(10);

    // Unregister for self-test engine callbacks
    SelfTestEngine::StatusCallback -= 
             MakeDelegate(&SelfTestEngineStatusCallback, &userInterfaceThread);
    SelfTestEngine::GetInstance().CompletedCallback -= 
         MakeDelegate(&SelfTestEngineCompleteCallback, &userInterfaceThread);

    // Exit the worker threads
    userInterfaceThread.ExitThread();
    SelfTestEngine::GetInstance().GetThread().ExitThread();

    return 0;
}

SelfTestEngineUserInteface 线程上生成异步回调。SelfTestEngineStatusCallback() 回调将消息输出到控制台。

void SelfTestEngineStatusCallback(const SelfTestStatus& status)
{
      // Output status message to the console "user interface"
      cout << status.message.c_str() << endl;
}

SelfTestEngineCompleteCallback() 回调设置一个标志,让 main() 循环退出。

void SelfTestEngineCompleteCallback()
{
      selfTestEngineCompleted = TRUE;
}

运行该项目会输出以下控制台消息:

图 4:控制台输出

结论

StateMachineDelegate<> 实现可以单独使用。各自都很有用。然而,将两者结合起来,为多线程状态驱动的应用程序开发提供了一个新颖的框架。本文展示了如何在使用多线程时协调状态机的行为,这在查看简单的单线程示例时可能并不完全显而易见。

我曾在许多不同的 PC 和嵌入式项目上成功地使用过类似的想法。通过少量努力,该代码就可以移植到任何平台。我特别喜欢异步委托回调的想法,因为它有效地隐藏了线程间通信,并且状态机的组织使得创建和维护自检变得容易。

参考文献

历史

  • 2017 年 1 月 11 日
    • 首次发布
  • 2017 年 1 月 19 日
    • 文章小幅更新
    • 更新了附加源代码,采用了更新的 StateMachine 实现
  • 2017 年 2 月 5 日
    • 更新了支持文章的参考文献
© . All rights reserved.