设计和实现 Windows 用户模式应用程序中的同步原语






4.19/5 (6投票s)
在原生 Windows 用户模式应用程序中创建轻量级同步实体的另一种方法
并发无处不在。
J. Duffy. "Concurrent Programming on Windows" (Windows 并发编程)
引言
并行处理器现在在各种主流设备上都非常普遍。即使您的设备只有一个处理器,十有八九其架构也是多核的。并行计算的好处是显而易见的:缩短开发周期。
但是,在提供将操作分解为组成部分以便独立部分可以在独立处理器(核心)上运行的能力的同时,多任务处理也带来了一系列新的问题:独立任务的同步。使用的共享资源必须受到保护,防止被并行任务同时修改。此类修改必须串行化。在独立任务使用和修改某些共享资源的情况下,开发人员必须创建任务通信协议,以防止共享资源损坏。
概念
- 我们无法检查这些对象的状态(Windows 中没有
TryEnterExclusiveMode
或TryEnterSharedMode
之类的函数)。线程只能等待进入共享或独占模式; - 我们无法为读取和写入操作引入优先级;
- 我们无法更改递归调用行为策略。
为什么这很重要?
想象一下,您实现了一个复杂的算法,用于在内存中渲染一些奇特的、数据结构(例如图)。每个节点都分配在内存中。所有节点的大小都不同。一旦节点被线程创建并嵌入到结构中,任何其他线程都可以读取和修改它,也就是说,节点可能会被重新分配甚至释放。哪种内存模型会更可取?显然,只使用内存堆不是一个好主意。在并发环境中,您必须在堆上进行串行化。而这将使并行线程的使用付之东流。
更好的方法是使用多个堆来分配节点,这样每个线程集中的线程都可以检查堆的状态,以拾取另一个可能未使用的堆。所有这些都可以应用于节点。任何时候,每个节点都可以处于未使用、读取或写入状态。很自然地,如果某个节点对线程的控制流很重要,它必须等待该节点的状态改变。但是,如果线程正在根据某些标准查找未使用节点,则可以跳过已使用的节点。因此,您必须使用具有令人满意属性的同步对象。
机制
最重要的是,对象是状态机。因此,您必须定义这些对象的状态集。但是这个集合的内容取决于将使用的策略。下面我提出了自己在这方面的观点。显然,只有线程在改变状态时才能拥有同步对象。所有其他线程必须等待对象状态的改变。换句话说,所有对象的状态查询都必须在时间上串行化。其次,同样重要的是原子性级别。同步对象的原子性是显而易见的:状态是原子的,状态的改变要么完整执行,要么根本不执行。但是对象内部的原子性取决于环境,即系统原子性。Windows 提供了大量“原子”函数,可确保操作被原子地执行。因此,我建议使用它们
(在数据对齐限制方面)。
我从最简单的同步对象类型开始——临界代码(代码段、监视器等)。对于这类对象,有三种状态:已获取、已释放和维护(我称之为这种状态)
typedef enum { lsAcquired, lsReleased, lsMaintenanceMode } LATCHSTATE;
维护模式意味着某个线程正在查询或更改对象的状态。因此,我最简单的同步对象接口如下:
class __declspec(novtable) ISyncLatch { public: virtual void Acquire(void) = 0; virtual void Release(void) = 0; virtual BOOL TryAcquire(const DWORD dwMilliseconds) = 0; };
是的,就像经典的临界区一样。以下是实现:
class CLatch : public ISyncLatch { private: DWORD mf_dwOwnerThread; volatile __declspec(align(16)) LATCHSTATE mf_LatchState; ... public: ... virtual void Acquire(void); virtual void Release(void); virtual BOOL TryAcquire(const DWORD dwMilliseconds); };
首先,我们需要查询对象当前状态
LATCHSTATE LatchState; LONG lnSpinSwitchCount(0); // Wait while latch is in maintenance mode. Another thread already acquired latch state. while (lsMaintenanceMode == (LatchState = static_cast<LATCHSTATE>(::InterlockedExchange(reinterpret_cast<volatile LONG*>(&mf_LatchState), lsMaintenanceMode)))) { if (lnSpinSwitchCount < THREAD_WAIT_SPIN_COUNT) { ::YieldProcessor(); lnSpinSwitchCount++; } else ::SwitchToThread(); }
正如您所见,这是一种将对象状态原子地更改为维护模式的简单方法。如果对象已处于维护模式,我不会做任何更改。当前对象的状态存储在本地 LatchState 变量中。这里可以看到 Acquire() 方法的实现:
virtual void Acquire(void) { LATCHSTATE LatchState; do { LONG lnSpinSwitchCount(0); // Wait while latch is in maintenance mode. Another thread already acquired latch state. while (lsMaintenanceMode == (LatchState = static_cast<LATCHSTATE>(::InterlockedExchange(reinterpret_cast<volatile LONG*>(&mf_LatchState), lsMaintenanceMode)))) { if (lnSpinSwitchCount < THREAD_WAIT_SPIN_COUNT) { ::YieldProcessor(); lnSpinSwitchCount++; } else ::SwitchToThread(); } if (lsAcquired == LatchState) { if (::GetCurrentThreadId() != mf_dwOwnerThread) { // Latch is acquired by another thread. ::InterlockedExchange(reinterpret_cast<volatile LONG*>(&mf_LatchState), LatchState); ::SwitchToThread(); } else { // There is recursion here. Thread already acquires the latch. ::InterlockedExchange(reinterpret_cast<volatile LONG*>(&mf_LatchState), LatchState); return; } } } while(lsAcquired == LatchState); // Now latch is released. Acquire it. mf_dwOwnerThread = ::GetCurrentThreadId(); ::InterlockedExchange(reinterpret_cast<volatile LONG*>(&mf_LatchState), lsAcquired); }
您可以根据需要更改递归获取策略:跳过它(就像我所做的)、增加递归计数(就像在原生临界区中一样)或抛出异常(例如,模拟 SRWLock)。选择您需要的方式。mf_dwOwnerThread
成员用于定义递归调用。在 源代码 中,您可以检查此对象实例。通过这种方式,可以模拟事件、通知(手动重置事件)和信号灯。
现在,让我们实现具有共享和独占模式的同步对象,即“多读取者 - 独占写入者”设计模式。这里的状态集是:
typedef enum { lmUnused, lmAnalyzed, lmReadMode, lmWriteMode } LOCKMODE;
同步对象接口
class __declspec(novtable) IMultiReadExclusiveWrite { public: virtual void BeginRead(void) = 0; virtual void BeginWrite(void) = 0; virtual void EndRead(void) = 0; virtual void EndWrite(void) = 0; virtual BOOL TryBeginRead(void) = 0; virtual BOOL TryBeginWrite(void) = 0; virtual BOOL IsReadBegan(void) = 0; virtual BOOL IsWriteBegan(void) = 0; virtual int WaitingReadCount(void) = 0; virtual int WaitingWriteCount(void) = 0; virtual int ActiveReadersCount(void) = 0; };
此对象实现测试包含在 源代码 中。测试结果可能如下所示:
最后,最复杂、功能最强大的同步对象是“多读取者 - 独占写入者”并带有“可升级读取”模式。这意味着以“可升级读取”模式拥有对象的线程可以原子地将其状态升级为写入模式或降级为读取模式。此对象的状态集与前一个对象相同,但接口已更改:
class __declspec(novtable) IUpgradableMultiReadExclusiveWrite { public: virtual void BeginRead(void) = 0; virtual void BeginWrite(void) = 0; virtual void EndRead(void) = 0; virtual void EndWrite(void) = 0; virtual BOOL TryBeginRead(void) = 0; virtual BOOL TryBeginWrite(void) = 0; virtual void BeginUpgradableRead(void) = 0; virtual BOOL TryBeginUpgradableRead(void) = 0; virtual void EndUpgradableRead(void) = 0; virtual BOOL IsReadBegan(void) = 0; virtual BOOL IsWriteBegan(void) = 0; virtual int WaitingReadCount(void) = 0; virtual int WaitingWriteCount(void) = 0; virtual int WaitingUpgradeCount(void) = 0; virtual int ActiveReadersCount(void) = 0; }
在“可升级读取”模式下,可以构建这样的代码:
BOOL fbWrite(FALSE); Lock.BeginUpgradableRead(); __try { ... ReadSharedResource(...); ... if (SomePredicate) { Lock.BeginWrite(); fbWrite = TRUE; ... WriteSharedResource(...); ... } } __finally { if (fbWrite) Lock.EndWrite(); else Lock.EndUpgradableRead(); }
您可以在同步对象中实现自己的读取者和写入者线程策略,可以引入操作优先级等。
现在是讨论此方法的缺点的时候了。如果您从 源代码 运行此对象实现测试,您会看到代码运行时有巨大的延迟。实际上,正如您所见,许多线程的时间片仅用于调用 SwitchToThread() 函数。如果您运行 源代码,您将看到为此方法付出的代价。首先,我修改了非常早期对象实现的第一个代码
while (lsMaintenanceMode == (LatchState = static_cast<LATCHSTATE>(::InterlockedExchange(reinterpret_cast<volatile LONG*>(&mf_LatchState), lsMaintenanceMode)))) { if (lnSpinSwitchCount < THREAD_WAIT_SPIN_COUNT) { ::YieldProcessor(); lnSpinSwitchCount++; } else SpinOnce(); }
并引入了两个新方法:
void CSpinWaitingLatch::SpinOnce(void) { switch((::InterlockedIncrement(reinterpret_cast<LONG volatile*>(&mf_lnSpinSwitchCount)) - 1) % 3) { case 0: ::SwitchToThread(); break; case 1: ::Sleep(0); break; case 2: ::Sleep(1); break; } } void CSpinWaitingLatch::WaitOnce(void) { ::InterlockedIncrement(reinterpret_cast<LONG volatile*>(&mf_lnWaitSwitchCount)); ::SwitchToThread(); }
对于 8 个线程(在 2 核 CPU 上)结果是:
这里有 643470 + 206 次线程上下文切换。这个数字取决于各种因素,例如您的网络活动、正在运行的任务等。考虑到每次切换都会消耗数千个 CPU 周期,浪费了大量的周期。显然,必须通过冻结等待的线程来避免冗余切换。
Windows 为我们提供了一对互补函数:SuspendThread 和 ResumeThread。它们使用线程句柄作为参数。但是,如果线程调用 SuspendThread 来阻塞自身,如何解除阻塞?谁知道被阻塞的线程?因此,需要在应用程序中某个地方存储和管理有关被阻塞线程的簿记信息,换句话说,需要某种线程调度程序。因此,我必须引入一个单独的线程来控制和调度(安排)这些线程。所有的簿记工作都在这个线程中完成。
线程通过消息与调度器线程交互,因为 Windows 提供了丰富的函数来实现这一点。
方法 #1。
线程通过 PostThreadMessage 调用向调度器发布消息。调度器线程中的消息队列是通过标准调用创建的:
::PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
结果是:
同样,您将获得不同的结果。不算差,但不够。此方法的主要缺点是调度器与目标线程之间的弱同步。消息不是立即出队,而是会在队列中等待一段时间才能出队。这种延迟可能是巨大的。自从消息发布到出队并处理为止,线程一直在运行,并且可能已获取同步对象的状态(即将其置于维护模式)。如果我们此时阻塞调度器,就会导致死锁:线程拥有对象并被阻塞,其他线程正在等待对象状态的改变。这就是为什么我必须在查询对象状态之前发布取消阻塞消息:
LONG lnSpinSwitchCount(0); // Wait while latch is in maintenance mode. Another thread already acquired latch state. ::PostThreadMessageW(dwThreadDispatcherID, DM_THREAD_UNBLOCK, static_cast<WPARAM>(::GetCurrentThreadId()), 0); while (lsMaintenanceMode == (LatchState = static_cast<LATCHSTATE>(::InterlockedExchange(reinterpret_cast<volatile LONG*>(&mf_LatchState), lsMaintenanceMode)))) { if (lnSpinSwitchCount < THREAD_WAIT_SPIN_COUNT) { ::YieldProcessor(); lnSpinSwitchCount++; } else SpinOnce(); }
这只是为了避免死锁。因此,我需要构建一个强大的调度模型。
方法 #2。
线程通过 SendMessage 调用将消息发送到调度器。调度器线程创建一个仅消息窗口(“Message”类名)。不要忘记子类化窗口过程。这是为什么?众所周知,::SendMessage 调用会阻塞调用线程,直接调用目标窗口过程并返回该调用的结果(如果窗口过程使用它,则返回 ReplyMessage 的结果)。因此,自 SendMessage 调用以来,线程就被阻塞了。结果:
好多了。
结论
1. 所有描述的对象都用于您的应用程序进程内部。自然,您无法将其引用传递到进程外部(系统或第三方模块,除非该第三方是您自己,当然)。
2. 您可以通过向调度器提供额外的信息来改进它。例如,您可以将以下接口引入同步对象:
typedef enum { soCritical, soNotification, soEvent, ... } SYNC_OBJ_TYPE; class __declspec(novtable) ISycnObjectTypeInfo { public: virtual SYNC_OBJ_TYPE GetObjectType(void) = 0; };
每个对象将返回其自己的类型。因此,您可以提供包含线程句柄和同步对象指针的消息,您的调度器可以知道哪个特定线程阻塞在哪个特定对象上。通过实现上述接口,调度器将能够正确地转换对象指针。
3. 被放弃的对象。如果您使用过互斥体,您一定知道被放弃的互斥体。在 Windows 编程的可用文献中,对被放弃的对象没有明确的解释(无论如何我都找不到)。对我来说,这意味着您失去了应用程序的流程控制(其中一个线程存在逻辑错误)。但在某些情况下,您需要跟踪这种情况以记录该事件并保存信息,或尝试恢复它(至少在调试阶段)。Vista 线程池非常适合此目的。让您在线程句柄上注册的等待发送(或发布)消息到调度器。如果您的调度器知道所有被跟踪的线程和同步对象,它就可以尝试切换同步对象的状态以避免死锁。
就是这样。