一个 Fiber 类(及其朋友)






4.91/5 (22投票s)
纤维是一种轻量级的协作式线程机制,或者说是一种协程机制,取决于你怎么看待它们。除了提供一种非常高效的类线程实现外,纤维还允许你提供“续体”,也就是执行某个函数、挂起自身,然后......
引言
这个项目源于我遇到了一个需要纤维调度器的客户。问题 arises from the need to convert non-preemptive “threads” from an existing programming domain to Windows. [这是我教学的一部分,不是合同的一部分,我在课堂上展示了这个解决方案;因此,我实际上并没有因为写这个程序而获得报酬,它仍然是我的。这样你就知道了,这不是专有作品。]
我设计的类旨在说明一个轮询纤维调度器。你可以随意将这些类改编到任何你想要的应用程序领域,使用任何你想要的规则。例如,如果你需要一个基于优先级的调度器,你也可以创建一个(稍后会详细介绍)。
这使用了纤维功能的一个子集来实现解决方案。纤维的功能比本解决方案中使用的要广泛得多,我稍后将讨论其中一些问题。
那么,什么是纤维?
纤维是一种非抢占式多路复用机制。事实上,纤维更像是协程的一种实现,这项技术可以追溯到20世纪60年代。纤维由一组寄存器和一个堆栈组成,因此非常像线程,但它不是线程。除非在线程中运行,否则纤维没有身份。当纤维被挂起时,它通过显式选择并命名(通过提供地址)目标纤维来将控制权转移给另一个纤维。执行此操作的操作称为SwitchToFiber
。纤维将继续在该线程中运行,直到执行另一个SwitchToFiber
。
纤维不像线程那样,永远不会退出。如果一个纤维实际上退出了,通过从其顶层纤维函数返回,执行它的线程将隐式调用ExitThread
并终止。请注意,对ExitThread
的此调用意味着线程本身不会通过其顶层函数退出,因此在那里必须进行的清理工作将不会执行。因此,您应该将线程的实际退出视为严重的编程错误。
MSDN列出了一系列与纤维相关的函数,但文档不完整、令人困惑、有时具有误导性,并且我相信在一种情况下是明显错误的。本文将尝试澄清这些问题,尽管我没有足够的信息来解决某些歧义或填补空白(但我已将此问题转交给我的一位MVP联系人,希望他能为我找到一些答案)。
纤维函数
纤维函数列在“进程和线程函数[base]”主题下。显然,“纤维”不被认为是标题中有价值的提及。
ConvertFiberToThread |
此操作撤销ConvertThreadToFiber(Ex) 操作,并释放原始转换中使用的资源。 |
ConvertThreadToFiber |
将当前线程转换为纤维。 |
ConvertThreadToFiberEx |
将当前线程转换为纤维,并提供一些附加选项。 |
CreateFiber |
分配一个纤维对象,为其分配一个堆栈,并将EIP设置为指定的起始地址。一个用户指定的PVOID 参数将被传递给纤维函数。 |
CreateFiberEx |
与CreateFiber 相同,但提供附加选项。 |
DeleteFiber |
删除由CreateFiber(Ex) 创建的纤维。这是摆脱纤维及其堆栈的方法,但它有一些必须考虑的限制。 |
FiberProc |
此类型的typedef 为typedef void (CALLBACK * LPFIBER_START_ROUTINE)(PVOID parameter) |
FlsAlloc |
分配一个纤维本地存储(FLS)索引;要删除它,请使用FlsFree 。 |
FlsCallback |
由FlsAlloc 建立的函数,在运行纤维的线程终止或纤维被释放时调用。 |
FlsFree |
释放由FlsAlloc 分配的纤维本地存储(FLS)索引。 |
FlsGetValue |
检索存储在特定索引处的当前FLS值。 |
FlsSetValue |
设置存储在特定索引处的当前FLS值。 |
GetCurrentFiber |
返回当前纤维的地址。 |
SwitchToFiber |
将控制权从一个纤维切换到另一个纤维。 |
创建运行中的纤维环境涉及几个步骤。
首先,必须创建一些纤维。当创建线程时,它会被隐式调度运行。当创建纤维时,没有机制使其能够自发运行。相反,纤维会保持“挂起”状态,直到有显式的SwitchToFiber
将控制权从当前运行的纤维转移到其他纤维之一。
由于实际运行的是线程,所以只有正在运行的线程才能执行SwitchToFiber
。但为了使其正确工作,它必须从一个现有的纤维到一个现有的纤维。线程不是纤维,所以虽然它可以创建一大堆纤维,但它不能让它们运行。
这通过使用ConvertThreadToFiber
或ConvertThreadToFiberEx
调用来解决。这些调用会在当前线程中创建一个“纤维工作区”,从而赋予线程纤维的特性,或者换句话说,“线程具有纤维性质”。现在,底层线程调度器将调度线程,包括现在具有纤维性质的线程。本质上,在ConvertThreadToFiber(Ex)
调用返回后,您会发现自己在一个纤维中运行,该纤维正在原始线程中运行。这个纤维现在可以自由地执行到任何其他纤维的SwitchToFiber
。
在某个时候,当一个纤维不再使用时,您必须调用DeleteFiber
来删除该纤维对象。如何执行此操作有一些限制,稍后将进行讨论。但最终,您创建的每个纤维都必须被删除。
当执行ConvertThreadToFiber(Ex)
调用时,它们也会返回一个纤维对象引用。当您调用SwitchToFiber
到这个纤维时,您就“回到了线程中”,然后您调用ConvertFiberToThread
来释放已分配给该线程的纤维特定资源。
基本上就是这样。这并不是全部,我在这里描述的内容代表了纤维宇宙全部功能的子集,但我稍后会对此进行阐述。
为什么使用纤维?
速度。简单。无需同步。
速度
纤维是轻量级的。从一个纤维切换到另一个纤维的操作需要32条指令,完全不涉及内核调用。在一台2GHz的机器上,这意味着一次交换最快可达8纳秒(最坏情况下,如果数据都不在缓存中,可能需要几十纳秒)。
如果您想知道这些数字从何而来,在2GHz的机器上,32条指令需要16纳秒,但Pentium 4是“流水线超标量”架构,每时钟周期可以发出两条整数指令,得出了8纳秒的值。指令流水线和预取会掩盖指令获取时间。但是,在最坏的情况下,当缓存中没有任何内容时,速度取决于缓存命中和替换的模式,因此受限于平台特定的内存周期速度。
这意味着,如果您有很多小任务要做,纤维可能比线程更好,因为线程涉及内核调用和调度程序的调用。纤维在它们运行的线程的时间片内执行,因此在许多短计算纤维之间切换不涉及任何潜在的内核阻塞。在许多情况下消除了同步的需要,也消除了调度程序介入的需要。
简单
纤维的简单性在很多方面类似于线程的简单性:您不必编写复杂的交错代码来同时处理多件事情,而是编写只做一件事的简单代码,并使用许多不同的纤维来处理不同的事情。这使您能够专注于做好一件事,而不是以一种相当复杂且可能脆弱的方式处理许多事情。
无需同步
在抢占式多线程环境中,如果两个线程可以访问同一数据,则该数据必须受到某种形式的同步保护。这可能很简单,比如使用Interlocked...
操作之一(在内存总线上锁定),或者可能需要使用CRITICAL_SECTION
或互斥锁进行同步。后两者的缺点是,如果无法授予访问权限,内核将被调用以取消调度线程,该线程将被排队在该同步对象上,以便在同步对象释放后稍后执行。CRITICAL_SECTION
的区别在于,如果同步对象可以获得(在大多数情况下是最常见的情况),则不需要内核调用,而互斥锁总是需要内核调用。
线程之间的同步是线程发生摩擦的地方。同步就是摩擦。就像在机械系统中一样,摩擦会产生热量并浪费能量。也许最好的解决方案是避免它。
纤维是避免这种情况的一种方式。::SwitchToFiber
是一个“积极的移交”。在纤维切换控制之前,它不会被在该线程中运行的任何其他纤维抢占。因此,一旦一个纤维开始运行,只要它修改的状态只与其他在该线程中执行的纤维共享,就不需要同步状态。同步隐含在由程序员显式控制的纤维调度中。
这不是一个完美的解决方案。一个纤维可以被调度到多个线程中运行;如果共享状态现在可以被运行在不同线程中的纤维访问,那么纤维的特性就无关紧要了;您就遇到了多线程同步问题,并且必须使用CRITICAL_SECTION
或互斥锁。
当您有纤维时,您没有并发。如果一个纤维阻塞,例如在I/O调用上,实际上是运行该纤维的线程阻塞了。在该线程中无法调度其他纤维运行,因为线程本身被阻塞了。您不会获得同一线程中运行的纤维之间的任何并发(尽管您可以在其他线程中运行的纤维之间获得并发)。但是,如果您没有阻塞调用,纤维对于多路复用简单的计算密集型任务特别有用。
CFiber类
第一个类是基本纤维对象的包装器
class CFiber { public: // constructors CFiber(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0) { fiber = ::CreateFiber(stack, rtn, this); } CFiber() { fiber = NULL; } virtual ~CFiber() { ::DeleteFiber(fiber); } public: // methods BOOL Create(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0) { ASSERT(fiber == NULL); fiber = ::CreateFiber(stack, rtn, this); return fiber != NULL; } BOOL ConvertThreadToFiber() { ASSERT(fiber == NULL); fiber = ::ConvertThreadToFiber(this); return fiber != NULL; } void Attach(LPVOID p) { ASSERT(fiber == NULL); fiber = p; } LPVOID Detach() { LPVOID result = fiber; fiber = NULL; return result; } LPVOID GetFiber() { return fiber; } public: // methods void run() { ASSERT(fiber != NULL); ::SwitchToFiber(fiber); } protected: // data LPVOID fiber; };
CFiber方法
CFiber |
构造函数 |
~CFiber |
析构函数 |
Create |
在现有 |
ConvertThreadToFiber |
用于将线程转换为纤维,以便进行纤维调度 |
Attach |
将现有纤维附加到 |
Detach |
将纤维从CFiber 对象分离 |
GetFiber |
返回当前纤维 |
run |
切换到选定的纤维 |
构造函数/创建
有几个构造函数。这里的关键是假设纤维将看到的“纤维参数”实际上是CFiber
对象,或其派生子类。
CFiber::CFiber(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0);
此构造函数将创建一个纤维,该纤维将执行指定的例程rtn
。它有一个可选的堆栈大小参数,默认为进程堆栈大小。
参数
|
|
纤维例程。 |
|
|
所需的堆栈大小;如果省略,则假定为0(使用默认堆栈大小)。 |
另请参阅CFiber::CFiber
的实现。
CFiber::CFiber();
此构造函数创建一个不与任何纤维关联的CFiber
对象。这通常在用户希望纤维参数不是CFiber
对象时使用。
CFiber::~CFiber()
析构函数删除底层纤维对象。
注意:对纤维执行delete
操作必须小心,因为删除当前正在执行的纤维将终止正在执行该纤维的线程。纤维不得删除自身。
BOOL CFiber::Create(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0)
给定一个不与纤维关联的CFiber
对象,这将创建一个新纤维并将其附加到CFiber
对象。
参数
|
|
纤维例程。 |
|
|
所需的堆栈大小;如果省略,则假定为0(使用默认堆栈大小)。 |
注释
以下是等效的
(1) CFiber * fiber = new CFiber(function); (2) CFiber * fiber = new CFiber; LPVOID f = ::CreateFiber(0, function, fiber); Fiber->Attach(); BOOL CFiber::ConvertThreadToFiber()
备注
将当前线程转换为可调度的(::SwitchToFiber
的目标)纤维。当前CFiber
对象成为纤维参数。
结果:如果底层::ConvertThreadToFiber
调用成功,则为TRUE
;如果失败,则为FALSE
(使用GetLastError
确定出了什么问题)。
注意:在完成纤维并删除所有纤维后,应调用::ConvertFiberToThread
来释放已分配给线程的纤维特定数据结构。
void CFiber::Attach(LPVOID p)
此操作将一个纤维附加到一个CFiber
对象。现有的CFiber
对象不得已附加了纤维。
参数
|
|
纤维引用 |
注意:与MFC不同,它不维护映射来查看给定纤维是否绑定到多个CFiber
对象。程序这样做是错误的,但没有进行额外的检查。
通过构造函数创建时,纤维参数(由::GetFiberData
返回)是CFiber
对象。如果应用程序希望将不同的对象关联为纤维参数,则可以创建并附加该纤维。
LPVOID CFiber::Detach()
此操作会断开CFiber
与底层纤维的关联。
结果:先前与CFiber
对象关联的纤维。
注意:纤维保留其创建时的纤维参数;因此,如果一个纤维从一个CFiber
对象Detach
并Attach
到另一个,并且它有一个纤维参数是原始CFiber
对象,那么现在这将是一个错误的引用。当预期的行为是CFiber
对象是纤维参数时,不应使用Attach
和Detach
。
LPVOID CFiber::GetFiber();
结果:与CFiber
对象关联的纤维。
void CFiber::run();
使用::SwitchToFiber
将控制权切换到纤维。
队列条目类
这里的目标是有一个“时间共享”的纤维集,纤维只是在适当的时候让出控制权,然后另一个纤维就会运行。在正常操作模式下,纤维通过调用::SwitchToFiber
来让出控制权,并指定下一个要运行的纤维。但是,这需要实际知道下一个纤维应该是什么。在某些应用程序中,这是规范的一部分。在我正在编写的系统中,我只知道我想将控制权让给另一个纤维。选择是做一个轮询纤维调度器,这意味着一个队列。这需要一个队列条目类来表示队列中的纤维。我从CFiber
类派生了它。
class QE : public CFiber { public: // constructor/destructor QE(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0) : CFiber(rtn, stack) { next = NULL; } QE() { next = NULL; } virtual ~QE() { } public: virtual void Display(LPCTSTR s) { } public: // internal state QE * next; }; typedef QE * PQE;
QE方法和成员变量
|
构造函数 |
|
析构函数 |
|
用于调试输出的虚拟方法 |
|
用于将元素插入队列的链接 |
QE::QE(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0);
此构造函数将创建一个队列条目和一个与该队列条目关联的新纤维,该纤维将执行指定的例程rtn
。它有一个可选的堆栈大小参数,默认为进程堆栈大小。
参数
|
|
纤维例程。 |
|
|
所需的堆栈大小;如果省略,则假定为0(使用默认堆栈大小)。 |
QE::QE();
此构造函数将创建一个不与任何纤维关联的队列条目。可以使用CFiber::Attach
调用来附加纤维。请参阅有关纤维数据的注意事项。
注意:如果::GetFiberData
调用返回的值不是QE
引用,则此代码将无法正常工作。
virtual QE::~QE();
析构函数将删除QE
对象及其关联的纤维。请参阅有关删除运行中纤维的注意事项。不应将delete
运算符应用于表示运行中纤维的QE
。
virtual void QE::Display(LPCTSTR s);
此方法的目的是让QE
的子类实现此虚拟方法,以便进行调试。此类在此虚拟方法的实现中什么都不做。虽然原则上它可以是一个纯虚拟方法,但可能存在创建原始QE
对象的理由,如果该方法是纯虚拟方法,则无法做到这一点。
QE * QE::next;
这提供了由Queue
类管理的队列结构。
注意:C++有一个称为friend
的机制,它允许另一个类访问私有变量。然而,这是设计不佳的,因为它意味着QE
类必须知道将使用它的类的名称。这降低了类的通用性;因此,与其将其标记为protected
并要求已知的类名,不如将其设为public
,从而允许其他队列的实现。
Queue类
我想实现一个FIFO队列来进行轮询调度。我通常会使用MFC的CList
类,但是这段代码必须在非MFC环境中工作。
class Queue { public: Queue() { queueHead = queueTail = emptyFiber = killFiber = NULL; } public: void appendToQueue(PQE qe); PQE removeFromQueue(); public: void next(); void yield(); void kill(); public: void SetEmptyFiber(PQE qe) { emptyFiber = qe; } protected: PQE queueHead; PQE queueTail; PQE emptyFiber; protected: // used for fiber-kill logic PQE killFiber; PQE killTarget; static void CALLBACK killer(LPVOID p); };
这旨在支持纤维的排队。yield
方法将当前队列条目放入队列尾部。next
方法分派队列头部的纤维。kill
方法对当前运行的元素执行delete
,但它非常小心地通过为此目的使用单独的纤维来执行此操作。然后它启动队列中的下一个纤维。
这段代码实现了简单的FIFO排队。可以创建其他类来实现更复杂的排队方式。
Queue方法
队列管理方法 |
|
|
构造函数 |
|
将元素添加到队列尾部 |
|
从队列头部移除元素 |
调度方法 |
|
|
从队列头部出队并切换到该纤维;如果队列为空,则切换到空队列纤维 |
|
将当前纤维调度到队列末尾,并运行队列头部的纤维 |
|
删除当前纤维并调度下一个纤维 |
|
指定队列为空时要切换到的纤维 |
队列管理方法
Queue::Queue();
构造一个空队列。
void Queue::appendToQueue(PQE qe);
将参数指向的QE
放在队列的末尾。
PQE Queue::removeFromQueue();
从队列头部移除队列元素。
结果:指向已出队的QE
条目的指针,如果队列为空,则为NULL
。
队列调度方法
void Queue::next();
从队列头部出队队列元素,并将控制权切换到它所代表的纤维。如果没有队列中的元素,它将切换到“空队列”元素所代表的纤维(参见SetEmptyFiber
)。
void Queue::yield();
当纤维希望让出控制权时,它调用此方法,控制权将转移到队列中的下一个纤维。当前纤维被放置在队列的末尾。当纤维被重新调度时,执行将在yield
调用之后的行上恢复。
void Queue::kill();
当纤维完成计算时,它调用此方法。控制权将转移到队列中的下一个纤维。当前纤维不会被放在队列的末尾,因此不再被调度。
注意:要“挂起”一个纤维以便“重新启动”它,需要构建一个更复杂的机制。
void Queue::SetEmptyFiber(PQE qe);
参数
|
|
一个 |
使用队列
队列首先通过使用appendToQueue
将QE
对象添加到队列来填充。一旦纤维开始运行,它们通常会按轮询FIFO顺序执行。当前机制没有“挂起”纤维的方法;这项泛化留给读者练习。
要启动队列,主线程必须创建一个代表CFiber::ConvertThreadToFiber
调用的QE
。这通常被设置为队列为空时要恢复的默认纤维,因此使用SetEmptyFiber
方法来建立此设置。
代码
以下各节说明了此示例中的代码。
CFiber类
CFiber
类所有方法都在头文件中定义。CFiber
类包装了原始纤维表示,这是一个LPVOID
protected: // data LPVOID fiber;
构造函数/析构函数
CFiber(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0) { fiber = ::CreateFiber(stack, rtn, this); }
请注意,此构造函数使用::CreateFiber
创建纤维,并将this
作为纤维参数传递。
CFiber() { fiber = NULL; }
这创建了一个未绑定到特定纤维的CFiber
对象;Attach
方法稍后可用于将纤维绑定到此CFiber
对象。请注意,这既强大又有风险:它允许纤维参数不是CFiber
对象,但后面的一些类并非为在这种条件下工作而设计。
virtual ~CFiber() { ::DeleteFiber(fiber); }
这仅仅是删除纤维。然而,当前执行的纤维不能在自身上执行此方法,否则运行该纤维的线程将被终止。
创建/附加
BOOL Create(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0) { ASSERT(fiber == NULL); if(fiber != NULL) return FALSE; fiber = ::CreateFiber(stack, rtn, this); return fiber != NULL; }
在调试模式下,这将处理ASSERT
测试,以确保此CFiber
尚未绑定到纤维。在发布模式下,ASSERT
消失,但如果纤维已绑定,该方法将返回FALSE
。
BOOL ConvertThreadToFiber() { ASSERT(fiber == NULL); if(fiber != NULL) return FALSE; fiber = ::ConvertThreadToFiber(this); return fiber != NULL; }
将要开始调度纤维的线程需要执行ConvertThreadToFiber
。这会调用API来将当前线程映射到一个纤维,并将当前CFiber
对象作为纤维参数传递。
void Attach(LPVOID p) { ASSERT(fiber == NULL); fiber = p; }
此调用应仅用于未绑定到纤维的CFiber
对象。在不先进行Detach
的情况下尝试将CFiber
绑定到另一个纤维是名义上错误的。
LPVOID Detach() { ASSERT(fiber != NULL);
LPVOID result = fiber;
fiber = NULL;
return result;
}
此操作会断开CFiber
与其底层纤维的关联,并返回纤维引用。如果纤维未绑定,则返回值为NULL
。名义上,如果不存在关联,则Detach
是错误的,如果出现这种情况,ASSERT
将在调试模式下导致失败。请注意,没有机制可以更改已Detach
的纤维的Attach
的纤维参数,并且此处描述的其他子类在纤维参数不是CFiber
对象的引用时将无法正常工作。
void run() { ASSERT(fiber != NULL);
::SwitchToFiber(fiber);
}
此操作将当前执行的纤维切换到在其上指定了run
方法的纤维。
队列条目类
此类派生自CFiber
类,因此继承了构造函数和其他方法。QE
构造函数是
QE(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0)
: CFiber(rtn, stack) { next = NULL; }
因此,这只是CFiber
的构造,并增加了初始化用于维护队列的链接。
Display
方法用于生成调试输出,但实现实际输出的责任在于子类。
Queue类
Queue
类本质上是一个单向链接队列管理器,结合了纤维调度逻辑。单向链接队列管理器相当简单。我将在此展示纤维调度函数。
yield方法
请注意,为了简化代码,我选择假设纤维参数是实际的QE
对象(或QE
子类的实例)。这意味着如果Attach/Detach
的使用方式使纤维参数不一致,这段代码将无法正常运行,因为没有办法重置纤维参数(虽然有::GetFiberData
API,但没有相应的::SetFiberData
!)。
此方法所做的就是将当前运行的纤维加入队列的末尾,并激活队列中的下一个纤维(使用next
方法)。
void Queue::yield() { PQE qe = (PQE) (::GetFiberData()); qe->Display( _T("yield: returning to queue - ") ); appendToQueue(qe); next(); } // Queue::yield
kill方法
当纤维终止并必须删除时,就会调用此方法。
void Queue::kill () {#ifdef _DEBUG PQE qe = (PQE) (::GetFiberData()); qe->Display(_T("kill")); #endif killTarget = qe; if(killFiber == NULL) { /* create killer fiber */ killFiber = new QE(); LPVOID f = ::CreateFiber(0, killer, this); killFiber->Attach(f); } /* create killer fiber */ killFiber->run(); } // Queue::kill
问题在于纤维不能删除自己。如果对当前正在运行的纤维调用::DeleteFiber
,那么该纤维所在的整个线程将退出,因为这将调用ExitThread
。为了杀死纤维,它将控制权切换到另一个纤维,即killFiber
。这个纤维能够对QE
对象(请记住,它是一个CFiber
的子类,并且本身可能是某个更深层QE
子类的实例)执行delete
操作,这是通过调用::DeleteFiber
实现的,因为此时,被删除的纤维不是正在运行的纤维。
请注意,这似乎与DeleteFiber
的文档不符,后者说明:
如果当前正在运行的纤维调用
DeleteFiber
,则其线程调用ExitThread
并终止。但是,如果当前正在运行的纤维被另一个纤维删除,那么运行被删除纤维的线程很可能会异常终止,因为纤维堆栈已被释放。
这似乎完全没有意义。由于纤维的性质,“正在运行的纤维”不能被另一个纤维(在同一线程中)删除,因为在它被删除时,它本身就不是“正在运行”的纤维!但是,我怀疑如果将该段落替换为(我建议的更改斜体)
如果当前正在运行的纤维调用
DeleteFiber
,则其线程调用ExitThread
并终止。但是,如果当前正在运行的纤维被另一个线程删除,那么运行被删除纤维的线程很可能会异常终止,因为纤维堆栈已被释放。如果一个纤维已被删除,任何尝试使用SwitchToFiber
将其控制权切换回该纤维的操作都可能导致其线程异常终止,因为纤维堆栈已被释放。
我目前正在与微软研究这个问题。当前的文档段落,如其所写,将使得永远无法调用DeleteFiber
!
那么问题就来了:既然纤维在很多方面看起来都像协程,那么参数是如何传递给杀手的呢?在这种情况下,在Queue
类中添加了一个成员变量killTarget
。因为我们处理的是非抢占式纤维,并且我们假设没有从另一个线程对Queue
对象进行并发访问(这是此代码的基本假设),所以不需要担心任何形式的同步;此外,除了正在切换到的纤维之外,没有任何纤维可以获得控制权,因此使用这样的变量没有问题(在抢占式多线程系统中,这将是一个致命的设计错误,但在非抢占式纤维系统中则完全有效)。
为了避免任何显式初始化或最终化的需要,我只在kill fiber
尚不存在时创建它,并且稍后会显示删除它的位置。为什么不这样做,而是将其放在构造函数和析构函数中?
主要是寿命问题。我认为纤维的存在时间不应超过其需要的时间。此外,在确定何时可以删除纤维方面似乎存在一些严重问题,特别是因为队列的析构函数可能在程序执行了::ConvertFiberToThread
调用来释放纤维资源之后运行,因此在这种条件下执行::DeleteFiber
的效果似乎是未定义的。
这确实引出了kill fiber
何时可以被删除的问题。我选择在队列变空时删除它。请注意,这会导致调度回“主纤维”,实际上是空队列纤维。由于队列为空,将不再需要杀死纤维,并且killFiber
纤维可以被安全地删除。请注意,因此,程序可能会选择创建更多的队列条目;在这种情况下,killFiber
会自动重新创建,“按需”。
杀手纤维非常简单;这里的关键是,因为它是一个单独的纤维,所以现在可以安全地对已完成其操作的纤维执行::DeleteFiber
。
void CALLBACK Queue::killer(LPVOID) { delete killTarget; next(); } // Queue::killer
next方法
next
方法从队列中出队下一个元素,并通过调用其纤维上的::SwitchToFiber
来调度它运行。如果没有更多的纤维排队,它将恢复到由SetEmptyFiber
建立的纤维。请注意,未能调用SetEmptyFiber
将被视为致命错误。
void Queue::next() { PQE qe = (PQE)removeFromQueue(); if(qe != NULL) { /* got one */ qe->Display(_T("Dequeued QE")); qe->run(); } /* got one */ else { /* all done */ delete killFiber; killFiber = NULL; TRACE( (_T("Queue empty: switch to EmptyFiber\n")) ); ASSERT(emptyFiber != NULL); // Note that a failure to have set the empty fiber is a fatal // error and is unrecoverable! emptyFiber->run(); } /* all done */ } // Queue::next
其他一些问题
为什么我对killFiber
和emptyFiber
使用QE
结构?既然它们实际上没有被调度,为什么不使用原始的CFiber
对象呢?
在emptyFiber
的情况下,有人可能会选择,而不是我采取的方法,创建一个优先级调度的队列,并将空纤维简单地设为最低优先级的纤维。我认为如果emptyFiber
看起来像一个可调度的纤维,更容易看到如何做到这一点。由于调用者必须创建纤维,这也意味着底层实现可以在不更改用户界面的情况下进行更改。
killFiber
的选择更难证明;在这种情况下,它更多地成为一致性问题而不是功能问题,特别是因为这从未逃脱到用户界面级别。但是,这样做允许我决定执行“延迟杀死”,因为我可以考虑将kill fiber
调度为一项任务来完成,并且可以将其插入队列的头部或稍后的位置。但是,请注意,如果我将其插入到第一个队列项之外的其他位置,那么全局killTarget
的使用将不再有效。这是因为killTarget
为每个要杀死的纤维显式设置,如果我可能有一个以上的待处理杀死请求,这是不够的(在这种情况下,我可能会按需创建纤维,并使用纤维参数,该参数必须是QE
对象;我必须创建一个class Killer : public QE
来维护我需要的上下文)。
问题域
示例程序的目标是获取命令行上的一组文件名,并打印出文件,每个文件一个纤维。为了使其看起来像一个并发程序,该程序被定义为一次只打印几行,然后纤维被解除调度,另一个纤维运行。当到达文件末尾时,纤维不再被调度。程序完成后,所有纤维都被删除。
为了解决这个问题,一个类被派生自QE
类
class CReaderFiber : public QE { public: // constructors CReaderFiber(LPCTSTR f, int c, Queue * q) : QE(reader) { name = f; count = c; queue = q; file = NULL; } virtual ~CReaderFiber() { if(file != NULL) fclose(file); } public: // parameters LPCTSTR name; // name of file int count; // number of lines to write Queue * queue; // the queue shared by all these fibers public: virtual void Display(LPCTSTR s); public: // local state FILE * file; // currently-opened file object char buffer[MAX_LINE]; // local buffer protected: static void CALLBACK reader(LPVOID p); };
这个派生类持有所有特定于问题的 P信息,例如在每次纤维执行期间要写入的行数、文件名、保存输入的缓冲区等。纤维函数本身是一个静态类成员。
/* static */ void CALLBACK CReaderFiber::reader(LPVOID p) { CReaderFiber * rf = (CReaderFiber *)p; TRACE( (_T("reader: called for %s\n"), rf->name) ); rf->file = _tfopen(rf->name, _T("ra")); if(rf->file == NULL) { /* failed */ DWORD err = ::GetLastError(); reportError(err, rf->name); TRACE( (_T("reader: could not open %s\n"), rf->name) ); rf->queue->kill(); return; } /* failed */
文件打开后,我们只需进入纤维中的无限循环,读取和打印行。请注意,此代码假定程序的Unicode版本将读取Unicode文件,而ANSI版本将读取ANSI(8位本机代码页)文件。泛化,即任何版本都可以读取或写入任何类型的文件,留给读者练习。请注意,在纤维循环中,在打印完count
行之后,纤维就会让出。因此,另一个纤维将会运行。
while(TRUE) { /* fiber loop */ for(int i = 0; i < rf->count; i++) { /* read lines */ if(_fgetts(rf->buffer, MAX_LINE, rf->file) == NULL) { /* all done */ TRACE( (_T("reader: done with %s\n"), rf->name) ); rf->queue->killFiber(); ASSERT(FALSE); } /* all done */ _tprintf(_T("%s"), rf->buffer); } /* read lines */ TRACE( (_T("reader: yielding for %s after %d lines\n"), rf->name, rf->count) ); rf->queue->yield(); } /* fiber loop */ } // reader
主程序
命令行语法为
argv[0]
是程序名称argv[1]
是每次迭代要显示的行数argv[2..argc-1]
是文件名
int _tmain(int argc, TCHAR * argv[]) { Queue queue; if(argc < 3) { /* usage */ usage(); return 1; } /* usage */ int lines = _ttoi(argv[1]); if(lines <= 0) { /* failed */ _tprintf(_T("Illegal buffer count (%d)\n"), lines); usage(); return 1; } /* failed */
在完成参数验证的枯燥细节后,真正的工作就开始了。首先,创建一个数组来保存所有CFiber
派生对象。为了简化代码并消除对所有偏移量的担忧,我只创建了一个与argc
数组大小相同的数组,并忽略其中的前两个条目。
CReaderFiber ** fibers = new CReaderFiber*[argc];
为了处理“主纤维”,即转换为纤维的此线程,我将需要一个QE
对象来允许其被调度。ConvertThreadToFiber
方法将完成此操作。
PQE mainFiber = new QE(); if(!mainFiber->ConvertThreadToFiber()) { /* failed conversion */ DWORD err = ::GetLastError(); reportError(err, _T("Converting thread to fiber")); return 1; } /* failed conversion */
接下来,我扫描文件名列表。对于命令行中的每个文件,我创建一个新的CReaderFiber
对象并将其放入纤维数组中。每次创建纤维实例时,我都将其放入队列。
for(int i = 2; i < argc; i++) { /* create fibers */ fibers[i] = new CReaderFiber(argv[i], lines, &queue); if(fibers[i] == NULL) { /* failed */ DWORD err = ::GetLastError(); reportError(err, _T("Creating fiber")); return 1; } /* failed */ queue.appendToQueue(fibers[i]); } /* create fibers */
当所有线程完成后,控制权将返回到此线程,“主纤维”。为了实现这一点,我们必须设置mainFiber
(ConvertThreadToFiber
的结果),以便在队列为空时(所有文件的内容都已打印完毕)将控制权送回此处。
queue.SetEmptyFiber(mainFiber);
此时,我们开始调度纤维。纤维将运行,直到打印完所有文件内容。
queue.next();
当打印完所有内容后,::SwitchToFiber
调用会将执行切换到此纤维,然后执行下面的代码。
TRACE( (_T("Finished\n")) ); delete mainFiber; ::ConvertFiberToThread(); return 0; }
输出示例
提供的命令行参数为
5 a.txt b.txt c.txt
其中文件内容的形式为“File <filename> <line number>”。输出如下所示;分组为5是基于命令行第一个参数的。请注意显示的更改:文件a.txt有200行,b.txt有250行,c.txt有300行。这些是由一个名为“datagen”的小程序生成的,命令行为
datagen 200 "File A" > a.txt
datagen 250 "File B" > b.txt
datagen 300 "File C" > c.txt
输出
File A 1
File A 2
File A 3
File A 4
File A 5
File B 1
File B 2
File B 3
File B 4
File B 5
File C 1
File C 2
File C 3
File C 4
File C 5
File A 6
File A 7
File A 8
File A 9
File A 10
File B 6
File B 7
File B 8
File B 9
File B 10
File C 6
File C 7
File C 8
File C 9
File C 10
File A 11
File A 12
File A 13
File A 14
File A 15
…
File C 191
File C 192
File C 193
File C 194
File C 195
File A 196
File A 197
File A 198
File A 199
File A 200
File B 196
File B 197
File B 198
File B 199
File B 200
File C 196
File C 197
File C 198
File C 199
File C 200
File B 201 <- Note that file A, with 200 lines is no longer in the mix after this point
File B 202
File B 203
File B 204
File B 205
File C 201
File C 202
File C 203
File C 204
File C 205
File B 206
File B 207
File B 208
File B 209
File B 210
File C 206
File C 207
File C 208
File C 209
File C 210
File B 211
File B 212
File B 213
File B 214
File B 215
…
File B 246
File B 247
File B 248
File B 249
File B 250
File C 246
File C 247
File C 248
File C 249
File C 250
File C 251 <- Note that file B, with 250 lines, is no longer in the mix after this point
File C 252
File C 253
File C 254
File C 255
File C 256
File C 257
File C 258
File C 259
File C 260
一些额外的纤维问题
此代码做出了许多简化假设,这些假设不应被视为在一般纤维世界中是限制性的。例如
- 所有纤维都将在单个线程中运行,即创建它们的同一线程
- 纤维一旦创建,将保持附加到同一个
CFiber
对象
这两者都是实现选择。虽然可以将一个纤维由一个线程执行,然后由另一个线程执行(::SwitchToFiber
似乎不关心纤维是在哪个线程创建的),但这样做存在某些风险。如果您假设您的纤维没有同步问题,因为它们是纤维,然后突然开始在单独的线程中执行它们,那么您必须非常小心,您没有引入同步问题,而且在维护过程中,要小心避免出现任何未来的同步问题。
请注意,如果您选择在不同的线程中运行纤维(即,给定的纤维可能在不同的线程中运行;而不是每个线程只在它们自己内部运行纤维的情况),请务必阅读有关/GT编译器选项“支持纤维安全线程局部存储”的内容,然后再继续。当在可能稍后在不同线程中调度的纤维中使用__declspec(thread)
变量时,这会产生影响。
我选择使用::GetFiberData
来获取纤维数据是一个简化的假设,不应被视为如何完成此操作的最终方法。它简化了一个案例的代码,即我正在实现的案例。
通常的杂项
与任何实际程序一样,有几件事不是程序的原始目标,但由于功能、优雅、便利或其他原因而需要完成。本程序也不例外,尽管其中一些组件已经独立开发并在此简单使用。
ASSERT
MFC的ASSERT
宏设计得很好。它只报告一个问题,但它不会中止程序。这是一个宝贵的特性。程序员可以使用ASSERT
宏来辅助调试,但可以选择在返回时进行“优雅恢复”,并允许程序员进行从手动调整调试器中的值到进行程序更改并使用编辑-执行来重新执行行。总的来说,这是一个智能而优雅的设计。
另一方面,C库的assert
函数设计不佳,使用了我称之为“PDP-11思维模式”的方法,指的是最早普及C语言的小型计算机。当时的理念是程序很小,在内部错误时终止程序是可以的。在GUI设计世界中,这总是一个糟糕的设计决定;只有用户才能终止程序。
因此,我需要一个非MFC应用程序的ASSERT
宏。此外,我希望这段代码能够轻松地集成到MFC应用程序中(请注意,本文中的C++类都是独立的,而不是基于MFC类,如CObject
)。
这里的第一个技巧是头文件,assert.h
#pragma once #ifndef ASSERT #ifdef _DEBUG #define ASSERT(x) if(!(x)) \ fail(_T("Assertion Failure %s(%d)\n"), __FILE__, __LINE__) #define VERIFY(x) ASSERT(x) extern void fail(LPCTSTR fmt, ...); #else #define ASSERT(x) #define VERIFY(x) x #endif // _DEBUG #endif // ASSERT
fail
方法很简单
void fail(LPCTSTR fmt, ...) { va_list args; va_start(args, fmt); TCHAR buffer[1024]; // should be large enough _vstprintf(buffer, fmt, args); va_end(args); OutputDebugString(buffer); }
然而,请注意这有多危险!如果数据超过1024个字符怎么办?答案很简单:这将是一个致命的缓冲区溢出情况!
因为我只使用文件名(最大MAX_PATH
个字符(名义上是260个字符))和行号,所以这是安全的。但它不普遍安全!
不幸的是,这在Visual Studio 6中完成,没有解决方案。但在Visual Studio .NET中,在微软饱受其害的缓冲区溢出造成的众多安全漏洞之后,添加了适当的原语。这里需要的重要新调用是_vscprintf/_vscwprintf
,或者在*tchar.h*中“双模态”定义为_vsctprintf
,它会编译为适合ANSI或Unicode应用程序。它们返回格式化字符串(不包括NULL
终止符)所需的字符数。因此,VS.NET的正确编写函数如下所示
void fail(LPCTSTR fmt, ...) { va_list args; va_start(args, fmt); int n = _vsctprintf(fmt, args); LPTSTR buffer = new TCHAR[n + 1]; _vsctprintf(buffer, fmt, args); va_end(args); OutputDebugString(buffer); delete [] buffer; }
在MFC中,这将很容易通过使用CString::FormatV
方法来完成,该方法实现了一个类似于_vsctprintf
的算法,但此代码的前提是我们不使用MFC。
请注意,此代码不会弹出消息框或与用户进行交互。但它允许程序继续。如果我需要它停止,我经常在函数末尾设置一个断点;或者您可以创建一个更复杂的,使用DebugBreak
。
TRACE宏
在debug.cpp文件中,我实现了一个简单的输出跟踪。请注意,它与上面描述的fail
方法非常相似,包括缓冲区溢出错误(如果使用VS.NET,则相同的解决方案适用)。但是,我们遇到了TRACE
的一个额外问题,而ASSERT
没有:可变数量的参数。
C中的宏系统代表了20世纪50年代宏系统的最高水平,远不如许多编程语言所拥有的复杂。一个好的宏系统实际上是图灵完备的,因为您可以在宏中编写编写代码的程序。C宏系统,嗯,说它是“原始的”有点不公平。在许多情况下,石斧要精巧得多。尽管如此,这就是我们必须处理的,所以我们必须尽力而为。
这里的技巧是从Microsoft DDK的KdPrint
宏中借用的。要将可变数量的参数传递给宏调用,您必须将它们包装在额外的括号中,以便在语法上它们看起来像宏的单个参数。因此,TRACE
宏及其伴随函数的定义如下(来自debug.h)
void CDECL trace(LPCTSTR fmt, ...); #ifdef _DEBUG #define TRACE(x) trace x #else #define TRACE(x) #endif
宏调用可能如下
TRACE( (_T("The result for 0x%p is %d (%s)\n"), object, object->value, object->name) );
请注意我倾向于在TRACE
宏的参数括号和参数列表的参数括号之间使用空格。
还请注意使用%p来获取要打印的地址。养成这个习惯非常重要,而不是使用0x%08x或其他非可移植方法。这在程序移植到Win64时将无法正常工作(格式必须手动编辑为0x%16I64x;注意%之后的第一个参数是数字16,下一个序列是限定符I64,表示传入的参数是64位值,最后是x,表示所需的格式)。
报告错误
通常,我使用一个更通用的实现来包装::FormatMessage
,但对于这个简单的示例,一个控制台应用程序,没有GUI界面,并且不使用MFC,一个更简单的实现就足够了。
本着“我的代码中永远不会有两条发出相同的错误消息”的理念,我允许用户传入一个唯一的标识符字符串。对于这个玩具示例,我没有费心通过STRINGTABLE
提供本地化。
void reportError(DWORD err, LPCTSTR reason) { LPTSTR msg; if(FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, NULL, // source of message table err, // error code to format 0, // language ID: default for thread (LPTSTR)&msg, // place to put pointer 0, // size will be computed NULL) == 0) // arglist { /* error */ _tprintf(_T("Error %d (0x%08x)\n"), err); return; } /* error */ _tprintf(_T("Error %s\n%s\n"), reason, msg); LocalFree(msg); } // reportError