65.9K
CodeProject 正在变化。 阅读更多。
Home

异步上下文处理器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.41/5 (20投票s)

2004年8月11日

14分钟阅读

viewsIcon

97332

downloadIcon

748

.NET 应用程序的异步消息处理基础结构。

Sample Image - ACPArch.png

1. 引言

.NET 异步编程模式结合 .NET 线程池,为构建异步应用程序提供了一个良好的起点。但要构建以异步消息处理为核心的可维护、可扩展和可扩展的企业级应用程序,还有很多工作要做。

ACP 是一个框架,它以接口和基础结构类的形式提供基本构建块,用于设计和开发使用异步消息处理的企业级 .NET 应用程序。本文介绍了 ACP 的组件以及如何在 .NET 应用程序中使用它们。ACP 的源代码和演示应用程序可从此文章下载。

ACP 采用基于接口的设计。基于接口的设计结合 .NET 中的反射 API,使我们能够构建具有可插入组件的可扩展应用程序。请查看“自定义/扩展 ACP”部分(第 3 节),了解 ACP 如何利用 .NET 的这一功能来允许 ACP 框架本身中具有可插入组件。

ACP 组件

让我们开始列出构建一个可维护、可扩展且可扩展的企业应用程序(在本例中为 .NET 应用程序)的异步消息处理基础结构所需的关键组件,以及它们如何使用 ACP 中的不同接口进行定义。

  1. 消息。我们需要一个好的 .NET 类型来模拟任何类型的消息。ACP 以 IMessage 接口的形式提供此功能,其方法和属性模拟了 SOAP 消息(相当符合行业标准以实现互操作性)。
  2. 消息处理器。我们需要一个能够理解和处理我们消息的类型。在 ACP 中,这称为消息处理器,由名为 'IMessageProcessor' 的接口定义。此接口允许将自定义消息处理器对象插入到使用 ACP 构建的应用程序中,以处理任何新类型的消息。
  3. 上下文。我们需要一个上下文来建立特定消息类型中逻辑上相关消息之间的关系。在 ACP 的当前实现中,上下文使用名为 IContext 的接口定义。与上下文相关的所有消息都按顺序处理。这提供了逻辑上相关消息任务之间的数据完整性。
  4. 上下文管理器。我们需要一个上下文管理器来创建和维护上下文。在 ACP 中,这由名为 'IContextManager' 的接口定义。
  5. 上下文处理器。我们需要一个对象来调度和执行与上下文相关的消息处理。此对象可以是 ACP 主机应用程序同一 AppDomain 中的一个线程,也可以是不同进程中的另一个 AppDomain,甚至是完全不同的计算机。在 ACP 中,这称为上下文处理器,并使用名为 'IContextProcessor' 的接口定义。将上下文处理器视为对象,可以设计和开发更具可扩展性和可扩展性的应用程序。如今,用于异步消息处理的线程和内存队列可能适合应用程序的要求。但是,明天,随着系统负载的增长,使用不同的系统和更健壮可靠的消息传递基础结构(如 MSMQ)进行异步消息处理将更具可扩展性。
  6. 上下文处理器池。为每个上下文维护一个上下文处理器将是一项非常昂贵的任务,而且可能无法很好地扩展。设想一个企业销售系统,该系统由数百个供应商应用程序和数千个最终用户访问。在此类应用程序中,为每个上下文维护一个上下文处理器将很快导致系统崩溃。更好的方法是维护一个上下文处理器池,该池可以处理多个上下文的消息处理。

    在 ACP 中,这使用名为 IContextProcessorPool 的接口定义。此接口允许指定上下文处理器池的名称、池的最大和最小上下文处理器数量,以及一个非常重要的池属性,该属性定义了池中上下文处理器的行为。这称为“消息处理模式”。此值可以是“拉取”或“推送”。

    在拉取模型中,每个上下文处理器将持续监视其分配的上下文是否有任何消息。这适用于具有最少上下文处理器数量的实时应用程序。

    在推送模型中,当上下文有消息时,每个上下文都会通知其分配的上下文处理器。此模型适用于需要更多上下文但处理不太紧急的情况。

    这两种模型都存在权衡。拉取模型是 CPU 密集型的,而推送模型是内存密集型的。目前,.NET 框架提供了一个进程范围的线程池。但是,对于严重依赖异步消息传递的企业应用程序来说,这足够吗?我的回答是否定的。.NET 线程池因多种原因而难以管理。线程池线程没有标识和行为,一旦将请求提交给线程池,就无法取消它。此外,它们是在进程内的。基于 ACP 的上下文处理器和上下文处理器池具有标识、可配置的行为,并且是位置无关的,这使得它们成为设计可维护、可扩展且可扩展的异步消息传递应用程序的理想选择。

  7. 上下文处理器池管理器。在企业应用程序中,需要区别处理不同模块的执行。因此,拥有一个具有固定设置的上下文处理器池可能无法满足应用程序运行的所有不同上下文。为了有效地处理这个问题,ACP 定义了一个名为上下文处理器池管理器的对象,使用一个接口(猜测)'IContextProcessorPoolManager'。这允许我们创建和维护多个具有不同设置(不同的消息处理模式,推送/拉取,不同的最大和最小上下文处理器数量等)的上下文处理器池。

2. 在 .NET (C#) 应用程序中使用 ACP

好的,以上所有理论讨论都基于一些抽象接口。但是,谁将提供实现呢?幸运的是,ACP 提供了所有上述接口的默认通用实现,以及 IMessageProcessor 的演示接口实现,后者在很大程度上是特定于应用程序的。演示的 IMessageProcessor 实现是一个数据库消息处理器,可以异步处理 select 查询。

本文不解释 ACP 接口默认实现的内部逻辑的详细步骤。ACP 代码组织良好,我尝试以得体的方式记录代码(使用 C# XML 代码注释标签的代码注释),以便一般的 .NET 和 C# 开发人员可以轻松弄清楚内部发生了什么。但是,我将介绍在 .NET (C#) 应用程序中使用 ACP 的重要方面,以提供一个初步了解并解释某些重要概念,这些概念可能会花费您宝贵的时间自行从代码中弄清楚。

步骤 - 1. 标识并创建上下文(们) [消息队列]

首先,在 ACP 中,如《简介》中所述,一切都围绕着上下文。因此,在应用程序中使用 ACP 的第一步是确定上下文对您的应用程序意味着什么。基本上,上下文意味着一组必须按顺序处理的消息。但这只是一个非常抽象的定义。为了帮助您,这里有一个典型的上下文示例...

例如,在采购订单应用程序中,在订单放置过程中,您希望更新数据库,然后向客户发送有关后续信息的电子邮件。这两项任务必须按顺序执行。因此,理想情况下,这些任务属于一个上下文。您应该足够小心地区分上下文和数据库/应用程序级别的事务。事务可以被视为一个上下文,通过在上下文中执行事务涉及的所有任务来处理。但反之则不成立。即,上下文不等于数据库事务,其中给定事务实例的所有操作都会成功或失败。而上下文不关心它处理的消息的成功或失败,更关心消息的处理顺序,以便消息按照发布到上下文的顺序进行处理。

不要迷失在术语中。ACP 的核心是消息的排队和处理。ACP 将这个简单的概念封装在一个强大的基础结构中,用于基于异步消息处理设计和开发 .NET 应用程序。异步消息设计将允许应用程序扩展以处理重负载、松耦合、高度互操作和可扩展,并最终易于维护。此外,ACP **并非旨在**取代 .NET 或 Microsoft Windows 平台中可用的任何异步消息传递技术。相反,ACP 提供了一个框架,它抽象了应用程序底层的**消息** **队列**、**消息调度**和**消息处理**子系统,从而提供了对应用程序如何演进的更多控制。

最后,代码片段... 以下代码片段展示了如何从 .NET (C#) 应用程序创建新上下文或获取现有上下文。此代码片段摘自演示应用程序 (ACPDemo),因此请查阅演示应用程序以了解所用变量的含义。

// Get a ContextManager
IContextManager ctxMgr = ProviderManager.GetContextManager("Generic");
if (ctxMgr != null)
{
    // Create a Context if it does not exists
    IContext ctx = null;
    if (pubsCtx != null)
    {
        ctx = ctxMgr.GetContext(pubsCtx);
    }
    else
    {
        ctx = ctxMgr.CreateContext(ContextMsgQType.ContextLevel,"DBMsgProc");
        pubsCtx = ctx.ID;
        ctx.MessageProcessor.MessageProcessingCompleted = 
          new MessageProcessCompleteHandler(
          MessageProcessor_MessageProcessingCompleted_Pubs);
    }
}

代码片段中的要点(粗体词)...

  1. 'ProviderManager' 是一个 ACP 类,它允许应用程序动态选择上下文管理器实现类。这些类及其程序集可以在应用程序配置文件(xxx.exe.config)中配置。有关自定义 ACP 的更多详细信息,请参阅第 3 节。在我们的代码片段中,我使用了上下文管理器 'Generic' 的键名,该键名映射到 ACP 提供的 'IContextManager' 接口的默认实现。此映射在演示应用程序配置文件(ACPDemo.exe.config)中定义。
  2. 'IContextManager' 的 'GetContext' 方法采用上下文的唯一 ID(在 ACP 提供的 'IContextManager' 的默认实现中,这是从创建上下文时获取的当前 DateTime 的“Ticks”的字符串形式),并检索上下文对象上的 IContext 接口。ACP 附带一个默认的 'IContext' 实现,它使用内存同步队列来存储消息。
  3. 'IContextManager' 的 'CreateContext' 方法创建一个新上下文。请注意 'ContextMsgQType'。这是一个枚举,有两个值。“ContextLevel”和“Global”。'ContextLevel' 意味着创建的上下文将拥有自己的私有消息队列来存储发布给它的消息。“Global”意味着创建的上下文将共享一个 AppDomain 范围的通用队列来存储发布给它的消息。
  4. 'DBMsgProc' 是一个键,它映射到 'IMessageProcessor' 接口的演示实现。更多关于此内容将在第 3 步中介绍。

步骤 - 2. 定义上下文处理器池以调度和执行消息处理 [消息调度]

接下来是定义上下文处理器池,用于调度和执行与上下文相关的消息。以下来自演示应用程序 (ACPDemo) 的代码片段展示了如何创建上下文处理器池并将其分配给其中一个上下文处理器的上下文。

// Get a ContextProcessorPoolManager
IContextProcessorPoolManager cppMgr = 
   ProviderManager.GetContextProcessorPoolManager("Generic");
if (cppMgr != null)
{
    // Create/Get a Context Processor Pool with 'Push' Mode.
    IContextProcessorPool cpp = 
       cppMgr.CreateContextProcessorPool("Test", 
                 ContextProcessorPoolMode.Push);
    if (cpp != null)
    {
        // Assign a ContextWorker from the pool to the Context
        cpp.AssignContext(ctx);

代码片段中的要点...

  1. ProviderManager 的静态方法 'GetContextProcessorPoolManager' 用于获取映射到键 'Generic' 的上下文处理器池管理器实现的 'IContextProcessorPoolManager' 接口引用。在此情况下,'Generic' 键映射到 ACP 提供的 'IContextProcessorPoolManager' 的默认实现。此映射在演示应用程序配置文件(ACPDemo.exe.config)中定义。如“简介”(第 1 节)所述,上下文处理器池管理器用于管理基于名称的上下文处理器池映射。
  2. 'IContextProcessorPoolManager' 的 'CreateContextProcessorPool' 方法将使用给定的名称和“上下文处理器池模式”(模式在《简介》部分讨论)创建一个新的上下文处理器池(实现 'IContextProcessorPool')。ACP 提供的 'IContextProcessorPoolManager' 的默认实现将在属性匹配时(即,如果名称和模式与现有池相同)检索现有池对象。
  3. 'IContextProcessorPool' 的 'AssignContext' 方法将创建一个新的上下文处理器或获取一个现有的上下文处理器(使用轮循算法选择),并将其分配给该上下文。这将为调度和执行已分配上下文的消息提供一个上下文处理器。ACP 提供的上下文处理器的默认实现使用线程实现。

步骤 - 3. 为上下文(们)定义消息处理器类型 [消息处理]

ACP 的设计允许在创建上下文时指定要使用的消息处理器类型。此设计允许为同一上下文类型的不同实例指定不同的消息处理器类型。以下来自演示应用程序 (ACPDemo) 的代码片段展示了如何为上下文实例分配消息处理器。

// Get a ContextManager
IContextManager    ctxMgr = ProviderManager.GetContextManager("Generic");
if (ctxMgr != null)
{
    // Create a Context if it does not exists
    IContext ctx = null;
    if (pubsCtx != null)
    {
        ctx = ctxMgr.GetContext(pubsCtx);
    }
    else
    {
        ctx = ctxMgr.CreateContext(ContextMsgQType.ContextLevel,"DBMsgProc");
        pubsCtx = ctx.ID;
        ctx.MessageProcessor.MessageProcessingCompleted = 
           new MessageProcessCompleteHandler(
           MessageProcessor_MessageProcessingCompleted_Pubs);
    }
}

代码片段中的要点...

'DBMsgProc' 字符串是 ACPDemo 应用程序配置文件(ACPDemo.exe.config)中的一个键,它映射到 ACP 中的 'IMessageProcessor' 接口的演示实现类。此演示实现具有处理数据库消息的良好代码基础,但目前还不完整。但是,ACP 中 'IMessageProcessor'(类 DatabaseQueryMessageProcessor)的当前实现为 ACP 的用户提供了创建消息处理器的任务的良好起点。

我将在未来几天更新此部分,提供一个相当完整的 'DatabaseQueryMessageProcessor' 实现,并尝试强调一些指南和技巧。

步骤 - 4. 将消息发布到上下文(们)并处理已处理的消息 [消息处理]

现在我们有了用于发布消息的上下文、用于调度消息的上下文处理器以及用于处理发布到上下文的消息的消息处理器。现在是实际的故事了。

  1. 如何将消息发布到上下文?

    您可以使用上下文对象的 'IContext' 接口上的 'QueueMessage' 方法,将消息发布到上下文以供处理。以下是从演示应用程序 (ACPDemo) 中提取的代码片段。您可以在代码片段中看到粗体显示的 'QueueMessage'。

    IContextProcessorPool cpp = 
       cppMgr.CreateContextProcessorPool("Test", 
       ContextProcessorPoolMode.Push);
    if (cpp != null)
    {
        // Assign a ContextWorker to the Context
        cpp.AssignContext(ctx);
    
        // Create a DB message and queue it using context
        DBMessage msg = new DBMessage();
    
        string conStr = ConfigurationSettings.AppSettings["conStr"];;
        string query = ConfigurationSettings.AppSettings["query2"];;
        DBRequest request = new 
                DBRequest(conStr,query,DBRequest.DBQueryType.Select);
    
        msg.Request = request;
    
        // Queue the Message to be processed asynchronously by 
        // the Context Processor assigned to the context
        ctx.QueueMessage(msg,true);
        timer2.Enabled = true;
  2. 如何检索已处理的消息?

    消息处理器上的 'IMessageProcessor' 接口提供了一个委托 'MessageProcessingCompleted',该委托在消息处理器完成对排队到相应上下文的每条消息的处理后调用。以下是从演示应用程序 (ACPDemo) 中提取的代码片段。您可以看到 'MessageProcessingCompleted' 委托被分配给了演示应用程序的“form”类中的一个私有方法。在处理完相应上下文的每条消息后,将调用此方法。

    ctx = ctxMgr.CreateContext(ContextMsgQType.ContextLevel,"DBMsgProc");
    authCtx = ctx.ID;
    ctx.MessageProcessor.MessageProcessingCompleted = new 
      MessageProcessCompleteHandler(
      MessageProcessor_MessageProcessingCompleted);
    
    private void MessageProcessor_MessageProcessingCompleted(IMessage msg)
    {
        object[] args = new object[] { msg };
        this.Invoke(new DisplayData(this.ShowDataset),args);
    }
  3. 如何等待直到给定的上下文处理完所有待处理消息?

    'IContext' 接口提供了一个名为 'WaitOnAsyncOperations' 的方法,调用者可以使用该方法等待直到调用上下文的所有待处理消息都已处理完毕。此方法支持指定等待超时,以便调用者不必无限期地阻塞。以下是从演示应用程序 (ACPDemo) 中提取的代码片段。您可以在代码片段中看到粗体显示的 'WaitOnAsyncOperations' 方法调用的用法。

    IContextProcessorPool cpp = 
      cppMgr.CreateContextProcessorPool("Test", 
                ContextProcessorPoolMode.Push);
    if (cpp != null)
    {
        // Assign a ContextWorker to the Context
        cpp.AssignContext(ctx);
    
        // Create a DB message and queue it using context
        DBMessage msg = new DBMessage();
    
        string conStr = ConfigurationSettings.AppSettings["conStr"];
        string query = ConfigurationSettings.AppSettings["query1"];
        DBRequest request = new DBRequest(conStr,query,
                                      DBRequest.DBQueryType.Select);
    
        msg.Request = request;
    
        // Queue the Message to be processed asynchronously by 
        // the Context Processor assigned to the context
        ctx.QueueMessage(msg,true);
        timer1.Enabled = true;
    
        // Wait on our select operation to complete
        while(ctx.WaitOnAsyncOperations(1) == false)
        {
            Application.DoEvents();
        }
  4. 如何询问上下文是否已完成某条消息的处理?

    我正在处理这个问题,将在未来几天更新。

3. 自定义/扩展 ACP

ACP 中的可扩展点

ACP 的以下 3 个组件是可自定义的(这三个组件共同构成了 ACP)。

  1. 上下文管理器(创建和维护上下文)。
  2. 上下文处理器池管理器(创建和维护上下文处理器池)。
  3. 消息处理器(处理消息)。

可以通过在 ACP 的托管应用程序配置文件中指定与上述三个组件相关的接口的实现程序集和类名来实现自定义。

配置 ACP 中的可扩展点

您需要在 ACP 主机应用程序配置文件中的 'acpSettings' 节点下指定一个键值对。以下 XML 片段显示了 ACP 中各种可插入组件的配置。

  1. 消息处理器
    <MessageProcessors>
       <add key="DBMsgProc" 
           value="DBMsgProc.dll, DBMessageProcessorClass" />
    </MessageProcessors>
  2. 上下文管理器
    <ContextManagers>
       <add key="MyContextManager" 
           value="MyContextManager.dll, MyContextManagerClass" />
    </ContextManagers>
  3. 上下文处理器池管理器
    <ContextProcessorPoolManagers>
       <add key="MyContextProcessorPoolManager" 
           value="MyContextProcessorPoolManager.dll, 
                  MyContextProcessorPoolManagerClass" />
    </ContextProcessorPoolManagers>

注意: 有关示例配置文件,请参阅演示应用程序 (ACPDemo) 的应用程序配置文件(ACPDemo.exe.config)。

4. 演示应用程序

我尝试在演示应用程序中整合基本功能,该应用程序异步处理数据库消息。对于演示,我使用了 SQL Server 和 MSDE 附带的“Pubs”数据库。要使用演示,您需要根据演示exe配置文件(ACPDemo.exe.config)中的内容修改 SQL Server/MSDE 连接字符串和查询,该文件随演示下载一起提供。

演示应用程序性质简单,目前开发用于展示 ACP 在 .NET 应用程序中的快速使用。演示应用程序显示了两个 SQL 查询(在演示应用程序配置文件 ACPDemo.exe.config 中配置)并行运行。对于演示应用程序中的不完整之处,请原谅我。我计划在未来几天增强演示应用程序,并相应地更新文章代码。

5. 值得关注的要点

这只是我的第一部分工作。我计划创建下一篇文章,其中将涵盖使用 ACP 的更高级技术,例如...

  1. 取消已排队等待上下文的消息处理请求。
  2. 从 ACP 基础结构中获取运行时统计信息,例如,ContextPools、Contexts、Messages 等的数量。
  3. DatabaseMessageProcessor 实现中的更多功能,例如处理事务、自动提交和异常时自动回滚。
  4. 在 ACP 的当前版本中,Context 类使用内存队列实现,而 Context Processor 使用 .NET 线程实现。在我即将发布的 ACP 文章系列中,我将讨论使用 MSMQ 和专用 Windows 服务实现 Context 和 Context Processor。

这是即将发布的 ACP 系列文章的简要预览。

最后,我希望 ACP 框架能为在 .NET 世界中从事异步消息传递应用程序开发的人们提供一个良好的基础结构。此外,我将在未来几天更新本文档,以进行任何更正,并填补任何遗漏的概念/信息。

6. 历史记录

ACP v1.0。

7. 使用说明

本软件按“原样”提供,不附带任何明示或暗示的保证。对于本软件可能造成的任何损坏或业务损失,本人概不负责。

© . All rights reserved.