65.9K
CodeProject 正在变化。 阅读更多。
Home

托管 I/O 完成端口 (IOCP) - 第 2 部分

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.83/5 (30投票s)

2005 年 9 月 12 日

31分钟阅读

viewsIcon

215132

downloadIcon

1599

适用于托管 IOCP 的无锁对象池、无锁队列和线程池。

目录

  1. 引言
  2. 带无锁队列的托管 IOCP
  3. 无锁对象池
  4. 在 .NET 应用程序中使用无锁对象池
  5. 将它们整合在一起 - ManagedIOCP 设计概述
  6. ManagedIOCP 线程池
  7. ManagedIOCP 线程池内部
  8. ManagedIOCP 线程池的可扩展任务框架
  9. Sonic.Net 源代码和演示应用程序
  10. 关注点
  11. 历史
  12. 软件使用

1. 介绍

托管 I/O 完成端口 (IOCP) 是一个名为 Sonic.Net 的 .NET 类库的一部分,我于 2005 年 5 月首次在 CodeProject 上发布了它。Sonic.Net 是一个免费的开源类库,可用于构建高度可伸缩的服务器端 .NET 应用程序。Managed IOCP 的第二部分构建在我第一个 Managed IOCP 文章的基础上。因此,在阅读 Managed IOCP 的第二部分之前,您必须阅读 Managed IOCP 的第一部分。我的第一篇关于 Managed IOCP 的文章标题是 "托管 I/O 完成端口 (IOCP)"。我请求读者阅读我第一篇文章中的讨论线程,因为它们包含大量关于 Managed IOCP 及其用法的澄清和信息。特别是,与 'Craig Neuwirt' 的讨论揭示了我原始 Managed IOCP 实现中的一个关键问题,并帮助我纠正了它,使 ManagedIOCP 变得更好(谢谢 Craig :))。

2. 带无锁队列的托管 IOCP

在深入探讨无锁内容之前,本文可下载 Zip 文件中包含的托管 IOCP 源代码和演示应用程序是使用我的无锁 Queue 类编译的。从 Sonic.Net 项目属性中删除*条件编译器常量* LOCK_FREE_QUEUE 将使 Sonic.Net 程序集能够使用 .NET 同步 Queue 类进行编译。

回到我们的无锁内容,我在 Sonic.Net v1.0 类库中使用了同步的 System.Collection.Queue 类作为 Managed IOCP 的内部对象队列。这为将对象分派到 Managed IOCP 的线程之间以及从 Managed IOCP 检索对象的线程之间提供了锁争用的范围。我不得不使用同步队列,因为队列的 poppush 操作不是原子性的。在 push 操作期间,线程必须首先将队列尾节点的下一个节点设置为指向保存要入队的新对象的节点。接下来,线程必须将尾部指向新节点,因为新节点现在将是队列的尾部。这两个操作不能原子地执行(使用单个 CPU 指令)。这意味着当多个线程将对象推入(入队)到同一个队列实例时,某些入队操作可能会不成功,更危险的是,执行 push 操作的线程甚至不会知道。以下是这种情况...

  1. 线程 A 开始入队(push)操作。
  2. 线程 A 创建了保存要入队的新对象的新节点。
  3. 线程 A 将新节点地址分配给当前尾部的下一个节点。
  4. 线程 B 开始入队(push)操作。
  5. 线程 B 创建了保存要入队的新对象的新节点。
  6. 线程 B 将新节点地址分配给当前尾部的下一个节点。

灾难来了。线程 A 认为它已经将当前尾部的下一个节点指向了它的新节点。但在它将尾部指向新节点(使新节点成为新尾部)之前,线程 B 已经将当前尾部的下一个节点指向了自己的新节点。糟糕!!!线程 A 的新节点现在悬在半空中。灾难尚未结束。请查看以下操作序列,该序列是线程 A 和线程 B 上述操作的延续。

  1. 线程 A 将当前尾部指向其新节点(希望使其新节点成为新尾部)。
  2. 线程 A 退出了入队(push)操作。
  3. 线程 B 将当前尾部指向其新节点,实际上使其新节点成为新尾部。

在上述事件序列中的第 9 步之后,线程 A 实际入队的新节点丢失了。它丢失了,因为尾部的下一个节点和尾部都没有指向它。它已成为一个孤立对象。在 C++ 世界中,这将导致应用程序中出现严重的内存泄漏,因为无法访问新节点及其包含的对象,因此没有人能够释放它。幸运的是,在 .NET 世界中,CLR 的垃圾回收器将前来救援,并最终在某个时候清理孤立节点及其包含的对象。但这里更严重的问题不是孤立对象,而是入队操作丢失,而执行操作的线程(线程 A)却不知道。线程 A 会认为它已成功入队对象,并且不会报告任何错误。这会导致对象丢失,这可能意味着应用程序中的数据丢失。数据丢失,而且没有通知,对于任何应用程序来说都是一个严重的问题。

从上述讨论中应该清楚,以下两个逻辑相关的操作应该使用某种锁定机制进行同步,以便在任何给定时间点,进程中只有一个线程可以执行它们。

  1. 线程 X 将新节点地址分配给当前尾部的下一个节点。
  2. 线程 X 将当前尾部指向其新节点,并使其新节点成为新尾部。

使用锁定同步对公共对象的访问可能并非总是正确的方法。它肯定会阻止数据丢失和损坏,但是当您的应用程序在多处理器环境中运行时,它可能会降低应用程序的性能。这是因为在多处理器环境中,您将体验到线程执行的真正并行性。因此,当两个线程并行运行并尝试将对象入队(push)到同一个队列时,其中一个线程必须等待另一个线程退出锁定代码区域。由于此入队操作是基于 Managed IOCP 的应用程序中非常频繁使用的核心功能,因此线程*可能*会遇到锁争用,并且内核活动可能会显著增加,因为操作系统必须不断地将线程从运行模式切换到挂起模式,反之亦然。当没有对象排队到 Managed IOCP 时,暂停线程是可取的。但是当存在对象时,如果每个线程在其给定 CPU 周期时间内尽可能长时间地保持活动状态,那将是很好的。

我们(Windows 开发人员)到目前为止一直在 Windows 环境中成功使用基于锁的同步。因此我们可以继续使用它。但是还有其他更有前途的替代技术来替代基于锁的同步,尤其是在设计高度可伸缩的服务器端应用程序时。这些被称为无锁算法。这些技术将允许多个线程安全地使用公共对象,而不会损坏对象状态或导致任何数据丢失。这些技术在最近一直在进行深入研究,并正在慢慢进入主流应用程序开发。它们采用较慢的原因是,在实现某些数据结构(如哈希表)时,很难证明这些技术的正确性。但幸运的是,对于被多个线程大量使用的 Managed IOCP 队列,有成熟且经过充分测试的无锁算法。

2.1. 无锁队列

我为了在 Managed IOCP 中使用而实现的无锁队列是建立在设计无锁队列数据结构的一种成熟算法之上的。正如我们之前讨论的,我们需要两个物理操作来执行队列上的单个入队(push)操作。所以我们需要检查的是,当我们读取当前尾部的下一个节点并将其分配给我们的新节点时,没有其他线程修改当前尾部的下一个节点。因此,当前尾部的下一个节点的读取和交换应该原子地成功或失败。由 .NET(硬件和 Windows OS 支持)提供的同步原语 CAS 非常适合这种情况。我在我的第一篇关于 Managed IOCP 的文章的 4.2 节中详细讨论了 CAS(有关我的第一篇关于 Managed IOCP 的文章的链接,请参阅第 1 节)。

此外,我在此部分讨论的数据丢失问题通过使用拖尾技术解决。在此技术中,在上述序列的第 3 步之后,当前尾部的下一个节点将指向线程 A 创建的新节点。因此,当线程 B 开始其入队操作时,它可以检查当前尾部的下一个节点是否为 null。如果不是,则表示其他某个线程已更改当前尾部的下一个节点,但尚未修改尾部本身。因此,线程 B 将将*拖尾*当前尾部前进到指向尾部自己的下一个节点,因为线程 A 在 OS 分配 CPU 时间片时无论如何都会这样做。现在,线程 B 可以重新启动其入队操作。当线程 B 成功更改当前尾部的下一个节点时,它可以退出此下一个节点检查循环,并将尾部指向其新节点,前提是其他某个线程在入队操作的下一个节点检查步骤中尚未前进尾部。

下面的代码演示了用于实现无锁入队操作的拖尾技术

public void Enqueue(object data)
{
    Node tempTail = null;
    Node tempTailNext = null;
    Node newNode = _nodePool.GetObject() as Node; //new Node(data);
    newNode.Data = data;
    do
    {
        tempTail = _tail as Node;
        tempTailNext = tempTail.NextNode as Node;
        if (tempTail == _tail)
        {
            if (tempTailNext == null)
            {
                // If the tail node we are referring to is really the last
                // node in the queue (i.e. its next node is null), then
                // try to point its next node to our new node
                //
                if (Interlocked.CompareExchange(ref tempTail.NextNode, 
                           newNode,tempTailNext) == tempTailNext)
                    break;
            }
            else
            {
                // This condition occurs when we have failed to update
                // the tail's next node. And the next time we try to update
                // the next node, the next node is pointing to a new node
                // updated by other thread. But the other thread has not yet
                // re-pointed the tail to its new node.
                // So we try to re-point to the tail node to the next node of the
                // current tail
                //
                Interlocked.CompareExchange(ref _tail,tempTailNext,tempTail);
            }
        }
    } while (true);

    // If we were able to successfully change
    // the next node of the current tail node
    // to point to our new node, then re-point
    // the tail node also to our new node
    //
    Interlocked.CompareExchange(ref _tail,newNode,tempTail);
    Interlocked.Increment(ref _count);
}

上述代码中一个有趣的地方是,我将 CAS (Interlocked.CompareExchange) 用于对象类型。这是 .NET 支持的一个很棒的功能。这种形式的 CAS 将比较第一个参数指向的对象引用值与第三个参数(比较对象)的值,并使第一个变量指向第二个参数中指定的对象。当前 Interlocked.CompareExchange 对对象类型的缺点是,您不能将自己的引用类型变量与此 .NET API 一起使用。因此,我不得不使用 object 数据类型来定义我的 Node 类中的数据成员以及 ManagedIOCP 类中的 head 和 tail 对象引用。正如您在上面 Enqueue 方法的代码中观察到的,这导致变量从 objectNode 类型的类型转换。

您可以在源代码中查看无锁 Dequeue 操作。阅读上述讨论后,理解无锁 Dequeue (pop) 操作将很容易。我还提供了代码注释以帮助读者理解代码背后的逻辑。

2.2. 带无锁队列的托管 IOCP 的测试结果

我在 Pentium IV 3.0 GHz HT 系统和 Pentium III 700 MHz 单处理器系统上运行了使用无锁队列实现编译的 Managed IOCP 的相同 WinForms 演示应用程序。结果略好于 .NET 同步 Queue。与使用 .NET 同步 Queue 类相比,在使用带无锁队列的 Managed IOCP(使用性能监视器应用程序)时,我注意到本文提供的控制台演示应用程序中的锁争用显着减少。在速度方面,我没有看到明显的优势。这可能是因为我的演示应用程序仅用作验证 Managed IOCP 和其他 Sonic.Net 类功能正确性的测试平台,并对其进行压力测试以识别任何隐藏的多线程错误。我确信,当在实际应用程序场景中与线程中的良好处理一起使用时,带无锁队列的 Managed IOCP 应该表现更好。

我相信,尽管我讨论了所有关于无锁队列的优化,但使用它所获得的性能会根据其使用环境而变化。因此,我要求使用 Managed IOCP 的开发人员使用 .NET 同步 Queue 和我自定义实现的无锁队列来测试他们的应用程序,以了解两种队列类的性能。

3. 无锁对象池

对象池是一种技术,允许我们一遍又一遍地重复使用具有不同状态的现有对象。例如,在无锁队列数据结构中,我有一个 Node 对象,它表示队列中的一个节点。但是 Node 对象的唯一用途是保存推入队列的对象。当一个对象从队列中弹出后,保存该对象的 Node 对象就没有活动引用,并且最终会被 CLR 进行垃圾回收。

如果我可以在一个对象从队列中弹出后,将 Node 对象重新用于其他 push 操作,那将是高效的。这样,我可以减少大量新对象的分配,从而减少应用程序中的 GC 活动。这还将提高应用程序的整体性能,因为垃圾回收的对象数量会更少,并且 CLR 不会频繁地阻碍应用程序的执行以进行垃圾回收。

但是为了维护已释放的 Node 对象列表,我们需要一个可以维护队列的数据结构,该队列不需要新的 Node 分配。这个特殊的队列将像一个链表,我们可以在顶部插入新链接并从底部删除链接(FIFO)。这个队列是特殊的,因为当一个对象(在我们的例子中是 Node)被推入其中时,它不会创建一个新的 Node 来保存它。它假定推入其中的对象包含一个链接成员,该成员可用于链接队列中的下一个 Node。有了这个假设,它可以使用推入其中的对象本身作为 Node。为此,我创建了一个名为“PoolableObject”的类型,它有一个指向相同类型(PoolableObject)对象的成员。因此,如果我将一个“PoolableObject”对象推入我们的对象池队列,它只会将我们新对象的链接数据成员指向队列的当前顶部元素,从而使我们的新对象成为队列的顶部。当您从该队列中弹出对象时,它只会返回队列的底部对象,并将底部元素的上一个元素的链接数据成员设置为 null,从而使其成为最底部可以弹出的元素。

下面的代码显示了“PoolableObject”类型的定义。任何需要池化其对象的类型都可以从该类型派生。

/// <summary>
/// Poolable object type. One can define new poolable types by deriving
/// from this class.
/// </summary>
public class PoolableObject
{
    /// <summary>
    /// Default constructor. Poolable types need to have a no-argument
    /// constructor for the poolable object factory to easily create
    /// new poolable objects when required.
    /// </summary>
    public PoolableObject()
    {
        Initialize();
    }
    /// <summary>
    /// Called when a poolable object is being returned from the pool
    /// to caller.
    /// </summary>
    public virtual void Initialize()
    {
        LinkedObject = null;
    }
    /// <summary>
    /// Called when a poolable object is being returned back to the pool.
    /// </summary>
    public virtual void UnInitialize()
    {
        LinkedObject = null;
    }
    internal object LinkedObject;
}

上述 PoolableObject 类型由一个名为 ObjectPool 的类使用,该类为我们提供了将对象存储在池中的机制。这个类提供了向池中添加新对象和从池中检索现有对象的方法。这个 ObjectPool 类是使用本节开头我们谈到的特殊 FIFO 无锁队列实现的。这个队列不分配 Node 来保存可池化对象,而是将排队的对象本身用作 Node。因此,我们说我们需要一个能够将自身链接到同一类型对象的能力的类型。这里就是上面代码片段中定义的“PoolableObject”类型。

上述 PooledObject 的定义很简单。它有一个默认的无参数构造函数,以便于由名为 PoolableObjectFactory 的工厂类(我稍后会解释)轻松创建可池化对象。它有一个虚拟的 Initialize 方法,派生类可以覆盖它以执行任何初始化。当可池化对象工厂创建新的可池化对象时,以及当 ObjectPool 类从其对象池队列返回对象时,ObjectPool 类会调用此方法。它有一个虚拟的 UnInitialize 方法,派生类可以覆盖它以执行任何反初始化。当可池化对象被添加到其对象池队列时,ObjectPool 会调用此方法。

正如我上面提到的,我正在使用一个工厂类 PoolableObjectFactory 来创建从 PoolableObject 类型派生的新对象。我需要这样做是因为最初创建新的 ObjectPool 时,它的池中不会有任何对象。因此,当应用程序向它请求一个对象时,它会默默地创建一个新对象并将其返回给调用者。但是,对于 ObjectPool 创建新的可池化对象,它不知道从 PoolableObject 派生的对象类型。这是应用程序特定的,只有从 PoolableObject 类型创建新类型的应用程序才知道它将使用 ObjectPool 类池化哪种类型的对象。因此,我提供了一个名为 PoolableObjectFactory 的抽象工厂类型,开发人员可以实现它来创建和返回其可池化类型的对象。如本节前面所述,开发人员创建的可池化类型应从抽象可池化类型“PoolableObject”派生。

下面的代码显示了 PoolableObjectFactory 类型的定义

/// <summary>
/// Defines a factory interface to be implemented by classes
/// that creates new poolable objects
/// </summary>
public abstract class PoolableObjectFactory
{
    /// <summary>
    /// Create a new instance of a poolable object
    /// </summary>
    /// <returns>Instance of user defined
    ///    PoolableObject derived type</returns>
    public abstract PoolableObject CreatePoolableObject();
}

4. 在 .NET 应用程序中使用无锁对象池

上一节(第 3 节)讨论了有关 PoolableObjectPoolableObjectFactory 的大部分详细信息。在本节中,我将向您展示 PoolableObjectPoolableObjectFactory 类型的实际实现。本节将帮助您在 .NET 应用程序中构建自己的可池化对象,这些对象可以由 ObjectPool 类为您管理和池化。

正如我在上一节(第 3 节)中提到的,无锁队列使用的 Node 类型是一个可池化对象。它派生自 PoolableObject 类型。它还展示了派生类对 PoolableObject 类型的 Initialize 方法的简单实现。下面的代码显示了 Node 类型的定义。

/// <summary>
/// Internal class used by all other data structures
/// </summary>
class Node : PoolableObject
{
    public Node()
    {
        Init(null);
    }
    public Node(object data)
    {
        Init(data);
    }
    public override void Initialize()
    {
        Init(null);
    }
    private void Init(object data)
    {
        Data = data;
        NextNode = null;
    }
    public object Data;
    public object NextNode;
}

在上述 Node 类的定义中,您可以看到粗体显示的 PoolableObject 类的重写方法 Initialize。我没有重写 PoolabelObject::UnInitialize 方法,因为 Node 类不需要执行任何反初始化工作。相反,PoolableObject 类提供的默认方法就足够了。请记住,派生类对 UnInitialize 的实现应该调用基类 (PoolableObject) 的 UnInitialize 方法,因为它执行了一项重要的工作,即将其 Link 数据成员设置为 null。这对于 ObjectPool 类的正常运行至关重要。

一旦我们定义了一个新的可池化类型,我们需要提供抽象 PoolableObjectFactory 的实现,ObjectPool 类将使用它来在需要时创建新的可池化对象。下面的代码显示了 PoolableObjectFactory 类的实现,我用它来创建可池化 Node 类型的新实例。

/// <summary>
/// Factory class to create new instances of the Node type
/// </summary>
class NodePoolFactory : PoolableObjectFactory
{
    /// <summary>
    /// Creates a new instance of poolable Node type
    /// </summary>
    /// <returns>New poolable Node object</returns>
    public override PoolableObject CreatePoolableObject()
    {
        return new Node();
    }
}

现在我们有一个可以使用 ObjectPool 类进行池化的类型(Node),以及一个 ObjectPool 可以用来在需要时创建我们的可池化 Node 类型新实例的类型(NodePoolFactory)。现在,要使用可池化 Node 对象,就像创建 ObjectPool 类的实例并向其提供 NodePoolFactory 类的实例引用一样简单。下面是一个代码片段,显示了 ObjectPool 实例的创建,取自无锁队列类。

private ObjectPool _nodePool = 
         new ObjectPool(new NodePoolFactory());

一旦 ObjectPool 被实例化,我们就可以通过使用 ObjectPool 类的 GetObject() 实例方法从池中获取我们可池化对象类型(Node)的对象。下面的代码展示了如何使用 ObjectPool 类从池中获取对象。

Node newNode = _nodePool.GetObject() as Node;

当我们使用完一个可池化对象后,我们应该把它还给池,以便以后可以重复使用。我们可以通过调用 ObjectPool 类上的 AddToPool() 实例方法将一个对象添加回池中。例如,在无锁队列类中,一旦一个对象从 Queue 中弹出,保存该对象的 Node 就可以重复使用来保存任何要入队到 Queue 的新对象。因此,在离开 Dequeue 操作之前,我们将 Node 对象添加到 Queue 类维护的 Node 对象池中。

_nodePool.AddToPool(tempHead);

5. 将它们整合在一起 - ManagedIOCP 设计概述

上图显示了 ManagedIOCP 的设计及其与无锁 ObjectPool 和无锁 Queue 的关系。

6. ManagedIOCP 线程池

线程池一直是具有大量异步和并行计算要求的应用程序不可或缺的一部分。通常,服务器端应用程序使用线程池来实现一致且易于使用的编程模型,以并行和异步执行任务。以 Managed IOCP 为核心技术,我构建了一个线程池,它不仅提供基本的线程管理,还提供以下列出的其他一些重要功能

  1. 线程池中允许的最大线程数(与并发限制不同)。
  2. 线程池中活动线程的并发限制。
  3. 可扩展的任务框架,用于定义可由线程池执行的应用程序任务。

在深入了解 ManagedIOCP 线程池的内部实现之前,我将描述它在 .NET 应用程序中的用法。首先,ManagedIOCP 执行实现名为 ITask 接口的对象。ITask 接口的定义如下所示

/// <summary>
/// Interface used by ThreadPool class for executing
/// Tasks dispatched to it
/// </summary>
public interface ITask
{
    /// <summary>
    /// Executes the corresponding task
    /// </summary>
    /// <param name="tp">ThreadPool onto which
    ///        this Task is dispatched</param>
    void Execute(ThreadPool tp);
    /// <summary>
    /// Specifies to the ThreadPool whether to Execute this task based on whether
    /// this Task is Active or not. This allows for cancellation of Tasks after
    /// they are dispatched to ThreadPool for execution
    /// </summary>
    bool Active {get;set;}
    /// <summary>
    /// Indicates the task that its execution has been completed.
    /// </summary>
    void Done();
}

如上述定义所示,Execute 方法是应该编写执行任务逻辑的地方。一旦您有一个实现 ITask 接口并在其 Execute 方法中包含逻辑的类型,使用 ManagedIOCP 线程池就像创建它的实例并向其分派 ITask 对象一样简单。当线程池选择一个 ITask 对象执行时,它将调用该对象上的 Execute 方法。下面的代码显示了 ITask 接口的虚拟实现以及如何将其与 ManagedIOCP 线程池一起使用。

public class MyTask : ITask
{
    #region ITask Members

    public void Execute(Sonic.Net.ThreadPool tp)
    {
        // Do Some Processing
        // TODO::

        // Dispatch more objects to Thread Pool if required
        MyTask objTsk = new MyTask();
        tp.Dispatch(objTsk);
    }

    public void Done()
    {
        // May be you can pool this object, so that it can
        // be re-used
        // TODO::
    }

    public bool Active
    {
        get
        {
            return _active;
        }
        set
        {
            _active = value;
        }
    }

    #endregion
    
    // By default one can choose the ITask object to be active
    // or not. In this sample I chose it to be active by default
    private bool _active = true;
}

上述代码显示了一个实现了 ITask 接口的类型 MyTask。如果您观察代码,Execute 方法有一个 ThreadPool 对象作为参数,以便正在被线程池执行的对象可以访问 ThreadPool 对象本身,以进一步将对象分派到正在执行当前任务对象的线程池中。

此外,ITask 接口还有另外两个重要成员。当 ITask 对象上的 Execute 方法完成时,线程池会调用 Done 方法。这使得 ITask 对象有机会执行任何清理或自身池化以供重用(这是一个强大的概念,我将在“任务框架”部分简要讨论)。Active 属性指示线程池是否执行此 ITask 对象(是否调用 Execute 方法)。因此,如果一个应用程序在将 ITask 对象分派到线程池后,由于某种原因决定取消任务执行,它可以将任务对象的 Active 属性设置为 false,从而取消任务执行,前提是该任务对象尚未被线程池执行。

下面的代码演示了如何首先创建线程池实例,然后将 MyTask 对象分派给它以进行异步和并行执行

// Create a new instance of the ManagedIOCP Thread Pool class
// with 10 maximum threads and 5 concurrent active threads
ThreadPool tp = new ThreadPool(10,5);

// Create a new new instance of MyTask object and dispatch it
// to the Thread Pool for asynchronous and parallel execution
ITask objTask = new MyTask();
tp.Dispatch(objTask);

7. ManagedIOCP 线程池内部

ManagedIOCP 线程池是作为核心 ManagedIOCP 类的一个简单包装器实现的。当创建 ManagedIOCP 线程池的实例时,它会在内部创建一个 ManagedIOCP 实例,创建所有最大数量的线程,并将这些线程注册到 ManagedIOCP 实例中。当线程池的线程从线程池的 ManagedIOCP 实例中检索对象时,它会将对象转换为 ITask 对象并调用其上的 Execute 方法。

如果您观察 ThreadPool 类的构造函数,它有第二种形式的构造函数,它接受一个名为 ThreadPoolThreadExceptionHandler 的委托。当为该参数提供处理程序时,如果线程池线程在执行 ITask 对象时遇到任何异常,它将调用此委托并继续处理其他对象。如果处理程序抛出任何异常,则忽略该异常。如果未为此委托提供处理程序,则线程将忽略该异常并仍将继续处理其他对象。

7.1. 管理 ManagedIOCP ThreadPool 中的突发和空闲情况

为每个线程池实例一次性创建所有最大线程不会给系统带来开销。因为 ManagedIOCP 线程池中活动线程的数量由线程池的并发限制控制,该限制在线程池实例实例化期间指定,也可以在运行时设置。可能会出现以下情况:活动线程从线程池的 ManagedIOCP 实例中检索 ITask 对象,并在处理它们时进入等待模式(非运行)。这可能会发生,例如,如果 ITask 对象的 Execute 方法同步调用 Web 服务。当这种情况发生时,如果其队列中有任何挂起的 ITask 对象,线程池的 ManagedIOCP 实例将唤醒其他休眠线程来处理这些对象。当这些额外的线程正在处理 ITask 对象时,之前在执行其 ITask 对象时进入休眠模式的线程可能会退出休眠模式并开始运行。这将在线程池中创建一种状态,即在给定时间点,运行的线程数将超过允许的并发线程数。

通过将线程池中的最大线程数与允许的并发线程数相等,可以消除上述情况。但这可能会降低应用程序的可伸缩性。这是因为如果所有正在运行的线程都在等待外部资源/触发器/事件(如 Web 服务调用),尽管应用程序处于空闲状态,它也无法处理任何挂起的请求。

在线程池中,将最大线程数设置为大于允许的并发线程数总是期望的,以便在负载下扩展应用程序并尽可能多地利用系统资源。为了平衡突发情况和空闲情况,ThreadPool 使用的 ManagedIOCP 内置了对暂停不需要的已注册 IOCPHandle 的支持。当一个 IOCPHandle 进入等待状态时,如果当前活动线程数大于或等于允许的并发线程数,则 IOCPHandle 将排队到挂起队列中。这样,尽管 ThreadPool 中的最大线程数大于允许的并发线程数,但等待处理请求的线程将更接近允许的并发线程。这不会阻止实际活动线程数大于允许的并发线程数,但会将差异保持在最小水平。当新对象分派到 ManagedIOCP 时,如果当前活动线程数小于允许的并发线程数*并且*已注册的 IOCPHandle 数大于或等于允许的并发线程数,则 ManagedIOCP 将尝试获取挂起线程 (IOCPHandle)。如果找到一个,则通过设置其 IOCPHandle 的等待事件来选择该线程处理分派。这种情况可能发生在要分派的对象少于允许的并发线程数*或*某些活动线程进入等待模式时。在任何一种情况下,唤醒任何未挂起的线程可能都不会造成开销,并且无论如何都应该能够处理本节中讨论的空闲情况。

这样,动态 ManagedIOCP 应该能够处理基于 IOCP 的线程池设计中常见的突发和空闲情况。动态 ManagedIOCP 默认在 Sonic.Net 库中未启用。与动态 ManagedIOCP 相关的代码位于*条件编译常量* DYNAMIC_IOCP 内部。可以通过在 Sonic.Net 类库项目属性中指定*条件编译常量* DYNAMIC_IOCP 来启用动态 ManagedIOCP。

8. ManagedIOCP 线程池的可扩展任务框架

任务框架提供了一个可扩展的框架,用于创建将由 ManagedIOCP 线程池执行的任务。它提供抽象基类,并实现了 ITask 接口的 Active 属性和 Done 方法。这些抽象基类提供不同种类的任务,例如可等待任务、上下文绑定任务和可等待上下文绑定任务。此外,每个抽象任务类都派生自 PoolableObject 类型,从而提供任务池化。每个抽象任务类型都有一个相关的抽象工厂类,用于创建相应任务类型的实例。这些抽象任务工厂类型维护一个任务对象池。

任务框架中的所有抽象任务类都派生自一个名为 Task 的抽象基类。这个抽象基类实现了 ITask 接口的 Active 属性和 Done 方法,并且是实现 PoolableObject 抽象类的类。其他抽象任务类派生自这个类,并提供它们自己的功能,例如等待任务完成、上下文绑定等。用于创建不同类任务对象的所有抽象工厂类都派生自一个名为 TaskFactory 的抽象基类。这个抽象基类提供任务对象池化。这个抽象基类反过来又派生自 PoolableObjectFactory,其抽象方法必须由希望使用任务框架的应用程序实现。

下图显示了 ManagedIOCP ThreadPool 任务框架

ManagedIOCP ThreadPool Task Frameowrk

下图显示了 ManagedIOCP ThreadPool 任务工厂框架

ManagedIOCP ThreadPool Task Factory Frameowrk

8.1. 创建和使用通用任务

GenericTask 抽象类为要由线程池执行的任务提供 ITask 接口的基本实现。GenericTask 抽象类派生自 Task 抽象类,因此它提供诸如通过将 Active 属性值设置为“false”来取消任务执行的功能。GenericTask 是一个抽象类,因为它不实现 ITask 接口的 Execute 方法。应用程序使用 GenericTask 时,可以根据需要从它派生并实现 Execute 方法。下面的代码显示了一个从 GenericTask 派生并实现 Execute 方法的类

public class MyGenericTask : GenericTask
{
    public override void Execute(ThreadPool tp)
    {
        // Task execution code goes here
        // TODO::
    }
}

一旦我们有了应用程序特定的通用任务类,我们就可以创建它的实例并将其分派给线程池。

MyGenericTask gt = new MyGenericTask();
// tp is an instance of ManagedIOCP ThreadPool class
tp.Dispatch(gt);

我们可以使用 TaskFactory 及其派生抽象类来创建/获取 GenericTask 类的实例。优点是这些抽象工厂类提供任务对象的对象池化。下面的代码显示了一个从 GenerictaskFactory 派生并实现 GetObject 方法的类。此工厂创建/获取应用程序特定 GenericTask 类的实例。

class MyGenericTaskFactory : GenericTaskFactory
{
    public override PoolableObject CreatePoolableObject()
    {
        return new MyGenericTask();
    }
}

一旦我们有了应用程序特定的 GenericTask 类和关联的 GenericTaskFactory 类,我们就可以创建/获取应用程序特定的 GenericTask 类的实例并将它们分派给线程池。

MyGenericTaskFactory gtf = new MyGenericTaskFactory();
MyGenericTask gt = gtf.NewGenericTask(null, null);
// tp is an instance of ManagedIOCP ThreadPool class
tp.Dispatch(gt);

传递给 GenericTaskFactory 类的 NewGenericTask 方法的第一个 null 参数是赋予任务的 ID。它可以设置为有效的非对象,用于唯一标识任务。第二个 null 参数是任何需要与任务关联的应用程序相关对象。这可以用于传递与任务关联的额外信息,这些信息可以在任务执行期间使用。

8.2. 创建和使用可等待任务

WaitableTask 抽象类提供了一个任务,应用程序在将任务分派到线程池后,可以等待该任务由线程池执行。WaitableTask 类的创建和使用与 GenericTask 相同。WaitableTask 没有自己的工厂类,因为它是 GenericTask 的扩展,并提供了一种可等待机制来等待底层 GenericTask 的完成。对 WaitableGenericTask 的等待支持以毫秒为单位的超时。如果在等待操作期间发生超时,WaitableGenericTask 类上的 Wait 方法将返回“false”。下面的代码演示了如何等待 WaitableGenericTask

MyGenericTaskFactory gtf = new MyGenericTaskFactory();
MyWaitableGenericTask gt = gtf.NewGenericTask(null, null);
// tp is an instance of ManagedIOCP ThreadPool class
tp.Dispatch(gt);
// Wait infinitely on the task to complete
bool bTimeOut = gt.Wait(-1);

8.3. 创建和使用上下文绑定任务

ContextBoundGenericTask 抽象类提供了一个任务,其执行在相同上下文中的其他任务中是序列化的。上下文提供了一个逻辑锁定/解锁机制,可由在上下文下执行的任务使用。当任务在执行期间锁定关联上下文时,尝试锁定上下文的其他任务将被挂起,直到锁定上下文的任务将其解锁。ManagedIOCP 任务框架有一个名为 IContext 的接口,它表示一个上下文。任务框架还有一个名为 ContextIContext 接口的默认实现。Context 类使用 Monitor 同步对象提供锁定和解锁语义。

ContextBoundGenericTask 类的创建和使用与 GenericTask 相同。任务框架提供了一个名为 ContextBoundGenericTaskFactory 的单独工厂类。应用程序必须从 ContextBoundGenericTaskFactory 类派生,并实现其 CreatePoolableObject 方法,以创建/获取派生自 ContextBoundGenericTask 类的实例。

创建 ContextBoundGenericTask 派生类时,还有一个额外的步骤,即派生类 Execute 方法中实现的代码应锁定和解锁基类 ContextBoundGenericClass 中作为名为 Context 的属性可用的上下文对象。

下面的代码演示了如何创建和使用应用程序特定的 ContextBoundGenericTask

// Application specific ContextBoundGenericTask class
public class MyContextBoundGenericTask : ContextBoundGenericTask
{
    public override void Execute(ThreadPool tp)
    {
        Context.Lock();
        // Task execution code goes here
        // TODO::
        Context.UnLock();
    }
}
// Application specific ContextBoundGenericTaskFactory class
public class MyContextBoundGenericTaskFactory : 
                ContextBoundGenericTaskFactory
{
    public override PoolableObject CreatePoolableObject()
    {
        return new MyContextBoundGenericTask();
    }
}
// Create an instance of the task factory
MyContextBoundGenericTaskFactory ctxGTF = 
          new MyContextBoundGenericTaskFactory();

// Create a new Context. Each context
// object may have a unique id, which can be
// retrieved using a context id generator
// singleton class provided by Task framework
object ctxId = 
  ContextIdGenerator.GetInstance().GetNextContextId();
Context ctx = new Context(ctxId);

// Create/Acquire an instance of application
// specific ContextBoundGenericTask object
// and associate the context with it
MyContextBoundGenericTask ctxGT = 
  ctxGTF.NewContextBoundGenericTask(null, null,ctx);

// Dispatch the task to ThreadPool for execution.
// tp is an instance of ManagedIOCP ThreadPool class
tp.Dispatch(ctxGT);

9. Sonic.Net 源代码和演示应用程序

以下是文章 ZIP 文件中包含的文件的详细信息

  1. Sonic.Net (文件夹) - 我将此类别库命名为 Sonic.Net(Sonic 代表速度)。命名空间也指定为 Sonic.Net。我在本文中描述的所有类都在此命名空间中定义。文件夹层次结构如下所述
    Sonic.Net
    |
    --> Assemblies
    |
    --> Solution Files
    |
    --> Sonic.Net
    |
    --> Sonic.Net Console Demo
    |
    --> Sonic.Net Demo Application

    Assemblies 文件夹包含 Sonic.Net.dll(包含 ObjectPoolQueueManagedIOCPIOCPHandleThreadPool 类)、Sonic.Net Demo Application.exe(演示 ManagedIOCPIOCPHandle 类用法的演示应用程序)以及 Sonic.Net Console Demo.exe(演示 ThreadPoolObjectPool 类用法的控制台演示应用程序)。

    Solution Files 文件夹包含 Sonic.Net 程序集项目、Sonic.Net 演示应用程序 WinForms 项目和 Sonic.Net 控制台演示项目的 VS.NET 2003 解决方案文件。

    Sonic.Net 文件夹包含 Sonic.Net 程序集源代码。

    Sonic.Net Console Demo 文件夹包含 Sonic.Net 控制台演示应用程序源代码。此演示使用一个将由 ThreadPool 线程读取的文件。请将文件路径更改为您系统上的有效路径。下面的代码显示了代码中要更改的部分。此代码位于 ManagedIOCPConsoleDemo.cs 文件中。

    public static void ReadData()
    {
        StreamReader sr = 
          File.OpenText(@"C:\aditya\downloads\lgslides.pdf");
        string st = sr.ReadToEnd();
        st = null;
        sr.Close();
        Thread.Sleep(100);
    }

    Sonic.Net Demo Application 文件夹包含 Sonic.Net 演示应用程序源代码。

  2. Win32IOCPDemo (文件夹) - 此文件夹包含基于 WinForms 的演示应用程序,用于演示使用 PInvoke 的 Win32 IOCP 用法。编译后,Win32IOCPDemo.exe 将根据您选择的当前构建配置在 Win32IOCPDemo\bin\debugWin32IOCPDemo\bin\Release 文件夹中创建。默认构建配置设置为发布模式。

10. 兴趣点

总而言之,我们现在拥有一个 Sonic.Net 库,它提供无锁数据结构,如 QueueObjectPool,异步和并行编程基础结构类,如 ManagedIOCPThreadPool 和任务框架。除了这些类之外,Sonic.Net 程序集还带有一个名为 StopWatch 的小型实用程序类。StopWatch 类可用于方便地测量经过时间。如果您有兴趣,请查看。除了我随此类别库提供的测试应用程序之外,我相信这种类别库的真正测试是一个好的实际服务器端应用程序。我请求此类别库的用户提供任何反馈/建议,以修复错误并改进它。

我正在开发一个适用于 .NET 2.0 的 Sonic.Net 版本。我正在将 Managed IOCP 迁移到一个 _泛型_ 类,其中要排队的数据定义为模板参数。在这种情况下,我必须使用我的无锁队列来将对象排队到 Managed IOCP,因为 .NET 2.0 尚不支持同步泛型集合。我正在创建一个无锁泛型(模板化)队列,用于泛型 Managed IOCP。一旦完成,我将使用新代码更新本文,并分享我使用 .NET 2.0 泛型的经验。此外,.NET 2.0 最令人兴奋的部分是它支持泛型(模板化)Interlocked.CompareExchange。这意味着我们 Node 类中的数据成员以及 Managed IOCP 类中的头节点和尾节点对象引用可以是 Node 类型而不是 object 类型。这将提高效率并节省运行时的一些类型转换开销。

11. 历史

日期:2006 年 4 月 17 日

修复了与使用 Interlocked.CompareExchange 相关的问题。感谢 Smith Cameron (LexisNexis 组织) 指出此问题。

日期:2005 年 8 月 15 日

Sonic.Net v1.1 - 无锁 QueueObjectPool、具有改进(和增强)线程选择算法的 ManagedIOCP,用于执行分派的对象,基于 ManagedIOCP 的 ThreadPool,以及用于定义要由 ManagedIOCP ThreadPool 执行的任务的可扩展任务框架。

日期:2005 年 5 月 9 日

我还修复了 Windows 演示应用程序中的一个小错误(版本 1.0 中存在)。这个错误可能允许_演示_应用程序中的两个线程使用相同的 Label 对象来显示它们的计数。此版本(1.1)已修复此错误。此错误在 ManagedIOCPTestForm::StartCmd_Click(...) 方法中修复。

日期:2005 年 5 月 4 日

Sonic.Net v1.0(类库,托管 ManagedIOCPIOCPHandle 类实现,带有 .NET 同步 Queue,用于在 Managed IOCP 中保存数据对象)。

12. 软件使用

本软件按“原样”提供,不提供任何明示或暗示的保证。对于本软件可能造成的任何类型的损害或损失,我不承担任何责任。

© . All rights reserved.