65.9K
CodeProject 正在变化。 阅读更多。
Home

一个 Fiber 类(及其朋友)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.91/5 (22投票s)

2008年7月1日

CPOL

33分钟阅读

viewsIcon

51294

downloadIcon

470

纤维是一种轻量级的协作式线程机制,或者说是一种协程机制,取决于你怎么看待它们。除了提供一种非常高效的类线程实现外,纤维还允许你提供“续体”,也就是执行某个函数、挂起自身,然后......

引言

这个项目源于我遇到了一个需要纤维调度器的客户。问题 arises from the need to convert non-preemptive “threads” from an existing programming domain to Windows. [这是我教学的一部分,不是合同的一部分,我在课堂上展示了这个解决方案;因此,我实际上并没有因为写这个程序而获得报酬,它仍然是我的。这样你就知道了,这不是专有作品。]

我设计的类旨在说明一个轮询纤维调度器。你可以随意将这些类改编到任何你想要的应用程序领域,使用任何你想要的规则。例如,如果你需要一个基于优先级的调度器,你也可以创建一个(稍后会详细介绍)。

这使用了纤维功能的一个子集来实现解决方案。纤维的功能比本解决方案中使用的要广泛得多,我稍后将讨论其中一些问题。

那么,什么是纤维?

纤维是一种非抢占式多路复用机制。事实上,纤维更像是协程的一种实现,这项技术可以追溯到20世纪60年代。纤维由一组寄存器和一个堆栈组成,因此非常像线程,但它不是线程。除非在线程中运行,否则纤维没有身份。当纤维被挂起时,它通过显式选择并命名(通过提供地址)目标纤维来将控制权转移给另一个纤维。执行此操作的操作称为SwitchToFiber。纤维将继续在该线程中运行,直到执行另一个SwitchToFiber

纤维不像线程那样,永远不会退出。如果一个纤维实际上退出了,通过从其顶层纤维函数返回,执行它的线程将隐式调用ExitThread并终止。请注意,对ExitThread的此调用意味着线程本身不会通过其顶层函数退出,因此在那里必须进行的清理工作将不会执行。因此,您应该将线程的实际退出视为严重的编程错误。

MSDN列出了一系列与纤维相关的函数,但文档不完整、令人困惑、有时具有误导性,并且我相信在一种情况下是明显错误的。本文将尝试澄清这些问题,尽管我没有足够的信息来解决某些歧义或填补空白(但我已将此问题转交给我的一位MVP联系人,希望他能为我找到一些答案)。

纤维函数

纤维函数列在“进程和线程函数[base]”主题下。显然,“纤维”不被认为是标题中有价值的提及。

ConvertFiberToThread 此操作撤销ConvertThreadToFiber(Ex)操作,并释放原始转换中使用的资源。
ConvertThreadToFiber 将当前线程转换为纤维。
ConvertThreadToFiberEx 将当前线程转换为纤维,并提供一些附加选项。
CreateFiber 分配一个纤维对象,为其分配一个堆栈,并将EIP设置为指定的起始地址。一个用户指定的PVOID参数将被传递给纤维函数。
CreateFiberEx CreateFiber相同,但提供附加选项。
DeleteFiber 删除由CreateFiber(Ex)创建的纤维。这是摆脱纤维及其堆栈的方法,但它有一些必须考虑的限制。
FiberProc 此类型的typedeftypedef void (CALLBACK * LPFIBER_START_ROUTINE)(PVOID parameter)
FlsAlloc 分配一个纤维本地存储(FLS)索引;要删除它,请使用FlsFree
FlsCallback FlsAlloc建立的函数,在运行纤维的线程终止或纤维被释放时调用。
FlsFree 释放由FlsAlloc分配的纤维本地存储(FLS)索引。
FlsGetValue 检索存储在特定索引处的当前FLS值。
FlsSetValue 设置存储在特定索引处的当前FLS值。
GetCurrentFiber 返回当前纤维的地址。
SwitchToFiber 将控制权从一个纤维切换到另一个纤维。

创建运行中的纤维环境涉及几个步骤。

首先,必须创建一些纤维。当创建线程时,它会被隐式调度运行。当创建纤维时,没有机制使其能够自发运行。相反,纤维会保持“挂起”状态,直到有显式的SwitchToFiber将控制权从当前运行的纤维转移到其他纤维之一。

由于实际运行的是线程,所以只有正在运行的线程才能执行SwitchToFiber。但为了使其正确工作,它必须一个现有的纤维一个现有的纤维。线程不是纤维,所以虽然它可以创建一大堆纤维,但它不能让它们运行。

这通过使用ConvertThreadToFiberConvertThreadToFiberEx调用来解决。这些调用会在当前线程中创建一个“纤维工作区”,从而赋予线程纤维的特性,或者换句话说,“线程具有纤维性质”。现在,底层线程调度器将调度线程,包括现在具有纤维性质的线程。本质上,在ConvertThreadToFiber(Ex)调用返回后,您会发现自己在一个纤维中运行,该纤维正在原始线程中运行。这个纤维现在可以自由地执行到任何其他纤维的SwitchToFiber

在某个时候,当一个纤维不再使用时,您必须调用DeleteFiber来删除该纤维对象。如何执行此操作有一些限制,稍后将进行讨论。但最终,您创建的每个纤维都必须被删除。

当执行ConvertThreadToFiber(Ex)调用时,它们也会返回一个纤维对象引用。当您调用SwitchToFiber到这个纤维时,您就“回到了线程中”,然后您调用ConvertFiberToThread来释放已分配给该线程的纤维特定资源。

基本上就是这样。这并不是全部,我在这里描述的内容代表了纤维宇宙全部功能的子集,但我稍后会对此进行阐述。

为什么使用纤维?

速度。简单。无需同步。

速度

纤维是轻量级的。从一个纤维切换到另一个纤维的操作需要32条指令,完全不涉及内核调用。在一台2GHz的机器上,这意味着一次交换最快可达8纳秒(最坏情况下,如果数据都不在缓存中,可能需要几十纳秒)。

如果您想知道这些数字从何而来,在2GHz的机器上,32条指令需要16纳秒,但Pentium 4是“流水线超标量”架构,每时钟周期可以发出两条整数指令,得出了8纳秒的值。指令流水线和预取会掩盖指令获取时间。但是,在最坏的情况下,当缓存中没有任何内容时,速度取决于缓存命中和替换的模式,因此受限于平台特定的内存周期速度。

这意味着,如果您有很多小任务要做,纤维可能比线程更好,因为线程涉及内核调用和调度程序的调用。纤维在它们运行的线程的时间片内执行,因此在许多短计算纤维之间切换不涉及任何潜在的内核阻塞。在许多情况下消除了同步的需要,也消除了调度程序介入的需要。

简单

纤维的简单性在很多方面类似于线程的简单性:您不必编写复杂的交错代码来同时处理多件事情,而是编写只做一件事的简单代码,并使用许多不同的纤维来处理不同的事情。这使您能够专注于做好一件事,而不是以一种相当复杂且可能脆弱的方式处理许多事情。

无需同步

在抢占式多线程环境中,如果两个线程可以访问同一数据,则该数据必须受到某种形式的同步保护。这可能很简单,比如使用Interlocked...操作之一(在内存总线上锁定),或者可能需要使用CRITICAL_SECTION或互斥锁进行同步。后两者的缺点是,如果无法授予访问权限,内核将被调用以取消调度线程,该线程将被排队在该同步对象上,以便在同步对象释放后稍后执行。CRITICAL_SECTION的区别在于,如果同步对象可以获得(在大多数情况下是最常见的情况),则不需要内核调用,而互斥锁总是需要内核调用。

线程之间的同步是线程发生摩擦的地方。同步就是摩擦。就像在机械系统中一样,摩擦会产生热量并浪费能量。也许最好的解决方案是避免它。

纤维是避免这种情况的一种方式。::SwitchToFiber是一个“积极的移交”。在纤维切换控制之前,它不会被在该线程中运行的任何其他纤维抢占。因此,一旦一个纤维开始运行,只要它修改的状态只与其他在该线程中执行的纤维共享,就不需要同步状态。同步隐含在由程序员显式控制的纤维调度中。

这不是一个完美的解决方案。一个纤维可以被调度到多个线程中运行;如果共享状态现在可以被运行在不同线程中的纤维访问,那么纤维的特性就无关紧要了;您就遇到了多线程同步问题,并且必须使用CRITICAL_SECTION或互斥锁。

当您有纤维时,您没有并发。如果一个纤维阻塞,例如在I/O调用上,实际上是运行该纤维的线程阻塞了。在该线程中无法调度其他纤维运行,因为线程本身被阻塞了。您不会获得同一线程中运行的纤维之间的任何并发(尽管您可以在其他线程中运行的纤维之间获得并发)。但是,如果您没有阻塞调用,纤维对于多路复用简单的计算密集型任务特别有用。

CFiber类

第一个类是基本纤维对象的包装器

class CFiber {
    public: // constructors
       CFiber(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0)
             { fiber = ::CreateFiber(stack, rtn, this); }
       CFiber() { fiber = NULL; }
       virtual ~CFiber() { ::DeleteFiber(fiber); }
    public: // methods
       BOOL Create(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0)
          { 
            ASSERT(fiber == NULL);
            fiber = ::CreateFiber(stack, rtn, this);
            return fiber != NULL;
          }
       BOOL ConvertThreadToFiber() {
          ASSERT(fiber == NULL);
          fiber = ::ConvertThreadToFiber(this);
          return fiber != NULL;
       }
       void Attach(LPVOID p) { ASSERT(fiber == NULL); fiber = p; }
       LPVOID Detach() { LPVOID result = fiber; 
                         fiber = NULL; return result; 
                       }
       LPVOID GetFiber() { return fiber; }
    public: // methods
       void run() { ASSERT(fiber != NULL); ::SwitchToFiber(fiber); }
    protected: // data
       LPVOID fiber;
};

CFiber方法

CFiber 构造函数
~CFiber 析构函数
Create

在现有CFiber对象中创建一个新纤维

ConvertThreadToFiber

用于将线程转换为纤维,以便进行纤维调度

Attach

将现有纤维附加到CFiber对象

Detach 将纤维从CFiber对象分离
GetFiber 返回当前纤维
run 切换到选定的纤维

构造函数/创建

有几个构造函数。这里的关键是假设纤维将看到的“纤维参数”实际上是CFiber对象,或其派生子类。

CFiber::CFiber(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0);

此构造函数将创建一个纤维,该纤维将执行指定的例程rtn。它有一个可选的堆栈大小参数,默认为进程堆栈大小。

参数

rtn

纤维例程。void CALLBACK rtn(LPVOID p)

stack

所需的堆栈大小;如果省略,则假定为0(使用默认堆栈大小)。

另请参阅CFiber::CFiber的实现。

CFiber::CFiber();

此构造函数创建一个不与任何纤维关联的CFiber对象。这通常在用户希望纤维参数不是CFiber对象时使用。

CFiber::~CFiber()

析构函数删除底层纤维对象。

注意:对纤维执行delete操作必须小心,因为删除当前正在执行的纤维将终止正在执行该纤维的线程。纤维不得删除自身。

BOOL CFiber::Create(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0)

给定一个不与纤维关联的CFiber对象,这将创建一个新纤维并将其附加到CFiber对象。

参数

rtn

纤维例程。void CALLBACK rtn(LPVOID p)

stack

所需的堆栈大小;如果省略,则假定为0(使用默认堆栈大小)。

注释

以下是等效的

(1)
CFiber * fiber = new CFiber(function);
(2)
CFiber * fiber = new CFiber;
LPVOID f = ::CreateFiber(0, function, fiber);
Fiber->Attach();
BOOL CFiber::ConvertThreadToFiber()

备注

将当前线程转换为可调度的(::SwitchToFiber的目标)纤维。当前CFiber对象成为纤维参数。

结果:如果底层::ConvertThreadToFiber调用成功,则为TRUE;如果失败,则为FALSE(使用GetLastError确定出了什么问题)。

注意:在完成纤维并删除所有纤维后,应调用::ConvertFiberToThread来释放已分配给线程的纤维特定数据结构。

void CFiber::Attach(LPVOID p)

此操作将一个纤维附加到一个CFiber对象。现有的CFiber对象不得已附加了纤维。

参数

p

纤维引用

注意:与MFC不同,它不维护映射来查看给定纤维是否绑定到多个CFiber对象。程序这样做是错误的,但没有进行额外的检查。

通过构造函数创建时,纤维参数(由::GetFiberData返回)是CFiber对象。如果应用程序希望将不同的对象关联为纤维参数,则可以创建并附加该纤维。

LPVOID CFiber::Detach()

此操作会断开CFiber与底层纤维的关联。

结果:先前与CFiber对象关联的纤维。

注意:纤维保留其创建时的纤维参数;因此,如果一个纤维从一个CFiber对象DetachAttach到另一个,并且它有一个纤维参数是原始CFiber对象,那么现在这将是一个错误的引用。当预期的行为是CFiber对象是纤维参数时,不应使用AttachDetach

LPVOID CFiber::GetFiber();

结果:CFiber对象关联的纤维。

void CFiber::run();

使用::SwitchToFiber将控制权切换到纤维。

队列条目类

这里的目标是有一个“时间共享”的纤维集,纤维只是在适当的时候让出控制权,然后另一个纤维就会运行。在正常操作模式下,纤维通过调用::SwitchToFiber来让出控制权,并指定下一个要运行的纤维。但是,这需要实际知道下一个纤维应该是什么。在某些应用程序中,这是规范的一部分。在我正在编写的系统中,我只知道我想将控制权让给另一个纤维。选择是做一个轮询纤维调度器,这意味着一个队列。这需要一个队列条目类来表示队列中的纤维。我从CFiber类派生了它。

class QE : public CFiber {
    public: // constructor/destructor
       QE(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0) 
             : CFiber(rtn, stack) { next = NULL; }
       QE() { next = NULL; }
       virtual ~QE() {  }

    public:
       virtual void Display(LPCTSTR s) { }
    public: // internal state
       QE * next;
};

typedef QE * PQE;

QE方法和成员变量

QE

构造函数

~QE

析构函数

显示

用于调试输出的虚拟方法

next

用于将元素插入队列的链接

QE::QE(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0);

此构造函数将创建一个队列条目和一个与该队列条目关联的新纤维,该纤维将执行指定的例程rtn。它有一个可选的堆栈大小参数,默认为进程堆栈大小。

参数

rtn

纤维例程。void CALLBACK rtn(LPVOID p)

stack

所需的堆栈大小;如果省略,则假定为0(使用默认堆栈大小)。

QE::QE();

此构造函数将创建一个不与任何纤维关联的队列条目。可以使用CFiber::Attach调用来附加纤维。请参阅有关纤维数据的注意事项。

注意:如果::GetFiberData调用返回的值不是QE引用,则此代码将无法正常工作

virtual QE::~QE();

析构函数将删除QE对象及其关联的纤维。请参阅有关删除运行中纤维的注意事项。不应将delete运算符应用于表示运行中纤维的QE

virtual void QE::Display(LPCTSTR s);

此方法的目的是让QE的子类实现此虚拟方法,以便进行调试。此类在此虚拟方法的实现中什么都不做。虽然原则上它可以是一个纯虚拟方法,但可能存在创建原始QE对象的理由,如果该方法是纯虚拟方法,则无法做到这一点。

QE * QE::next;

这提供了由Queue类管理的队列结构。

注意:C++有一个称为friend的机制,它允许另一个类访问私有变量。然而,这是设计不佳的,因为它意味着QE类必须知道将使用它的类的名称。这降低了类的通用性;因此,与其将其标记为protected并要求已知的类名,不如将其设为public,从而允许其他队列的实现。

Queue类

我想实现一个FIFO队列来进行轮询调度。我通常会使用MFC的CList类,但是这段代码必须在非MFC环境中工作。

class Queue {
    public:
        Queue() { queueHead = queueTail = emptyFiber = killFiber = NULL; }
    public:
       void appendToQueue(PQE qe);
       PQE removeFromQueue();
    public:
       void next();
       void yield();
       void kill();
   public:
       void SetEmptyFiber(PQE qe) { emptyFiber = qe; }
    protected:
       PQE queueHead;
       PQE queueTail;
       PQE emptyFiber;
    protected: // used for fiber-kill logic
       PQE killFiber;  
       PQE killTarget;

       static void CALLBACK killer(LPVOID p);
};

这旨在支持纤维的排队。yield方法将当前队列条目放入队列尾部。next方法分派队列头部的纤维。kill方法对当前运行的元素执行delete,但它非常小心地通过为此目的使用单独的纤维来执行此操作。然后它启动队列中的下一个纤维。

这段代码实现了简单的FIFO排队。可以创建其他类来实现更复杂的排队方式。

Queue方法

队列管理方法

队列

构造函数

appendToQueue

将元素添加到队列尾部

removeFromQueue

从队列头部移除元素

调度方法

next

从队列头部出队并切换到该纤维;如果队列为空,则切换到空队列纤维

yield

将当前纤维调度到队列末尾,并运行队列头部的纤维

kill

删除当前纤维并调度下一个纤维

SetEmptyFiber

指定队列为空时要切换到的纤维

队列管理方法

Queue::Queue();

构造一个空队列。

void Queue::appendToQueue(PQE qe);

将参数指向的QE放在队列的末尾。

PQE Queue::removeFromQueue();

从队列头部移除队列元素。

结果:指向已出队的QE条目的指针,如果队列为空,则为NULL

队列调度方法

void Queue::next();

从队列头部出队队列元素,并将控制权切换到它所代表的纤维。如果没有队列中的元素,它将切换到“空队列”元素所代表的纤维(参见SetEmptyFiber)。

void Queue::yield();

当纤维希望让出控制权时,它调用此方法,控制权将转移到队列中的下一个纤维。当前纤维被放置在队列的末尾。当纤维被重新调度时,执行将在yield调用之后的行上恢复。

void Queue::kill();

当纤维完成计算时,它调用此方法。控制权将转移到队列中的下一个纤维。当前纤维不会被放在队列的末尾,因此不再被调度。

注意:要“挂起”一个纤维以便“重新启动”它,需要构建一个更复杂的机制。

void Queue::SetEmptyFiber(PQE qe);

参数

qe

一个QE引用,指向如果队列为空则要调度的纤维。

使用队列

队列首先通过使用appendToQueueQE对象添加到队列来填充。一旦纤维开始运行,它们通常会按轮询FIFO顺序执行。当前机制没有“挂起”纤维的方法;这项泛化留给读者练习。

要启动队列,主线程必须创建一个代表CFiber::ConvertThreadToFiber调用的QE。这通常被设置为队列为空时要恢复的默认纤维,因此使用SetEmptyFiber方法来建立此设置。

代码

以下各节说明了此示例中的代码。

CFiber类

CFiber类所有方法都在头文件中定义。CFiber类包装了原始纤维表示,这是一个LPVOID

    protected: // data
        LPVOID fiber;

构造函数/析构函数

CFiber(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0)
   { 
    fiber = ::CreateFiber(stack, rtn, this); 
   }

请注意,此构造函数使用::CreateFiber创建纤维,并将this作为纤维参数传递。

CFiber() { fiber = NULL; }

这创建了一个未绑定到特定纤维的CFiber对象;Attach方法稍后可用于将纤维绑定到此CFiber对象。请注意,这既强大又有风险:它允许纤维参数不是CFiber对象,但后面的一些类并非为在这种条件下工作而设计。

virtual ~CFiber() { ::DeleteFiber(fiber); }

这仅仅是删除纤维。然而,当前执行的纤维不能在自身上执行此方法,否则运行该纤维的线程将被终止。

创建/附加

       BOOL Create(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0)
             {
               ASSERT(fiber == NULL);
               if(fiber != NULL)
                  return FALSE;
               fiber = ::CreateFiber(stack, rtn, this);
               return fiber != NULL;
             }

在调试模式下,这将处理ASSERT测试,以确保此CFiber尚未绑定到纤维。在发布模式下,ASSERT消失,但如果纤维已绑定,该方法将返回FALSE

BOOL ConvertThreadToFiber() {
          ASSERT(fiber == NULL);
          if(fiber != NULL)
             return FALSE;
          fiber = ::ConvertThreadToFiber(this);
          return fiber != NULL;
       }

将要开始调度纤维的线程需要执行ConvertThreadToFiber。这会调用API来将当前线程映射到一个纤维,并将当前CFiber对象作为纤维参数传递。

void Attach(LPVOID p) { ASSERT(fiber == NULL); fiber = p; }

此调用应仅用于未绑定到纤维的CFiber对象。在不先进行Detach的情况下尝试将CFiber绑定到另一个纤维是名义上错误的。

LPVOID Detach() { ASSERT(fiber != NULL);
                  LPVOID result = fiber;
                  fiber = NULL;
                  return result;
                }

此操作会断开CFiber与其底层纤维的关联,并返回纤维引用。如果纤维未绑定,则返回值为NULL。名义上,如果不存在关联,则Detach是错误的,如果出现这种情况,ASSERT将在调试模式下导致失败。请注意,没有机制可以更改已Detach的纤维的Attach的纤维参数,并且此处描述的其他子类在纤维参数不是CFiber对象的引用时将无法正常工作。

void run() { ASSERT(fiber != NULL);
             ::SwitchToFiber(fiber);
           }

此操作将当前执行的纤维切换到在其上指定了run方法的纤维。

队列条目类

此类派生自CFiber类,因此继承了构造函数和其他方法。QE构造函数是

QE(LPFIBER_START_ROUTINE rtn, SIZE_T stack = 0) 
             : CFiber(rtn, stack) { next = NULL; }

因此,这只是CFiber的构造,并增加了初始化用于维护队列的链接。

Display方法用于生成调试输出,但实现实际输出的责任在于子类。

Queue类

Queue类本质上是一个单向链接队列管理器,结合了纤维调度逻辑。单向链接队列管理器相当简单。我将在此展示纤维调度函数。

yield方法

请注意,为了简化代码,我选择假设纤维参数是实际的QE对象(或QE子类的实例)。这意味着如果Attach/Detach的使用方式使纤维参数不一致,这段代码将无法正常运行,因为没有办法重置纤维参数(虽然有::GetFiberData API,但没有相应的::SetFiberData!)。

此方法所做的就是将当前运行的纤维加入队列的末尾,并激活队列中的下一个纤维(使用next方法)。

void Queue::yield()
    {
     PQE qe = (PQE) (::GetFiberData());
     qe->Display( _T("yield: returning to queue - ") );
     appendToQueue(qe);
     next();
    } // Queue::yield

kill方法

当纤维终止并必须删除时,就会调用此方法。

void Queue::kill ()
    {#ifdef _DEBUG
     PQE qe = (PQE) (::GetFiberData());
     qe->Display(_T("kill"));
#endif
     killTarget = qe;
     if(killFiber == NULL)
        { /* create killer fiber */
         killFiber = new QE();
         LPVOID f = ::CreateFiber(0, killer, this);
         killFiber->Attach(f);
        } /* create killer fiber */
     killFiber->run();    } // Queue::kill

问题在于纤维不能删除自己。如果对当前正在运行的纤维调用::DeleteFiber,那么该纤维所在的整个线程将退出,因为这将调用ExitThread。为了杀死纤维,它将控制权切换到另一个纤维,即killFiber。这个纤维能够对QE对象(请记住,它是一个CFiber的子类,并且本身可能是某个更深层QE子类的实例)执行delete操作,这是通过调用::DeleteFiber实现的,因为此时,被删除的纤维不是正在运行的纤维。

请注意,这似乎与DeleteFiber的文档不符,后者说明:

如果当前正在运行的纤维调用DeleteFiber,则其线程调用ExitThread并终止。但是,如果当前正在运行的纤维被另一个纤维删除,那么运行被删除纤维的线程很可能会异常终止,因为纤维堆栈已被释放。

这似乎完全没有意义。由于纤维的性质,“正在运行的纤维”不能被另一个纤维(在同一线程中)删除,因为在它被删除时,它本身就不是“正在运行”的纤维!但是,我怀疑如果将该段落替换为(我建议的更改斜体

如果当前正在运行的纤维调用DeleteFiber,则其线程调用ExitThread并终止。但是,如果当前正在运行的纤维被另一个线程删除,那么运行被删除纤维的线程很可能会异常终止,因为纤维堆栈已被释放。如果一个纤维已被删除,任何尝试使用SwitchToFiber将其控制权切换回该纤维的操作都可能导致其线程异常终止,因为纤维堆栈已被释放

我目前正在与微软研究这个问题。当前的文档段落,如其所写,将使得永远无法调用DeleteFiber

那么问题就来了:既然纤维在很多方面看起来都像协程,那么参数是如何传递给杀手的呢?在这种情况下,在Queue类中添加了一个成员变量killTarget。因为我们处理的是非抢占式纤维,并且我们假设没有从另一个线程对Queue对象进行并发访问(这是此代码的基本假设),所以不需要担心任何形式的同步;此外,除了正在切换到的纤维之外,没有任何纤维可以获得控制权,因此使用这样的变量没有问题(在抢占式多线程系统中,这将是一个致命的设计错误,但在非抢占式纤维系统中则完全有效)。

为了避免任何显式初始化或最终化的需要,我只在kill fiber尚不存在时创建它,并且稍后会显示删除它的位置。为什么不这样做,而是将其放在构造函数和析构函数中?

主要是寿命问题。我认为纤维的存在时间不应超过其需要的时间。此外,在确定何时可以删除纤维方面似乎存在一些严重问题,特别是因为队列的析构函数可能在程序执行了::ConvertFiberToThread调用来释放纤维资源之后运行,因此在这种条件下执行::DeleteFiber的效果似乎是未定义的。

这确实引出了kill fiber何时可以被删除的问题。我选择在队列变空时删除它。请注意,这会导致调度回“主纤维”,实际上是空队列纤维。由于队列为空,将不再需要杀死纤维,并且killFiber纤维可以被安全地删除。请注意,因此,程序可能会选择创建更多的队列条目;在这种情况下,killFiber会自动重新创建,“按需”。

杀手纤维非常简单;这里的关键是,因为它是一个单独的纤维,所以现在可以安全地对已完成其操作的纤维执行::DeleteFiber

void CALLBACK Queue::killer(LPVOID)
   {
     delete killTarget;
     next();
   } // Queue::killer

next方法

next方法从队列中出队下一个元素,并通过调用其纤维上的::SwitchToFiber来调度它运行。如果没有更多的纤维排队,它将恢复到由SetEmptyFiber建立的纤维。请注意,未能调用SetEmptyFiber将被视为致命错误。

void Queue::next()
    {
     PQE qe = (PQE)removeFromQueue();
     if(qe != NULL)
        { /* got one */
         qe->Display(_T("Dequeued QE"));
         qe->run();
        } /* got one */
     else
        { /* all done */
         delete killFiber;
         killFiber = NULL;
         TRACE( (_T("Queue empty: switch to EmptyFiber\n")) );
         ASSERT(emptyFiber != NULL);
         // Note that a failure to have set the empty fiber is a fatal 
         // error and is unrecoverable!
         emptyFiber->run();
        } /* all done */
    } // Queue::next

其他一些问题

为什么我对killFiberemptyFiber使用QE结构?既然它们实际上没有被调度,为什么不使用原始的CFiber对象呢?

emptyFiber的情况下,有人可能会选择,而不是我采取的方法,创建一个优先级调度的队列,并将空纤维简单地设为最低优先级的纤维。我认为如果emptyFiber看起来像一个可调度的纤维,更容易看到如何做到这一点。由于调用者必须创建纤维,这也意味着底层实现可以在不更改用户界面的情况下进行更改。

killFiber的选择更难证明;在这种情况下,它更多地成为一致性问题而不是功能问题,特别是因为这从未逃脱到用户界面级别。但是,这样做允许我决定执行“延迟杀死”,因为我可以考虑将kill fiber调度为一项任务来完成,并且可以将其插入队列的头部或稍后的位置。但是,请注意,如果我将其插入到第一个队列项之外的其他位置,那么全局killTarget的使用将不再有效。这是因为killTarget为每个要杀死的纤维显式设置,如果我可能有一个以上的待处理杀死请求,这是不够的(在这种情况下,我可能会按需创建纤维,并使用纤维参数,该参数必须是QE对象;我必须创建一个class Killer : public QE来维护我需要的上下文)。

问题域

示例程序的目标是获取命令行上的一组文件名,并打印出文件,每个文件一个纤维。为了使其看起来像一个并发程序,该程序被定义为一次只打印几行,然后纤维被解除调度,另一个纤维运行。当到达文件末尾时,纤维不再被调度。程序完成后,所有纤维都被删除。

为了解决这个问题,一个类被派生自QE

class CReaderFiber : public QE {
    public: // constructors
       CReaderFiber(LPCTSTR f, int c, Queue * q) : QE(reader) {
                                name = f;
                                count = c;
                                queue = q;
                                file = NULL;
                               }
       virtual ~CReaderFiber() { if(file != NULL) fclose(file); }
    public: // parameters
       LPCTSTR name;     // name of file
       int count;        // number of lines to write
       Queue * queue;    // the queue shared by all these fibers
    public:
       virtual void Display(LPCTSTR s);
    public: // local state
       FILE * file;      // currently-opened file object
       char buffer[MAX_LINE];  // local buffer
    protected:
       static void CALLBACK reader(LPVOID p);
 };

这个派生类持有所有特定于问题的 P信息,例如在每次纤维执行期间要写入的行数、文件名、保存输入的缓冲区等。纤维函数本身是一个静态类成员。

/* static */ void CALLBACK CReaderFiber::reader(LPVOID p)
   {
    CReaderFiber * rf = (CReaderFiber *)p;
 
    TRACE( (_T("reader: called for %s\n"), rf->name) );
    rf->file = _tfopen(rf->name, _T("ra"));
    if(rf->file == NULL)
       { /* failed */
        DWORD err = ::GetLastError();
        reportError(err, rf->name);
        TRACE( (_T("reader: could not open %s\n"), rf->name) );
        rf->queue->kill();
        return;
       } /* failed */

文件打开后,我们只需进入纤维中的无限循环,读取和打印行。请注意,此代码假定程序的Unicode版本将读取Unicode文件,而ANSI版本将读取ANSI(8位本机代码页)文件。泛化,即任何版本都可以读取或写入任何类型的文件,留给读者练习。请注意,在纤维循环中,在打印完count行之后,纤维就会让出。因此,另一个纤维将会运行。

     while(TRUE)
       { /* fiber loop */
        for(int i = 0; i < rf->count; i++)
           { /* read lines */
            if(_fgetts(rf->buffer, MAX_LINE, rf->file) == NULL)
               { /* all done */
                TRACE( (_T("reader: done with %s\n"), rf->name) );
                rf->queue->killFiber();
                ASSERT(FALSE); 
               } /* all done */
            _tprintf(_T("%s"), rf->buffer);
           } /* read lines */
        TRACE( (_T("reader: yielding for %s after %d lines\n"), 
                                               rf->name, rf->count) );
        rf->queue->yield();
       } /* fiber loop */
   } // reader

主程序

命令行语法为

  • argv[0]是程序名称
  • argv[1]是每次迭代要显示的行数
  • argv[2..argc-1]是文件名
int _tmain(int argc, TCHAR * argv[])
{
    Queue queue;

    if(argc < 3)
    { /* usage */
        usage();
        return 1;
    } /* usage */
 
    int lines = _ttoi(argv[1]);

    if(lines <= 0)
    { /* failed */
        _tprintf(_T("Illegal buffer count (%d)\n"), lines);
        usage();
        return 1;
    } /* failed */

在完成参数验证的枯燥细节后,真正的工作就开始了。首先,创建一个数组来保存所有CFiber派生对象。为了简化代码并消除对所有偏移量的担忧,我只创建了一个与argc数组大小相同的数组,并忽略其中的前两个条目。

    CReaderFiber ** fibers = new CReaderFiber*[argc];

为了处理“主纤维”,即转换为纤维的此线程,我将需要一个QE对象来允许其被调度。ConvertThreadToFiber方法将完成此操作。

     PQE mainFiber = new QE();

     if(!mainFiber->ConvertThreadToFiber())

     { /* failed conversion */

         DWORD err = ::GetLastError();
         reportError(err, _T("Converting thread to fiber"));

         return 1;

     } /* failed conversion */

接下来,我扫描文件名列表。对于命令行中的每个文件,我创建一个新的CReaderFiber对象并将其放入纤维数组中。每次创建纤维实例时,我都将其放入队列。

     for(int i = 2; i < argc; i++)

     { /* create fibers */
        fibers[i] = new CReaderFiber(argv[i], lines, &queue);

        if(fibers[i] == NULL)

        { /* failed */

            DWORD err = ::GetLastError();
            reportError(err, _T("Creating fiber"));

            return 1;

        } /* failed */
        queue.appendToQueue(fibers[i]);

} /* create fibers */

当所有线程完成后,控制权将返回到此线程,“主纤维”。为了实现这一点,我们必须设置mainFiberConvertThreadToFiber的结果),以便在队列为空时(所有文件的内容都已打印完毕)将控制权送回此处。

    queue.SetEmptyFiber(mainFiber);

此时,我们开始调度纤维。纤维将运行,直到打印完所有文件内容。

    queue.next();

当打印完所有内容后,::SwitchToFiber调用会将执行切换到此纤维,然后执行下面的代码。

    TRACE( (_T("Finished\n")) );
    delete mainFiber;
    ::ConvertFiberToThread();
    return 0;
   }

输出示例

提供的命令行参数为

5 a.txt b.txt c.txt

其中文件内容的形式为“File <filename> <line number>”。输出如下所示;分组为5是基于命令行第一个参数的。请注意显示的更改:文件a.txt有200行,b.txt有250行,c.txt有300行。这些是由一个名为“datagen”的小程序生成的,命令行为

datagen 200 "File A" > a.txt
datagen 250 "File B" > b.txt
datagen 300 "File C" > c.txt

输出

File A 1
File A 2
File A 3
File A 4
File A 5
File B 1
File B 2
File B 3
File B 4
File B 5
File C 1
File C 2
File C 3
File C 4
File C 5
File A 6
File A 7
File A 8
File A 9
File A 10
File B 6
File B 7
File B 8
File B 9
File B 10
File C 6
File C 7
File C 8
File C 9
File C 10
File A 11
File A 12
File A 13
File A 14
File A 15
…
File C 191
File C 192
File C 193
File C 194
File C 195
File A 196
File A 197
File A 198
File A 199
File A 200
File B 196
File B 197
File B 198
File B 199
File B 200
File C 196
File C 197
File C 198
File C 199
File C 200
File B 201   <- Note that file A, with 200 lines is no longer in the mix after this point
File B 202
File B 203
File B 204
File B 205
File C 201
File C 202
File C 203
File C 204
File C 205
File B 206
File B 207
File B 208
File B 209
File B 210
File C 206
File C 207
File C 208
File C 209
File C 210
File B 211
File B 212
File B 213
File B 214
File B 215
…
File B 246
File B 247
File B 248
File B 249
File B 250
File C 246
File C 247
File C 248
File C 249
File C 250
File C 251   <- Note that file B, with 250 lines, is no longer in the mix after this point
File C 252
File C 253
File C 254
File C 255
File C 256
File C 257
File C 258
File C 259
File C 260

一些额外的纤维问题

此代码做出了许多简化假设,这些假设不应被视为在一般纤维世界中是限制性的。例如

  • 所有纤维都将在单个线程中运行,即创建它们的同一线程
  • 纤维一旦创建,将保持附加到同一个CFiber对象

这两者都是实现选择。虽然可以将一个纤维由一个线程执行,然后由另一个线程执行(::SwitchToFiber似乎不关心纤维是在哪个线程创建的),但这样做存在某些风险。如果您假设您的纤维没有同步问题,因为它们是纤维,然后突然开始在单独的线程中执行它们,那么您必须非常小心,您没有引入同步问题,而且在维护过程中,要小心避免出现任何未来的同步问题。

请注意,如果您选择在不同的线程中运行纤维(即,给定的纤维可能在不同的线程中运行;而不是每个线程只在它们自己内部运行纤维的情况),请务必阅读有关/GT编译器选项“支持纤维安全线程局部存储”的内容,然后再继续。当在可能稍后在不同线程中调度的纤维中使用__declspec(thread)变量时,这会产生影响。

我选择使用::GetFiberData来获取纤维数据是一个简化的假设,不应被视为如何完成此操作的最终方法。它简化了一个案例的代码,即我正在实现的案例。

通常的杂项

与任何实际程序一样,有几件事不是程序的原始目标,但由于功能、优雅、便利或其他原因而需要完成。本程序也不例外,尽管其中一些组件已经独立开发并在此简单使用。

ASSERT

MFC的ASSERT宏设计得很好。它只报告一个问题,但它不会中止程序。这是一个宝贵的特性。程序员可以使用ASSERT宏来辅助调试,但可以选择在返回时进行“优雅恢复”,并允许程序员进行从手动调整调试器中的值到进行程序更改并使用编辑-执行来重新执行行。总的来说,这是一个智能而优雅的设计。

另一方面,C库的assert函数设计不佳,使用了我称之为“PDP-11思维模式”的方法,指的是最早普及C语言的小型计算机。当时的理念是程序很小,在内部错误时终止程序是可以的。在GUI设计世界中,这总是一个糟糕的设计决定;只有用户才能终止程序。

因此,我需要一个非MFC应用程序的ASSERT宏。此外,我希望这段代码能够轻松地集成到MFC应用程序中(请注意,本文中的C++类都是独立的,而不是基于MFC类,如CObject)。

这里的第一个技巧是头文件,assert.h

#pragma once
#ifndef ASSERT
#ifdef _DEBUG
 
#define ASSERT(x) if(!(x)) 
\
      fail(_T("Assertion Failure %s(%d)\n"), __FILE__, __LINE__)
#define VERIFY(x) ASSERT(x)
extern void fail(LPCTSTR fmt, ...);
#else
#define ASSERT(x)
#define VERIFY(x) x
#endif // _DEBUG
#endif // ASSERT

fail方法很简单

void fail(LPCTSTR fmt, ...)
   {
    va_list args;
    va_start(args, fmt);
    TCHAR buffer[1024]; 
// should be large enough
    _vstprintf(buffer, fmt, args);
    va_end(args);
    OutputDebugString(buffer);
   }

然而,请注意这有多危险!如果数据超过1024个字符怎么办?答案很简单:这将是一个致命的缓冲区溢出情况!

因为我只使用文件名(最大MAX_PATH个字符(名义上是260个字符))和行号,所以这是安全的。但它普遍安全!

不幸的是,这在Visual Studio 6中完成,没有解决方案。但在Visual Studio .NET中,在微软饱受其害的缓冲区溢出造成的众多安全漏洞之后,添加了适当的原语。这里需要的重要新调用是_vscprintf/_vscwprintf,或者在*tchar.h*中“双模态”定义为_vsctprintf,它会编译为适合ANSI或Unicode应用程序。它们返回格式化字符串(不包括NULL终止符)所需的字符数。因此,VS.NET的正确编写函数如下所示

void fail(LPCTSTR fmt, ...)
   {
    va_list args;
    va_start(args, fmt);
    int n = _vsctprintf(fmt, args);
    LPTSTR buffer = new TCHAR[n + 1];
    _vsctprintf(buffer, fmt, args);
    va_end(args);
    OutputDebugString(buffer);
    delete [] buffer;
   }

在MFC中,这将很容易通过使用CString::FormatV方法来完成,该方法实现了一个类似于_vsctprintf的算法,但此代码的前提是我们不使用MFC。

请注意,此代码不会弹出消息框或与用户进行交互。但它允许程序继续。如果我需要它停止,我经常在函数末尾设置一个断点;或者您可以创建一个更复杂的,使用DebugBreak

TRACE宏

debug.cpp文件中,我实现了一个简单的输出跟踪。请注意,它与上面描述的fail方法非常相似,包括缓冲区溢出错误(如果使用VS.NET,则相同的解决方案适用)。但是,我们遇到了TRACE的一个额外问题,而ASSERT没有:可变数量的参数。

C中的宏系统代表了20世纪50年代宏系统的最高水平,远不如许多编程语言所拥有的复杂。一个好的宏系统实际上是图灵完备的,因为您可以在宏中编写编写代码的程序。C宏系统,嗯,说它是“原始的”有点不公平。在许多情况下,石斧要精巧得多。尽管如此,这就是我们必须处理的,所以我们必须尽力而为。

这里的技巧是从Microsoft DDK的KdPrint宏中借用的。要将可变数量的参数传递给宏调用,您必须将它们包装在额外的括号中,以便在语法上它们看起来像宏的单个参数。因此,TRACE宏及其伴随函数的定义如下(来自debug.h

void CDECL trace(LPCTSTR fmt, ...);

#ifdef _DEBUG
#define TRACE(x) trace x
#else
#define TRACE(x)
#endif

宏调用可能如下

TRACE( (_T("The result for 0x%p is %d (%s)\n"), 
           object, object->value, object->name) );

请注意我倾向于在TRACE宏的参数括号和参数列表的参数括号之间使用空格。

还请注意使用%p来获取要打印的地址。养成这个习惯非常重要,而不是使用0x%08x或其他非可移植方法。这在程序移植到Win64时将无法正常工作(格式必须手动编辑为0x%16I64x;注意%之后的第一个参数是数字16,下一个序列是限定符I64,表示传入的参数是64位值,最后是x,表示所需的格式)。

报告错误

通常,我使用一个更通用的实现来包装::FormatMessage,但对于这个简单的示例,一个控制台应用程序,没有GUI界面,并且不使用MFC,一个更简单的实现就足够了。

本着“我的代码中永远不会有两条发出相同的错误消息”的理念,我允许用户传入一个唯一的标识符字符串。对于这个玩具示例,我没有费心通过STRINGTABLE提供本地化。

void reportError(DWORD err, LPCTSTR reason)
   {
    LPTSTR msg;
    
    if(FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
                     NULL,   // source of message table
                     err,    // error code to format
                     0,      // language ID: default for thread
                     (LPTSTR)&msg, // place to put pointer
                     0,            // size will be computed
                     NULL) == 0)   // arglist
       { /* error */
        _tprintf(_T("Error %d (0x%08x)\n"), err);
        return;
       } /* error */

    _tprintf(_T("Error %s\n%s\n"), reason, msg);
    LocalFree(msg);
   } // reportError
© . All rights reserved.