65.9K
CodeProject 正在变化。 阅读更多。
Home

FreeRTOS 线程包:轻松创建多线程物联网代码

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2021年2月28日

MIT

27分钟阅读

viewsIcon

20371

downloadIcon

159

使用流行的 RTOS 为您的物联网设备启用轻松的多线程。

FreeRTOS Thread Pack

引言

至少可以说,为物联网设备编写多线程代码的想法可能会令人望而却步。由于几乎不存在集成调试设施、代码上传时间长导致构建周期长、RAM 有限以及通常只有一个 CPU 核心可用,缺点往往会超过优点。

然而,这里存在巨大的未开发机会,特别是考虑到像 ESP32 和各种基于 ARM 的设备都采用多核配置,甚至在单核配置中,几乎所有这些设备在大多数 I/O 点上都受到严重瓶颈。至少,一些休眠的 I/O 线程确实可以帮助我们。

我们将依赖 FreeRTOS 来提供线程调度和设备特定的“SMP”支持以利用所有核心。FreeRTOS 在 Arduino 库管理器中作为一个“库”提供,但该分支不起作用。FreeRTOS 已经内置在 ESP32 的 IDF 代码库中,因此在 ESP32 上始终可用。

FreeRTOS 很好,但我们可以通过在线程周围添加包装器,并提供我们自己的池化和同步设施来改进它,这就是 FreeRTOS 线程包所带来的。

您可以使用此库来使您的代码更轻松、更灵活地利用多个线程和核心。

兼容性说明

此代码是在 FreeRTOS 的 ESP32 变体上编写和测试的。它使用了一些 ESP32 特有的函数,例如 CPU 亲和性函数。这些函数必须从其他实现中删除。我还没有在其他平台上测试过,但当我测试时,我会添加条件编译。

Arduino 版本的 FreeRTOS 是原始代码的一个分支,不支持很多 FreeRTOS 功能。我开始为其添加支持,但后来意识到它实际实现的操作系统功能很少,于是我决定不再继续。特别是在 AVR 设备上,使用 Arduino 框架版本的 FreeRTOS,同步上下文根本不可行。再加上像 atMega2560 这样的某些 CPU 缺乏对优化原子操作的 CPU 支持,我没有有效的方法来替换 Arduino 版本所缺少的功能。

一种不同的方法

我通常在代码之前描述概念,但在这里我假设您熟悉多线程、线程池和同步等概念。如果您不熟悉这些内容,我不想浪费您的时间使用此库。这是为已经理解这些概念的受众准备的,所以本文基本上分为“如何使用它”和“我是如何制作的”部分。

如何使用它

.NETisms

首先,我想从一个也许是不太策略的建议开始,即微软在 .NET 的线程处理方面做得很好。线程池是一个很棒的东西。同步上下文是一个绝妙的想法。我们将把这些好想法应用到我们自己的代码中,尽管它不是 .NET。因此,如果您已经熟悉 C#,使用此代码会有些熟悉感。当然,由于这些是截然不同的编程环境,所以存在差异,但“熟悉”是这里最重要的词。

何时避免使用此功能

让我们谈谈什么时候不应该使用这个。线程不是轻量级对象。它们需要堆栈空间,堆栈本身是从堆中分配的。每个线程都会在 CPU 开销方面给线程调度程序带来负担。在小型设备上,CPU 周期和 RAM 非常宝贵。此外,堆分配本身也很繁重。相对于这些小型设备,线程基本上是沉重的。例如,在 ESP32 上,使用默认参数,每个线程会消耗略多于 4KB 的堆,并强制该核心的调度程序将线程添加到它必须执行的上下文切换列表中。

因此,除非您能从中获得某种好处,否则您不会想使用线程。如果您需要等待缓慢的 I/O 同时执行其他操作,线程可能是一个不错的选择。如果您需要在另一个核心上分担一些 CPU 密集型工作,同时继续使用主核心操作,线程可能是一个不错的选择。否则,还有更有效的方法可以在物联网设备上进行时间分片,例如使用纤程甚至协程。此库目前不支持纤程,但 FreeRTOS 支持它们,并且它们可能会在将来添加到此库中。

线程魔术

我们这里有三个主要关注点——线程操作、池管理和同步。涵盖这些领域使我们在实际使用线程方面“功能齐全”,因此它们很重要。

FRThread 类进行线程操作

FRThread 很像 .NET 的 Thread 类。FRThread::create()FRThread::createAffinity() 创建一个前台线程(尽管如果指示,它也可以在空闲优先级下执行)。您可以使用 FRThread::current() 获取当前线程,或使用 FRThread::sleep() 使当前线程休眠。您还可以使用 FRThread::idle() 获取 CPU 空闲线程。

线程生命周期

线程是一个前台线程,其生命周期由传递给创建函数的代码的生命周期决定。如果代码执行无限循环,线程将一直存在,直到调用 FRThread.abort()。请注意,生命周期不与 FRThread 实例本身绑定。如果它超出作用域,线程仍然存活,直到调用 abort(),或者传递给创建函数的代码运行完成。

创建函数

线程创建函数的签名是 void(const void*)。它可以是 lambda 表达式或用 std::function<void(const void*>)> 包装。void* 参数是一个应用程序定义的状态,在执行时传递给函数。此函数中的任何代码都将在目标线程上运行,并且应对其访问的任何共享数据使用同步。当代码退出时,线程会自动销毁。

线程生命周期

当调用 FRThread::create()FRThread::createAffinity() 时,会创建一个线程并挂起,然后才执行任何传入的代码。在 FreeRTOS 下,线程不是在挂起状态下创建的,因此 FRThread 使用了一个 thunk,它会导致线程在创建后立即挂起自身。这就是为什么说线程是创建后挂起,而不是像 .NET 的 Thread 那样在挂起状态下创建。这种差异很微妙,但不一定不重要,因为线程在创建后短暂的瞬间在技术上是“活着的”,然后才进入休眠状态。对于大多数情况,这与创建时挂起没有什么不同。

在调用 FRThread.start() 之前,线程中的任何代码都不会运行。在任何时候,都可以调用 FRThread.suspend() 使线程进入休眠状态,直到再次调用 FRThread.start()

一个示例

以下是一些使用线程的示例代码。 admittedly 比较刻意,但它演示了基本原理

#include <FRThreadPack.h>
void setup() {
  Serial.begin(115200);

  // create some mischief
  FRThread mischief = FRThread::create([](const void*){
    while(true) {Serial.print(".");}
  },nullptr);
  
  // start the mischief
  mischief.start();

  // print out a string. We don't do so all at once, because
  // depending on the platform, Serial.print/println are atomic,
  // meaning a thread can't interrupt them.
  const char*sz="The quick brown fox jumped over the lazy dog\r\n";
  char szch[2];
  szch[1]=0;
  while(*sz) {
    szch[0]=*sz;
    Serial.print(szch);
    ++sz;
  }
  // that's enough foolishness!
  mischief.abort();
}

void loop() {
}

输出将是这样的,但可能包含更多点

...The quic...k brown ....f...ox jumpe...d over th...e lazy d...og

使用 FRThreadPool 类进行线程池管理

FRThreadPool 允许您将工作排入队列,以便线程在可用时从池中调度。线程池非常适合管理某些类型的长时间运行操作。

线程分配与销毁

在 .NET 中,ThreadPool 类已经有几个“预分配”的线程在等待。这对于某些环境来说很好,但对于 IoT 设备来说并不太合适,因为相对而言,线程即使只是维护也代价高昂。此外,由于 RTOS 调度程序的有限能力,您需要了解您的硬件并自行将线程分配到不同的核心。根据我的经验,FreeRTOS 在自动将新线程分配到核心方面并不出色。在示例代码中我们没有使用亲和性,因为有些平台没有多个核心,但除此之外您应该考虑它。此外,您很可能希望在一个核心上使用更高优先级的线程,而在主核心上使用更低优先级的线程,也许还在同一个池中。

因此,FRThreadPool 要求您自行为其创建线程。线程池使用特殊的“调度器”线程,而不是通用 FRThread 线程,因为前者“感知池”而更高效。因此,要为 FRThreadPool 创建线程,您可以使用 FRThreadPool.createThread()FRThreadPool.createThreadAffinity() 来创建池的线程。当 FRThreadPool 超出作用域时,这些线程会自动销毁。请注意,在创建它们时,理论上您可以为每个线程指定不同的堆栈大小,但实际上这样做只会浪费内存,因为在池中执行的代码将受限于指定的最小堆栈大小。

这些方法返回一个 FRThread,但您不应该对这些线程调用 abort()。如果您想显式销毁所有线程,请使用 FRThreadPool.shutdown()。此方法会立即返回,但要等到所有线程完成当前操作后才会完成。

将工作排入队列

与 .NET 的 ThreadPool 非常相似,您可以使用 FRThreadPool.queueUserWorkItem() 将工作分派给等待中的池线程之一。如果没有线程等待,工作将放置在积压队列中。如果队列已满,queueUserWorkItem() 会阻塞,直到有空间为止。函数签名和状态参数与 FRThread::create() 相同。

一个示例

其中一些可能听起来比实际复杂。如果是这样,一个例子应该能说明问题

#include <FRThreadPack.h>
 
void setup() {
  Serial.begin(115200);
  // create a thread pool
  // note that this thread pool
  // will go out of scope and all
  // created threads will be exited
  // once setup() ends
  FRThreadPool pool;
  // create three threads for the pool
  // all of these threads are now waiting on incoming items.
  // once an item becomes available, one of the threads will
  // dispatch it. When it's complete, it will return to the
  // listening state. You do not use this thread pool the way
  // you use .NET's thread pool. .NET's thread pool
  // has lots of reserve threads created by the system.
  // This threadpool has no threads unless you create them.
  pool.createThread(); 
  pool.createThread();
  pool.createThread();
  
  // now queue up 4 work items. The first 3 will start executing immediately
  // the 4th one will start executing once one of the others exits.
  // this is because we have 3 threads available.
  pool.queueUserWorkItem([](void*state){
    delay(3000);
    Serial.println("Work item 1");
  },nullptr);
  pool.queueUserWorkItem([](void*state){
    delay(2000);
    Serial.println("Work item 2");
  },nullptr);
  pool.queueUserWorkItem([](void*state){
    delay(1000);
    Serial.println("Work item 3");
  },nullptr);
  pool.queueUserWorkItem([](void*state){
    Serial.println("Work item 4");
  },nullptr);

  // the thread pool exits here, waiting for pool threads to complete
  // so this can take some time.
}

void loop() {
}

这很可能会输出以下内容——线程本质上不是确定性的

Work item 3
Work item 4
Work item 2
Work item 1

使用 FRSynchronizationContext 进行同步

现在,来点完全不同的东西。如果您熟悉 .NET WinForms 或 WPF 开发,您可能以前使用过 SynchronizationContext,尽管是间接使用,但您可能从未近距离接触过它。它们是一种不寻常但巧妙的、围绕线程安全消息传递方案的抽象。

它们的功能是让您能够将代码调度到特定线程(通常是应用程序的主线程)上执行。

一种不同的同步方法

通常,当我们考虑多线程代码的同步时,我们会想到在数据周围创建读/写屏障并获取或释放资源。

如果你有耐心,这是一种很好的做事方式。它可以像你所做的那样高效。但当事情出错时,追查 bug 也是一场噩梦,特别是它们经常表现为间歇性的竞态条件。

还有一种通过消息传递来处理事情的方法。基本上,我们使用线程安全的环形缓冲区来存储消息。我们从一个或多个线程向它们发布消息,由另一个线程接收并处理。Windows 使用类似的方法来处理其... 窗口。

消息就是数据。同步已经在消息上完成。这也有缺点,其中之一是缺乏灵活性。一条消息能有多通用?通常很难想出一条足够通用的消息来处理所有情况,但请跟着我,因为我向你保证,这正是同步上下文所提供的。

在开始之前,我们需要谈谈代码的生命周期。

隐藏的入口点,隐藏的主循环:线程“生命周期”循环

几乎所有的物联网应用程序都在其主线程中循环。在 Arduino 应用程序中,您无法在 .ino 文件中看到循环本身,但该循环确实存在。它被 IDE 的源代码破坏“功能”隐藏了,但它就是调用您代码中 loop() 方法的。

基本上,从本质上讲,如果我们将 Arduino 框架所做的事情转换为经典的 C 应用程序,它看起来会像这样**

// forward declarations:
void setup();
void loop();
// "real" entry point: 
int main(int argc, char** argv) { 
    setup();
    while(true) {
        loop();
    }
    return 0;
}
// your code is inserted here
#include "yourcode.ino"
...

** 我并不是说它真的会转换为这段代码。这仅仅是为了演示。您的平台可能确实使用 FreeRTOS 来创建一个新的“任务”,该任务调用 setup()loop(),但它实现了相同的功能。

这里的重点是,如果没有某种东西阻止,你的应用程序就会退出,而在物联网设备中,你最终不会退出 main()。永远不会。因为这样做会导致深渊。main() 之后什么都没有,永恒。这条路通向龙。正因为如此,某个地方有一个循环,或者等效地阻止了这种退出发生。

如果不是这样,你的设备几乎肯定会在每次运行到 main() 结束时重启(或者更糟,停止),因为没有其他事情可做。这不是一台电脑。没有命令行或桌面可以返回。没有“进程”的概念——只有在启动时运行的代码。“进程退出”实际上是一个未定义的状态!除了重启或停止,绝对无事可做——除非一些开发人员耍小聪明,将太空侵略者或飞行模拟器偷偷塞进了芯片本身。在我见过的一些事情之后,我不会排除这种可能性。也许你会发现宝藏。

但目前,我们总是以某种方式循环。

你会注意到,这与管理其他线程生命周期的方式非常相似,只是在退出时不会重新启动——我的意思是,它与线程的生命周期与其代码的生命周期相同这一点相似。一旦代码退出,使其存活的东西也“死了”。超出作用域就是超出作用域,这是……我将留给虚无主义者思考的问题。

阻止所有这些哲学和它带来的存在性问题的循环,我称之为“生命周期循环”。这些循环具有定义你的应用程序何时生死的区别——这些循环是你的线程的脉搏并使其保持活力——甚至你的主应用程序/线程循环也是如此。这适用于 Windows GUI 应用程序、桌面系统上的交互式控制台应用程序、服务器上的守护进程,以及任何需要活得不仅仅是“完成一项任务就结束”的生活的东西,无论平台如何。

一个同步上下文“存活”在这样的生命周期循环中,无论是在主线程的生命周期循环中还是在辅助线程的生命周期循环中。它从其宿主循环中窃取周期来处理来自其他线程的消息。如果我们修改上面假设的经典 C 应用程序,将同步上下文插入到我们的主循环中,它将是这样的

#include <FRThreadPack.h>
FRSynchronizationContext g_mainSync;
// forward declarations:
void setup();
void loop();
// "real" entry point:
int main(int argc, char** argv) {
  setup();
  while(true) {
    loop();
    // process an incoming 
    // message from a thread, 
    // if there is one:
    g_mainSync.processOne();
  }
  return 0;
}
// your code is inserted here
#include "yourcode.ino"
...

现在很明显,Arduino IDE 不会允许我们这样做。PlatformIO 可能会,但它仍然是一种技巧。幸运的是,我们根本不需要这样做。这只是为了说明这个概念。我们可以通过将相关代码移动到 .ino 文件本身来实现完全相同的功能,例如 #includeg_mainSync 全局声明,然后从 loop() 内部调用 g_mainSync.processOne()

#include <FRThread.h>
FRSynchronizationContext g_mainSync;

void setup() {
}
void loop() {
  // process incoming messages
  g_mainSync.processOne();
}

但现在你明白了,我们真正做的,是将 g_mainSync “注入”到应用程序的主生命周期循环中。

你可以做类似的事情,将它们注入到辅助线程的生命周期循环中。你可以根据需要创建任意数量的,但最常见的情况是你只有一个,它存在于主应用程序的线程中,无论你有多少个辅助线程。

常见用例

典型的场景是主线程在辅助线程上创建并调度长时间运行的任务(可能使用线程池),然后这些线程使用消息传递将结果报告回主线程。

一个更复杂的用例

一种更罕见的场景是,您有必须进行协调的辅助线程,例如,从某个 I/O 源读取、进行一些后处理并将其写入另一个 I/O 源可能涉及多个线程之间相互通信,并且还可能涉及主线程。在这种情况下,您可能有一个“存活”在写入线程的生命周期循环中的同步上下文,读取线程向其发布消息。您还将在主线程中拥有主同步上下文,两个辅助线程都可以向其发布消息。

一则包罗万象的消息

我提过很多次传递消息,但只暗示了消息实际包含什么。

在同步上下文中,我们只有一种消息,但这种消息可以非常灵活。消息包含一个 std:function<void(void *)> 函数对象。如果这是 C#,我会说它包含一个 Action<Object> 委托。这个函数对象由接收者在接收者的线程中执行。

这样做的好处是,我只需将代码打包成 C++ lambda 表达式,然后将其发送到目标线程执行。由于代码在目标线程的上下文中执行,因此不需要额外的同步——我可以使用这段代码来更新用户界面,或者作为消息结果的任何其他操作。有一个通用的 void* 参数可以随之发送,但您也可以简单地使用提升来为您完成工作。

最终将其整合:发布消息

在这一切之后,你拥有的是一种简单的方法,可以在任何存在 FRSynchronizationContext 的线程上运行任意代码。

您使用 FRSynchronizationContext.post() 向同步上下文发布一些代码——通常是 lambda 表达式。它不会通知您完成情况。它只是“即发即忘”,但它是分派消息最有效的方式,在实践中,它很好地处理了许多(如果不是大多数)真实世界的场景。

您可以使用 FRSynchronizationContext.send() 将一些代码发送到同步上下文。如果您这样做,send() 将不会返回,直到其中包含的代码已在目标线程上执行。它尽可能高效,但不如 post() 高效。如果您必须等到接收者处理完消息,请使用它。

这使得同步多线程代码变得更加容易,而不是使用原始的同步原语。这在调试工具极少的设备上尤其重要,因为以这种方式同步代码要简单得多,因此错误也会少得多。

一个示例

不举例说明很难理解这么多内容,所以我们现在就来举个例子。

#include <FRThreadPack.h>
FRSynchronizationContext g_mainSync;
void setup() {
  Serial.begin(115200);
  if(!g_mainSync.handle()) {
    Serial.println("Could not initialize synchronization context");
    while(true); // halt
  }
  uint32_t ts = millis()+500;
  // note that using snprintf (or any printf variant) requires extra stack for the thread.
  // this is yet another good reason to use a synchronization context. That way you can
  // keep your threads minimal on stack space, focused on work, and letting the main 
  // thread do things like pretty print results from that work.
  FRThread threadA = FRThread::create([](const void* state){
    // wait for the starting tick:
    while(millis()<*((uint32_t*)state));
    // first post some code to be run on the main thread
    // post() does not block. This code will eventually be
    // executed from inside loop() by way of g_mainSync.processOne()
    g_mainSync.post([](const void*){
      // this code runs on the main application thread -
      // the thread g_mainSync.processOne() is called from:
      unsigned long tid = (unsigned long)FRThread::current().handle();
      char szb[1024];
      snprintf(szb,1024,"Hello from thread A by way of main thread (%lx)\r\n",tid);
      Serial.println(szb);    
    });
    // this code is running from thread A:
    unsigned long tid = (unsigned long)FRThread::current().handle();
    char szb[1024];
    snprintf(szb,1024,"Hello from thread A (%lx)\r\n",tid);
    Serial.println(szb);
    },&ts,1,8192);
  if(!threadA.handle()) {
    Serial.println("Could not create thread A");
    while(true); // halt
  }
  FRThread threadB = FRThread::create([](const void* state){
    // wait for the starting tick:
    while(millis()<*((uint32_t*)state));
    // first post some code to be run on the main thread. send()
    // blocks until the code is executed. This code will be
    // executed from inside loop() by way of 
    // g_mainSync.processOne()
    g_mainSync.send([](const void*){
      unsigned long tid = (unsigned long)FRThread::current().handle();
      char szb[1024];
      snprintf(szb,1024,"Hello from thread B by way of main thread (%lx)\r\n",tid);
      Serial.println(szb);    
    });
    // this code is running from thread B:
    unsigned long tid = (unsigned long)FRThread::current().handle();
    char szb[1024];
    snprintf(szb,1024,"Hello from thread B (%lx)\r\n",tid);
    Serial.println(szb);
    },&ts,1,8192);
  if(!threadB.handle()) {
    Serial.println("Could not create thread B");
    while(true); // halt
  }
  threadA.start();
  threadB.start();
  // display from the main application thread:
  unsigned long tid = (unsigned long)FRThread::current().handle();
  char szb[1024];
  snprintf(szb,1024,"Hello from main thread (%lx)\r\n",tid);
  Serial.println(szb);
}

void loop() {
  // dispatch messages from our synchronization context
  g_mainSync.processOne();
}

这里内容很多,但大部分都是注释。让我们从头开始。

由于我们使用 C++ RAII 但不使用 C++ 异常,我们检查 handle() 以指示对象是否已初始化。

接下来,我们为未来的半秒钟创建了一个快速而粗略的时间戳。可以把它看作是一种发令枪。

现在我们 create() threadA

关于堆栈使用注意事项:问题是,这次我们确实需要指定一个堆栈大小,而且不是一个微不足道的大小。请记住,这是字长,而不是字节长。在我的测试机器上,一个字是 4 字节。这意味着在我的机器上有 32KB 的堆栈。C 字符串格式化函数显然非常占用堆栈,如注释中所述。这是使用 FRSynchronizationContext 的次要原因。由于您将代码发布到目标线程的上下文中执行,因此代码受目标堆栈帧的限制,而不是您的堆栈帧。通常,主应用程序线程有足够的堆栈空间。辅助线程通常不是这样,因此在主线程中执行格式化函数可以避免您在辅助线程有限的堆栈中执行此操作。

在线程 A 中,我们做的第一件事就是旋转,等待我们之前设置的时间戳过期。一旦过期,BANG! 我们就开始了。注意我们使用了 const void* state 参数来传递时间戳。我们也可以通过 lambda 表达式的捕获轻松做到这一点,但那样效率较低,也给编译器增加了额外的工作。

现在我们通过 g_mainSync 将一些代码 post() 到主线程,它在那里运行。请记住,这段代码在目标线程上执行。

实际应用中的 Post 和 Send

在现实世界中,我们将使用上面的 post() 将某种结果或线程完成工作的指示传回主应用程序线程。由于此代码将在主应用程序线程中运行,因此它不需要与主线程数据进行进一步同步交互。任何捕获和提升的参数,或通过 state 参数传递的参数也是安全的,只要我们不在 post() 调用后立即从辅助线程中再次触及它们。一旦发布,就不要触及,你就会没事。

如果你真的必须在将代码分派到目标线程执行后访问该数据,那么你可能应该使用 send() 而不是 post()。使用 send() 将会阻塞,直到代码最终被执行。一旦该代码被执行,它使用的数据应该可以再次被当前线程安全地触及,当然,除非它又将其传递给了另一个线程。尽量保持简单,避免编写需要协调许多不同线程的代码,这样你就会没事。

无论如何,在我们分派了另一条消息后,这次是从 threadA 本身分派的,我们又做了与刚才非常相似的事情,只是现在是 threadB。主要区别在于 threadB 使用了 send()

最后,作为我们在 setup() 中的最后一步,我们从主应用程序线程发布了一条消息。

loop() 中,我们唯一的任务是处理来自其他线程的同步上下文消息。

同步上下文总结

这些“野兽”存在于特定线程(可能是主应用程序线程)的“生命周期”循环中,并处理从其他线程发布或发送的传入消息。消息包含在接收它的目标线程(同步上下文所在的线程)上执行的代码。因此,一个线程可以将任意代码分派到任何其他存在同步上下文的线程上运行。这样做时,无需进一步同步对该代码数据的访问,因为目标线程的数据可以直接从目标线程本身通过来自源线程的消息进行更新。这避免了对更复杂同步技术的需要,例如使用互斥锁或信号量等同步原语。

工作原理

现在我们来看看有趣的部分。在这里,我们可以探索让这一切运作的魔法。我们将首先介绍线程,然后是同步上下文,最后是线程池,因为后者建立在前两者之上。

FRThread 类

除了一个方面,这个类实际上并不是那么有趣。它的大部分只是 FreeRTOS C 风格“Task”API 的一个轻量级包装。唯一特别有趣的是线程创建,所以我们来探讨一下

// creates a thread
static FRThread create(
    std::function<void(const void*)> fn,
    const void* state,
    UBaseType_t priority=tskIDLE_PRIORITY+1, 
    uint32_t stackWordSize=1024) {
    TaskHandle_t handle = nullptr;
    TaskEntryThunk tet;
    tet.callingThreadHandle=xTaskGetCurrentTaskHandle();
    tet.fn = fn;
    tet.state = state;
    if(pdPASS==xTaskCreate(taskEntryThunk,"FRThread",stackWordSize,&tet,priority,&handle)) {
        FRThread result(handle);
        ulTaskNotifyTake(pdTRUE,portMAX_DELAY);
        return result;
    }
    return FRThread(nullptr);
}

在这里,大部分工作都转发给了 FreeRTOS 的 xTaskCreate(),但首先,我们用我们当前调用线程的句柄(不是新句柄!)、std:function<void(const void*)> fn 参数和 const void* state 参数填充一个 TastEntryThunk tet 结构。当我们调用 xTaskCreate() 时,我们传递的是一个名为 taskEntryThunk 的函数的地址,而不是 fn,因为 fn 是一个函数对象,而不是函数指针,所以它当然不会起作用。我们没有将 state 参数用于线程的参数,而是使用 tet 的地址,它顺便保存了我们传入的 state 参数值。

如果你以前在微软工作过,“thunk”可能已经暗示了我们正在做什么。我来解释一下。我们需要在为线程例程传入的用户代码周围注入一些代码。这些代码将完成几件事。首先,它提供了一种从我们基于 std::function 的函数对象中提供一个可用的“扁平”函数指针的方法。其次,它允许我们让线程控制自己的生命周期,即在创建时挂起自身并在退出时删除自身。这是 thunk 函数

static void taskEntryThunk(void *data) {
    TaskEntryThunk thunk = *((TaskEntryThunk*)data);
    // let the creator know we're done
    xTaskNotifyGive(thunk.callingThreadHandle);
    vTaskSuspend(nullptr);
    thunk.fn(thunk.state);
    vTaskDelete(nullptr);
}

基本上,这其中一项工作是将一种“调用约定”(在抽象意义上,即使不是技术意义上)转换为另一种。我们有自己的可调用线程函数约定(一个函数对象和一个 const void* 状态参数),而 FreeRTOS 有自己的约定,它是一个 C 风格的函数指针,它接受一个非 const void* 作为单个参数。

你会注意到第一件事是参数立即以 TaskEntryThunk 的形式复制到堆栈中。现在,如果 void* data 本身是由调用者在堆栈上分配的,那么它最终可以安全地超出作用域。为了发出这个信号,我们使用 xTaskNotifyGive(),这是一种轻量级的信号机制,因此我们可以解除阻塞调用者,并且它最终可以返回。这就是我们最初必须将调用者的线程句柄存储在 callingThreadHandle 中的原因。在创建代码中,我们有 ulTaskNotifyTake(pdTRUE,portMAX_DELAY),它会阻塞直到相应的 xTaskNotifyGive(thunk.callingThreadHandle)。如果我们没有这样做,那么调用者可能在我们到达这里时已经返回,并且 void* data 指向的堆栈空间可能已经被回收,导致故障。

请注意,我们不对用户传入的 const void* state 参数提供此类保护。如果不复制它,就不可能做到这一点,而复制它又需要知道其大小。调用者有责任确保 state 参数指向的数据在线程创建时仍然存在。这与 FreeRTOS 文档描述其“任务”创建行为的方式一致。

我们接下来做的是使用 vTaskSuspend(nullptr) 挂起当前正在运行的线程(新创建的线程)。这是为了模拟线程在挂起状态下创建,就像 .NET 中一样。线程在挂起状态下创建更灵活,因此它不立即执行,但 FreeRTOS 似乎没有提供这一点。这就是我们模拟它的原因。此时,线程处于挂起状态,直到对其调用 FRThread.start()

完成所有这些之后,我们就可以使用 fn 函数对象来调用我们的代码,并传入我们最初在 FRThread::create() 中获得的 const void* state 参数。

最后,一旦该代码返回,就会调用 vTaskDelete(nullptr) 来删除当前线程。

FRThread 中有一个类似的线程创建函数,名为 createDispatcher(),它用于创建 FRThreadPool 类使用的线程,但我们稍后再讨论它。

FRSynchronizationContext 类

我已经在上面从这个类中移除了大部分实际的魔法,但还有更多内容需要涵盖。

它所基于的环形缓冲区由 FreeRTOS 提供,这很有趣,因为它使同步上下文的实现比我最初从 .NET 中移植过来的 C# 版本更容易编写。

最主要的是它使用 RAII 来管理环形缓冲区资源。创建调用如下所示

m_messageRingBufferHandle = 
 xRingbufferCreate(sizeof(Message) * queueSize + (sizeof(Message) - 1), RINGBUF_TYPE_NOSPLIT);

此处缓冲区大小的计算并非特别直接。老实说,我也不完全相信 queueSize 会忠实地表示“不分割”缓冲区中允许的消息数量。我已尝试确保它会这样做,尽管由于缺乏分割而带来了额外的开销,通过增加几乎足以容纳一个额外消息的字节数。FreeRTOS 文档并未完全明确需要多少开销才能使其工作,因此我当时尽我所能进行了即兴创作。我将在未来的测试中重新审视它。

环形缓冲区中消息的结构如下

struct Message
{
    std::function<void(void *)> fn;
    void *state;
    TaskHandle_t finishedNotifyHandle;
};

你可能已经注意到,这与我们之前看到的 TaskEntryThunk 结构非常相似。它包含一个函数对象引用 fn 和一个状态参数 state。如果使用 send() 创建消息,它还包含调用 send() 的线程(任务)句柄,在 finishedNotifyHandle 中。这用于通知发送方,目标线程上的代码执行已最终完成。

send()post() 基本做同样的事情,只是 send() 多了一个步骤,所以我们来回顾一下 send()

// sends a message to the thread update() is called from. 
// this method blocks until the update thread executes the 
// method and it returns.
bool send(
    std::function<void(void *)> fn, 
    void *state = nullptr, 
    uint32_t timeoutMS = 10000)
{
    if(nullptr!=m_messageRingBufferHandle) {
        Message msg;
        msg.fn = fn;
        msg.state = state;
        msg.finishedNotifyHandle = xTaskGetCurrentTaskHandle();
        uint32_t mss = millis();
        UBaseType_t res = xRingbufferSend
        (m_messageRingBufferHandle, &msg, sizeof(msg), pdMS_TO_TICKS(timeoutMS));
        mss = millis() - mss;
        if (timeoutMS >= mss)
            timeoutMS -= mss;
        else
            timeoutMS = 0;
        if (res == pdTRUE)
        {
            ulTaskNotifyTake(pdTRUE, pdMS_TO_TICKS(timeoutMS));
            return true;
        }
    }
    return false;
}

此方法接受一个函数对象、一些状态和一个超时。

我曾考虑提供一个没有超时的版本,但我担心在例如开发人员启用 C++ 异常、捕获它并基本上创建一个发送者永远不会被唤醒的情况时会发生什么。在我确信可能导致这种情况发生之前,我不会提供一个没有超时的版本。

Message 结构是使用传入数据和当前线程句柄构建的(后者不用于 post())。

为了使超时起作用,我们必须从总超时中减去将消息发布到队列所需的时间,并将差值用作传递给 ulTaskNotifyTake() 的新超时。不幸的是,除非我遗漏了什么,否则没有明确的方法来确定操作超时,因此无法检测到这一点,留下了一些明显的代码异味。

当我找到更可靠的发送方式时,我会更新代码。

如我所说,post 的功能大同小异,但更简单一些,并且无需等待代码完成执行。

最后,我们将介绍使用 processOne() 处理和分派传入消息。

bool processOne(bool blockUntilReady=false)
{
    if(nullptr!=m_messageRingBufferHandle) {
        //Receive an item from no-split ring buffer
        size_t size = sizeof(Message);
        Message *pmsg = (Message *)xRingbufferReceive(
            m_messageRingBufferHandle, 
            &size,
            blockUntilReady?portMAX_DELAY:0
        );
        if (nullptr == pmsg)
            return true;
        if (size != sizeof(Message))
            return false;
        Message msg = *pmsg;
        // free the item we retrieved and return the slot to the queue
        vRingbufferReturnItem(m_messageRingBufferHandle, pmsg);    
        // when fn is null this is a quit message, which makes us return false
        if(msg.fn) {
            msg.fn(msg.state);
            if (nullptr != msg.finishedNotifyHandle)
            {
                xTaskNotifyGive(msg.finishedNotifyHandle);
            }
            return true;
        }
    }
    return false;
}

我们做的第一个非平凡的事情是检索指向下一个可用消息的指针。如果它为空,则表示还没有可用的消息。如果 blockUntilReadytrue,我们只是等待直到有一个。否则,我们立即返回,表示成功但没有分派任何东西。

之后,我们释放该项,然后如果函数对象不为空,我们使用给定状态调用它。最后,如果消息有线程句柄,我们向它发送通知以唤醒发送者。

FRThreadPool 类

线程池编排了一组等待线程,这些线程将根据需要唤醒以分派工作项,然后返回到池中。它在内部(错误地)使用 FRSynchronizationContext 将工作请求分派给第一个可用的等待线程。同时,线程本身——尽管是使用 FRThread 类创建的——是通过一个特殊的线程过程实现的,该过程将它们转换为自管理的守护线程。

巧妙之处在于我们如何使用同步上下文。它不是“存在”于一个线程中,而是同一个同步上下文“存在”于池中的每个线程中,调用 processOne(true)——true 表示如果没有新消息,它就会进入休眠状态。当有新的工作请求时,它被发布到同步上下文,池中每个挂起的线程——这意味着任何不忙的线程——都在等待被传入消息唤醒。第一个收到消息的线程将其从环形缓冲区中移除,并在其线程上调度它。完成之后,它通过再次调用 processOne(true) 使自己回到休眠状态,等待下一条消息。

清理工作需要一些操作。线程池会记录池中每个线程的当前数量。一旦线程池超出作用域,或者调用 shutdown(),就会调用 postQuit() 到调度程序同步上下文,次数等于线程数量,有效地导致池中的每个线程在处理完当前请求后放弃其工作并销毁自己。我实际上有一个更好的关闭方式,通过在池中持有一个原子标志,线程可以读取该标志来指示退出条件,但我还没有实现它。

最有趣的部分是线程函数。其他一切都只是我们已经涵盖的概念和代码的变体。这是线程池线程的入口点

static void dispatcherEntry(void* data) {
    DispatcherEntry de = *((DispatcherEntry*)data);
    FRSynchronizationContext sc = *((FRSynchronizationContext*)de.psyncContext);
    if(nullptr!=de.pthreadCount)
        ++*de.pthreadCount;
    if(nullptr!=de.callingThreadHandle)
        xTaskNotifyGive(de.callingThreadHandle);
    while(sc.processOne(true));
    if(nullptr!=de.pshutdownThreadHandle && 
        nullptr!=*de.pshutdownThreadHandle) {
        xTaskNotifyGive(*de.pshutdownThreadHandle);
    }
    if(nullptr!=de.pthreadCount)
        --*de.pthreadCount;
    vTaskDelete(nullptr);
}

这与我们的其他线程入口点类似,具有类似的防止调用者过早返回清除堆栈的保护。线程过程的生命周期循环只是在调度程序同步上下文上调用 processOne(true),直到它返回 false。当它关闭时,它会发出通知,唤醒正在关闭线程的线程。它还会在完成后递减线程计数并销毁自身。因此,我们实际上不需要跟踪池中的线程——它们是自跟踪和自维护的。它们会一直存在,直到收到退出消息。

关注点

FreeRTOS 是一个简洁的小型软件包,但线程调度器还有待改进。我猜测部分原因是没有太多可供查询内核的 CPU 使用或线程活动指标,所以我猜测调度器没有好的方法来确定将线程放置在哪个核心上,更不用说任何防止饥饿的方法了。根据我的经验,饥饿很常见,所以在构建代码时,您可能希望在测试时从辅助线程向串口发送消息,以确保它们得到服务。

历史

  • 2021 年 2 月 27 日 - 初次提交
© . All rights reserved.