带线程的 C++ 状态机






4.76/5 (34投票s)
结合状态机和多播异步回调的框架。
引言
基于软件的有限状态机 (FSM) 是一种用于将设计分解为状态和事件的实现方法。没有操作系统的简单嵌入式设备采用单线程,以便状态机在单个“线程”上运行。更复杂的系统使用多线程来分配处理任务。
存在许多 FSM 实现,包括我在此 Code Project 上撰写的《C++ 中的状态机设计》。这篇文章介绍了如何使用 StateMachine
基类创建 C++ 状态机。然而,缺少的是如何在多线程环境中集成多个状态机。
“异步组播回调与线程间消息传递”是我在 Code Project 上写的另一篇文章。该设计提供了一种简单、可移植的回调机制,可以处理在客户端指定的控制线程上异步调用回调(带有事件数据)的底层细节。
本文将前面介绍的两种技术——状态机和异步回调——结合到一个项目中。在之前的文章中,通过简单的例子可能不容易看出多个状态机如何协调活动以及它们之间如何分发事件。本文的目标是提供一个完整的、可工作的项目,其中包含线程、定时器、事件和状态机协同工作。为了说明这个概念,示例项目实现了一个基于状态的自检引擎,该引擎利用线程之间的异步通信。
我不会重新解释 StateMachine
和 AsyncCallback<>
的实现,因为之前的文章已经详细介绍过了。主要关注点是如何将状态机和异步回调结合到一个框架中。
使用 CMake 创建构建文件。CMake 是免费开源软件。支持 Windows、Linux 和其他工具链。有关更多信息,请参阅 **CMakeLists.txt** 文件。
查看 GitHub 以获取最新源代码
- C++ 状态机与线程 - 作者:David Lafreniere
自检子系统
自检对硬件和机械系统执行一系列测试,以确保正常运行。在此示例中,有四个状态机类实现了我们的自检子系统,如下面的继承图所示。
异步回调
AsyncCallback<>
类贯穿始终用于提供异步回调。第一次使用它是在 SelfTest
类中。每当自检完成时,就会调用 SelfTest::CompletedCallback
回调,通知已注册的客户端。SelfTestEngine
注册到 CentrifugeTest
和 PressureTest
,以便在测试完成时收到通知。
第二次是用户界面注册到 SelfTestEngine::StatusCallback
。这允许运行在另一个线程上的客户端注册并接收执行期间的状态回调。AsyncCallback<>
允许客户端指定精确的回调线程,从而可以轻松避免跨线程错误。
最后一个位置是在 Timer
类中,该类会周期性地向已注册的回调函数触发回调。一个通用的、低速的定时器,能够在一个客户端指定的线程上调用函数,对于事件驱动的状态机非常有用,在这种状态机中,您可能希望轮询某些条件的发生。在这种情况下,Timer
类用于将轮询事件注入状态机实例。
SelfTestEngine
SelfTestEngine
是线程安全的,并且是客户端使用自检子系统的主要入口点。CentrifugeTest
和 PressureTest
是 SelfTestEngine
的成员。SelfTestEngine
负责按正确顺序对单个自检进行排序,如下面的状态图所示。
Start
事件启动自检引擎。SelfTestEngine::Start()
是一个异步函数,它依赖于 StartCallback
来调用 private
的 SelfTestEngine::StartPrivateCallback()
事件函数。由于 Start()
是异步的,因此可以安全地由运行在任何线程上的任何客户端调用。
void SelfTestEngine::Start()
{
// Asynchronously call StartPrivateCallback
StartCallback(NoData());
}
void SelfTestEngine::StartPrivateCallback()
{
BEGIN_TRANSITION_MAP // - Current State -
TRANSITION_MAP_ENTRY (ST_START_CENTRIFUGE_TEST) // ST_IDLE
TRANSITION_MAP_ENTRY (CANNOT_HAPPEN) // ST_COMPLETED
TRANSITION_MAP_ENTRY (CANNOT_HAPPEN) // ST_FAILED
TRANSITION_MAP_ENTRY (EVENT_IGNORED) // ST_START_CENTRIFUGE_TEST
TRANSITION_MAP_ENTRY (EVENT_IGNORED) // ST_START_PRESSURE_TEST
END_TRANSITION_MAP(NULL)
}
当每个自检完成时,会触发 Complete
事件,导致下一个自检启动。所有测试完成后,状态机将过渡到 Completed
然后回到 Idle
。如果在执行期间任何时候生成 Cancel
事件,则会过渡到 Failed
状态。
SelfTest
基类提供了所有 SelfTest
派生状态机共有的三个状态:Idle
、Completed
和 Failed
。然后 SelfTestEngine
添加了另外两个状态:StartCentrifugeTest
和 StartPressureTest
。
SelfTestEngine
有一个 public
事件函数 Start()
,它启动自检。SelfTestEngine::StatusCallback
是一个异步回调,允许客户端注册以在测试期间获取状态更新。类中还包含一个 WorkerThread
实例。所有自检状态机的执行都发生在这个线程上。
class SelfTestEngine : public SelfTest
{
public:
// Clients register for asynchronous self-test status callbacks
static AsyncCallback<SelfTestStatus> StatusCallback;
// Singleton instance of SelfTestEngine
static SelfTestEngine& GetInstance();
// Start the self-tests
void Start();
WorkerThread& GetThread() { return m_thread; }
static void InvokeStatusCallback(std::string msg);
private:
AsyncCallback<> StartCallback;
void StartPrivateCallback();
SelfTestEngine();
void Complete();
// Sub self-test state machines
CentrifugeTest m_centrifugeTest;
PressureTest m_pressureTest;
// Worker thread used by all self-tests
WorkerThread m_thread;
// State enumeration order must match the order of state method entries
// in the state map.
enum States
{
ST_START_CENTRIFUGE_TEST = SelfTest::ST_MAX_STATES,
ST_START_PRESSURE_TEST,
ST_MAX_STATES
};
// Define the state machine state functions with event data type
STATE_DECLARE(SelfTestEngine, StartCentrifugeTest, NoEventData)
STATE_DECLARE(SelfTestEngine, StartPressureTest, NoEventData)
// State map to define state object order. Each state map entry defines a
// state object.
BEGIN_STATE_MAP
STATE_MAP_ENTRY(&Idle)
STATE_MAP_ENTRY(&Completed)
STATE_MAP_ENTRY(&Failed)
STATE_MAP_ENTRY(&StartCentrifugeTest)
STATE_MAP_ENTRY(&StartPressureTest)
END_STATE_MAP
// Declare state machine events that receive async callbacks
CALLBACK_DECLARE_NO_DATA(SelfTestEngine, StartPrivateCallback)
CALLBACK_DECLARE_NO_DATA(SelfTestEngine, Complete)
CALLBACK_DECLARE_NO_DATA(SelfTest, Cancel)
};
如前所述,SelfTestEngine
注册以接收来自每个子自检(即 CentrifugeTest
和 PressureTest
)的异步回调,如下所示。当子自检状态机完成时,将调用 SelfTestEngine::Complete()
函数。当子自检状态机失败时,将调用 SelfTestEngine::Cancel()
函数。
SelfTestEngine::SelfTestEngine() :
SelfTest(ST_MAX_STATES),
m_thread("SelfTestEngine")
{
StartCallback.Register(&SelfTestEngine::StartPrivateCallback, &m_thread, this);
// Register for callbacks when sub self-test state machines complete or fail
m_centrifugeTest.CompletedCallback.Register(&SelfTestEngine::Complete, &m_thread, this);
m_centrifugeTest.FailedCallback.Register(&SelfTestEngine::Cancel, &m_thread, this);
m_pressureTest.CompletedCallback.Register(&SelfTestEngine::Complete, &m_thread, this);
m_pressureTest.FailedCallback.Register(&SelfTestEngine::Cancel, &m_thread, this);
}
SelfTest
基类分别在 Completed
和 Failed
状态下生成 CompletedCallback
和 FailedCallback
,如下所示。
STATE_DEFINE(SelfTest, Completed, NoEventData)
{
SelfTestEngine::InvokeStatusCallback("SelfTest::ST_Completed");
// Generate asynchronous completed callback if client is registered
if (CompletedCallback)
CompletedCallback(NoData());
InternalEvent(ST_IDLE);
}
STATE_DEFINE(SelfTest, Failed, NoEventData)
{
SelfTestEngine::InvokeStatusCallback("SelfTest::ST_Failed");
// Generate asynchronous failed callback if client registered
if (FailedCallback)
FailedCallback(NoData());
InternalEvent(ST_IDLE);
}
有人可能会问,为什么状态机使用异步回调?如果状态机在同一个线程上,为什么不使用普通、同步的回调呢?需要防止的问题是回调到当前正在执行的状态机,即调用堆栈绕回同一个类实例。例如,应防止以下调用序列:SelfTestEngine
调用 CentrifugeTest
回调 SelfTestEngine
。异步回调允许堆栈展开并防止这种不期望的行为。
CentrifugeTest
下面的 CentrifugeTest
状态机图实现了在“C++ 中的状态机设计”中描述的离心机自检。这里的区别在于使用了 Timer
类通过异步回调提供 Poll
事件。
定时器
Timer
类提供了一个通过注册 Expired
来接收函数回调的通用机制。Start()
以特定间隔启动回调。Stop()
停止回调。
class Timer
{
public:
static const DWORD MS_PER_TICK;
/// An expired callback client's register with to get callbacks
AsyncCallback<> Expired;
/// Starts a timer for callbacks on the specified timeout interval.
/// @param[in] timeout - the timeout in milliseconds.
void Start(DWORD timeout);
/// Stops a timer.
void Stop();
...
所有 Timer
实例都存储在一个 private static
列表中。WorkerThread::Process()
循环通过调用 Timer::ProcessTimers()
来定期服务列表中的所有定时器。在 Expired
中注册的客户端会在定时器到期时被调用。
case WM_USER_TIMER:
Timer::ProcessTimers();
break;
轮询事件
CentrifugeTest
有一个 Timer
实例并注册回调。回调函数、一个线程实例和一个 this
指针会传递给 Register()
,以促进异步回调机制。
// Register for timer callbacks
m_pollTimer.Expired.Register(&CentrifugeTest::Poll, &SelfTestEngine::GetInstance().GetThread(), this);
使用 Start()
启动定时器时,会以指定的间隔周期性地调用 Poll()
事件函数。
void CentrifugeTest::Poll()
{
BEGIN_TRANSITION_MAP // - Current State -
TRANSITION_MAP_ENTRY (EVENT_IGNORED) // ST_IDLE
TRANSITION_MAP_ENTRY (EVENT_IGNORED) // ST_COMPLETED
TRANSITION_MAP_ENTRY (EVENT_IGNORED) // ST_FAILED
TRANSITION_MAP_ENTRY (EVENT_IGNORED) // ST_START_TEST
TRANSITION_MAP_ENTRY (ST_WAIT_FOR_ACCELERATION) // ST_ACCELERATION
TRANSITION_MAP_ENTRY (ST_WAIT_FOR_ACCELERATION) // ST_WAIT_FOR_ACCELERATION
TRANSITION_MAP_ENTRY (ST_WAIT_FOR_DECELERATION) // ST_DECELERATION
TRANSITION_MAP_ENTRY (ST_WAIT_FOR_DECELERATION) // ST_WAIT_FOR_DECELERATION
END_TRANSITION_MAP(NULL)
}
STATE_DEFINE(CentrifugeTest, Acceleration, NoEventData)
{
SelfTestEngine::InvokeStatusCallback("CentrifugeTest::ST_Acceleration");
// Start polling while waiting for centrifuge to ramp up to speed
m_pollTimer.Start(10);
}
将回调连接到事件函数
AsyncCallback<>
机制能够调用一个 static
成员函数或一个自由函数。然而,状态机事件是使用实例成员函数实现的。连接两者的关键是 CALLBACK_DECLARE
和 CALLBACK_DECLARE_NO_DATA
宏。
CentrifugeTest.h 有这一行
CALLBACK_DECLARE_NO_DATA(CentrifugeTest, Poll)
第一个宏参数是类名。第二个是实例事件函数名。宏定义如下
#define CALLBACK_DECLARE_NO_DATA(stateMachine, eventName) \
private:\
static void eventName(const NoData& data, void* userData) { \
ASSERT_TRUE(userData != NULL); \
stateMachine* stateMachine##Instance = static_cast<stateMachine*>(userData); \
stateMachine##Instance->eventName(); }
展开上面的 CALLBACK_DECLARE_NO_DATA
宏会得到
private:
static void Poll(const NoData& data, void* userData) {
ASSERT_TRUE(userData != NULL);
CentrifugeTest* CentrifugeTestInstance = static_cast<CentrifugeTest*>(userData);
CentrifugeTestInstance->Poll(); }
这里没有魔法。创建了一个静态成员 Poll()
函数,该函数接受 AsyncCallback<>
回调。void* userData
来自 Register()
函数的 3rd 参数,在本例中是 CentrifugeTest
的一个实例。void*
被转换为 CentrifugeTest*
类型,以便可以调用实例成员 Poll()
。
在这种情况下,使用“NO_DATA
”宏版本,因为 Poll()
事件不接受参数。要将回调连接到带有事件数据的事件函数,请使用 CALLBACK_DELCARE
宏,如下所示。
CALLBACK_DECLARE(MyStateMachine, MyEventFunc, MyEventFuncData)
当然,您可以无需多行宏完成所有这些工作,但它清理了项目中可能传播的大量重复代码。
用户界面
该项目没有用户界面,只有文本控制台输出。在此示例中,“用户界面”仅通过 SelfTestEngineStatusCallback()
函数在用户界面线程上输出自检状态消息。
WorkerThread userInterfaceThread("UserInterface");
void SelfTestEngineStatusCallback(const SelfTestStatus& status, void* userData)
{
// Output status message to the console "user interface"
cout << status.message.c_str() << endl;
}
在自检开始之前,用户界面会注册到 SelfTestEngine::StatusCallback
回调。
SelfTestEngine::StatusCallback.Register(&SelfTestEngineStatusCallback, &userInterfaceThread);
这里的用户界面线程仅用于模拟对通常在单独控制线程中运行的 GUI 库的回调。
运行时
程序的主函数 main()
如下所示。它创建两个线程,注册以从 SelfTestEngine
接收回调,然后调用 Start()
启动自检。
int main(void)
{
// Create the worker threads
userInterfaceThread.CreateThread();
SelfTestEngine::GetInstance().GetThread().CreateThread();
// Register for self-test engine callbacks
SelfTestEngine::StatusCallback.Register(&SelfTestEngineStatusCallback, &userInterfaceThread);
SelfTestEngine::GetInstance().CompletedCallback.Register
(&SelfTestEngineCompleteCallback, &userInterfaceThread);
// Start the worker threads
ThreadWin::StartAllThreads();
// Start self-test engine
SelfTestEngine::GetInstance().Start();
// Wait for self-test engine to complete
while (!selfTestEngineCompleted)
Sleep(10);
// Exit the worker threads
userInterfaceThread.ExitThread();
SelfTestEngine::GetInstance().GetThread().ExitThread();
return 0;
}
SelfTestEngine
在 UserInteface
线程上生成异步回调。SelfTestEngineStatusCallback()
回调将消息输出到控制台。
void SelfTestEngineStatusCallback(const SelfTestStatus& status, void* userData)
{
// Output status message to the console "user interface"
cout << status.message.c_str() << endl;
}
SelfTestEngineCompleteCallback()
回调设置一个标志,让 main()
循环退出。
void SelfTestEngineCompleteCallback(const NoData& data, void* userData)
{
selfTestEngineCompleted = TRUE;
}
运行项目会输出以下控制台消息。
参考文献
- C++ 中的状态机设计 - 作者:David Lafreniere
- 异步组播回调与线程间消息传递 - 作者:David Lafreniere
结论
StateMachine
和 AsycCallback<>
的实现可以单独使用。它们各自都有用。然而,将两者结合起来,为多线程的状态驱动应用程序开发提供了一个新颖的框架。本文展示了如何在多线程时协调状态机的行为,这在查看简单、单线程的示例时可能不那么明显。
我在许多不同的 PC 和嵌入式项目上成功地使用了类似的想法。通过少量工作,该代码可以移植到任何平台。我特别喜欢异步回调的想法,因为它有效地隐藏了线程间通信,而状态机的组织使得创建和维护自检变得容易。
历史
- 2016 年 11 月 18 日
- 首次发布
- 2016年11月19日
- 文章小幅修正
- 2016 年 11 月 25 日
- 细微的语法更改和澄清
- 2016 年 12 月 2 日
- 修复了
Callback::operator==()
中的错误 - 更新了附加的源代码
- 修复了