65.9K
CodeProject 正在变化。 阅读更多。
Home

一种通用的、类型安全的方式来传输 .NET 对象到 COM+ 队列组件

starIconstarIconstarIcon
emptyStarIcon
starIcon
emptyStarIcon

3.63/5 (7投票s)

2004年4月22日

5分钟阅读

viewsIcon

51015

downloadIcon

519

本文介绍了一种简单的方式,可以将任何 .NET 受管对象作为参数传递给 COM+ 队列组件。

引言

COM+ 队列组件是在你需要一个可靠的、事务性的机制来异步传输数据时一个绝佳的解决方案。然而,在 COM+ 中传递对象并不那么容易;你的对象必须实现 IPersistStream 接口,这非常繁琐。
.NET 提供了非常容易使用的对象序列化和可靠的消息队列 - 但没有事务或其他 COM+ 功能。

我想要的是两者的结合:编程的简易性和 COM+ 的功能。这在 J2EE 世界中已经存在了 - 所以我猜它也会出现在下一个 .NET 版本中 ;-)

在本文中,我将介绍一个名为 QCTransferObject 的类,你可以使用它将任何对象或对象图作为参数传递给队列组件。通过继承 QCTransferObject,你可以使其具有类型安全性。

背景

我的解决方案基于 CodeProject 上两篇早期文章的组合

  1. 如何将受管对象作为队列组件的参数传递
  2. 使用 .NET 队列组件进行远程日志记录

第一篇文章详细介绍了如何编写一个实现 IPersistStream 的类,以便它可以被传输并在 COM+ 队列组件中用作参数。但对于你需要的每个不同类,都需要大量的工作:对于每个成员对象,你必须编写代码将其转换为字节数组,以及从字节数组转换回来。如果这些成员对象包含其他也需要传输的对象,情况就更加复杂了,以此类推。

第二篇文章介绍了一种将任何 .NET 受管对象序列化为字节数组,以及从字节数组反序列化的方法。这用于将日志信息发送到队列组件。然而,接口 void LogExceptionMessage(string logString, byte[] arrBytes); 仍然使用原始类型 byte[],而我想要的是对象。

显然,结合上述两种技术,我们可以得到一个支持 IPersistStream 的类,它只有一个成员:一个可序列化的对象。由于该成员的类型是 Object,它可以是任何东西,甚至是对象图。使用第二篇文章中的字节数组序列化和反序列化,这个支持 IPersistStream 的类可以以通用的方式(反)序列化该成员对象。

使用代码

核心类:QCTransferObject

QCTransferObject 类是一个支持 IPersistStream 的类,你可以用它将任何可序列化的对象图传输到基于 .NET 构建的 COM+ 队列组件。它包含在 QCUtil 包中,该包由 3 部分组成:

  1. 导入 COM IPersistIPersistStream 接口。
  2. QCTransferObject 类。
  3. 一个 Codec 实用类,用于将任何可序列化对象转换为字节数组,以及从字节数组转换回来。

这是完整的代码。你的代码只会使用 QCTransferObject

using System;
using System;
using System.IO;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Runtime.Serialization.Formatters.Binary;
namespace QCUtil
{
  // Import definitions IPersist and IPersistStream
  #region Interfaces of the IPersistStream
  [InterfaceType(ComInterfaceType.InterfaceIsIUnknown),
    Guid("0000010c-0000-0000-C000-000000000046")]
  public interface IPersist {
    void GetClassID( /* [out] */ out Guid pClassID);
  };

  [InterfaceType(ComInterfaceType.InterfaceIsIUnknown),
    Guid("00000109-0000-0000-C000-000000000046")]
  public interface IPersistStream : IPersist {
    new void GetClassID(out Guid pClassID);
 
    [PreserveSig]
    int IsDirty( );        
    void Load([In] UCOMIStream pStm);
    void Save([In] UCOMIStream pStm, [In, 
     MarshalAs(UnmanagedType.Bool)] bool fClearDirty);
    void GetSizeMax(out long pcbSize);
  };
  #endregion

  // QCTransferObject can transfer itself as an 
  // object to a serviced COM+ queued component,
  // including its payload which must be a serializable object. 
  // If the payload is a (serializable)
  // object graph the complete graph 
  // is transferred (.NET serialization semantics).
  //[Serializable]
  [Guid("c547e5f2-aa59-4902-a3b3-015ffffcc4bb")]
  public class QCTransferObject : IPersistStream {
    // Payload must be a serializable object or object graph.
    private Object m_Payload;
    public Object payload {
      get { return m_Payload; }
      set { m_Payload = value; }
    }

    public QCTransferObject() {}

    public override String ToString() {
      String s = "QCTransferObject: payload=";
      return ((payload == null)? s + "null" : s + payload.ToString());
    }
    
    // The guid property is virtual in case you 
    // want to use typed subclasses.
    private Guid m_Guid = new Guid(
      "c547e5f2-aa59-4902-a3b3-015ffffcc4bb");
    protected virtual Guid guid {
      get { 
        Debug.WriteLine(@"QCTransferObject::guid="+m_Guid+"\n");
        return m_Guid; 
      }
    }

    // IPersistStream implementation. 
    // This makes this object go over the wire.
    #region IPersistStream Members
    public bool m_bRequiresSave;
  
    // Class ID will be of this class, 
    // or any subtype if typed transfer is used.
    public void GetClassID(out Guid pClassID) {
      Debug.WriteLine(@"QCTransferObject::GetClassID\n");
      pClassID = guid;
    }

    public int IsDirty() {
      Debug.WriteLine(@"QCTransferObject::IsDirty\n");
      return m_bRequiresSave ? 0 : -1;
    }

    // Called on player side (COM+ queued component server),
    // when this object (or subclass object)
    // is instantiated from byte array present in stream.
    public unsafe void Load(UCOMIStream pStm) {
      Debug.WriteLine(@"QCTransferObject::Load\n");
      Int32 cb;
      byte [] arrLen = new Byte[2];
      if (null==pStm)
        return ;
      //read the length of the string;
      Int32* pcb = &cb;
      pStm.Read(arrLen, 2, new IntPtr(pcb));
      //calculate the length.
      cb = 256 * arrLen[1] + arrLen[0];
      //read the stream to get the string.
      byte [] arr = new byte[cb];  // BUG BUG BUG
      pStm.Read(arr, cb, new IntPtr(pcb));

      payload = Codec.ByteArrayToObject(arr);
    }

    // Called on recorder side (client sidd) when this
    // object (or subclass object) is to
    // be sent over the network as a byte array.
    public unsafe void Save(UCOMIStream pStm, bool fClearDirty) {
      Debug.WriteLine(@"QCTransferObject::Save\n");
      Int32 cb;
      Int32* pcb = &cb;
      byte[] arrLen = new byte[2];

      //convert the string into a byte array.
      byte [] arr = Codec.ObjectToByteArray(payload);
      arrLen[0] = (byte)(arr.Length % 256);
      arrLen[1] = (byte)(arr.Length / 256);

      if (null==pStm)
        return ;

      //save the array in the stream
      pStm.Write(arrLen, 2, new IntPtr(pcb));
      pStm.Write(arr, arr.Length, new IntPtr(pcb));
    }

    public void GetSizeMax(out long pcbSize) {
      Debug.WriteLine(@"QCTransferObject::GetSizeMax\n");
      byte [] arr = Codec.ObjectToByteArray(payload);
      //the total size is equal to the length of the string plus 2.
      pcbSize = arr.Length +2;
    }
    #endregion
  }

  // Utility class that serializes any Serializable 
  // object into a byte array and vice versa.
  public class Codec {
    // Convert an object to a byte array
    public static byte[] ObjectToByteArray(Object obj) {
      if(obj == null)
        return null;
      BinaryFormatter bf = new BinaryFormatter();
      MemoryStream ms = new MemoryStream();
      bf.Serialize(ms, obj);
      return ms.ToArray();
    }
    // Convert a byte array to an Object
    public static Object ByteArrayToObject(byte[] arrBytes) {
      MemoryStream memStream = new MemoryStream();
      BinaryFormatter binForm = new BinaryFormatter();

      memStream.Write(arrBytes, 0, arrBytes.Length);
      memStream.Seek(0, SeekOrigin.Begin);

      Object obj = (Object) binForm.Deserialize(memStream);

      return obj;
    }
  }
}

示例 COM+ 队列组件。

该示例模拟了一个运输部门,它接受一个 QCTransferObject,其中包含单个 OrderDTO 对象,该对象又包含聚合的 OrderLineDTO 对象。

OrderDTOOrderLineDTO 对象都很简单,是 [Serializable] 对象,如下所示。

  
// Order Domain Transfer Object. Serializable. Represents an Order, typically
// used between clients and (ordinary, synchronous) COM+ components.
[Serializable]
public class OrderDTO {
  private int m_Id;
  private ArrayList m_OrderLines;
  ...
  public override String ToString() {...}
}

// OrderLine Domain Transfer Object. Serializable.
// Represents an OrderLine, contained in OrderDTO, 
// as such indirectly
// used between clients and (ordinary, synchronous) 
// COM+ components.
[Serializable]
public class OrderLineDTO {
  private int m_ProdRef;
  private String m_ProdDesc;
  ...
  public override String ToString() {...}
}

队列组件 IShipOrderRequestHandler 只是调用接收到的对象的 ToString() 方法,并将结果输出到应用程序日志。(我对 .NET 跟踪功能还不熟悉 - 抱歉)。请注意,它是一个事务性组件,每次调用后都会提交([AutoComplete] 属性)。

using System;
using System.EnterpriseServices;
using System.Diagnostics;
using QCUtil;
using Orders;

// Make this a server COM+ application.
[assembly: ApplicationActivation(ActivationOption.Server)]
// Enable queueing and listener, only one 
// thread will be processing object messages.
[assembly: ApplicationQueuing(Enabled=true, 
    QueueListenerEnabled=true, MaxListenerThreads=1)]

namespace QCSolution
{
  // COM+ Queued Component Interface
  [InterfaceQueuing(Enabled=true)]
  public interface IShipOrderRequestHandler
  {
    // Non typed object transfer
    void shipOrderRequestGeneric(QCTransferObject to);
    // Typed object transfer, specific to Orders
    void shipOrderRequest(OrderTransfer ot);
  }

  // COM+ Queued Component Implementation
  [InterfaceQueuing(Interface = "IShipOrderRequestHandler")]
  [Transaction]
  public class ShipOrderRequestHandler : 
    ServicedComponent, IShipOrderRequestHandler
  {
    public ShipOrderRequestHandler() {}

    [AutoComplete]
    public void shipOrderRequestGeneric(QCTransferObject to) {
      EventLog.WriteEntry("ShippingApp", 
       "QCTranserferObject received, ToString="+
       to.ToString(),EventLogEntryType.Information);
    }
    [AutoComplete]
    public void shipOrderRequest(OrderTransfer ot) {
      EventLog.WriteEntry("ShippingApp", "OrderTransfer received, order="
       +ot.orderDto.ToString(),EventLogEntryType.Information);
    }
  }
}

有两个接口方法,第二个方法是类型化的,稍后会进行解释。

最后,我们有一个客户端应用程序,它绑定到队列并向队列组件发送一个包含一些聚合 OrderLineDTO 对象的 OrderDTO。一个控制台测试应用程序就是这样做的。

  
class Tester
{
  public static OrderDTO helperCreateDto(int id) {
    OrderDTO order = new OrderDTO(id);
    order.ShipTo = "test to";
    order.AddOrderLine(new OrderLineDTO(3,"Tandpasta",10,5));
    order.AddOrderLine(new OrderLineDTO(1,"Floss draad",7,2));
    return order;
  }
  [STAThread]
  static void Main(string[] args)
  {
    IShipOrderRequestHandler iHandler 
      = (IShipOrderRequestHandler)Marshal.BindToMoniker(
       "queue:/new:QCSolution.ShipOrderRequestHandler");
    QCTransferObject to = new QCTransferObject();
    OrderDTO orderDto = helperCreateDto(1);
    to.payload = orderDto;
    iHandler.shipOrderRequestGeneric(to);
      
    Marshal.ReleaseComObject(iHandler);
  }
}

构建和运行应用程序。

涉及的步骤相当多。也许有更快捷的方法,但至少这种方法是有效的。

用于构建

  1. 将解决方案加载到 Visual Studio .NET 2003 中。
  2. 确保允许使用不安全代码。
  3. 重新生成项目。

然后,用于运行

  1. 使用“组件服务”,定义一个新的 COM+ **服务器应用程序**,名为 QCSolution
  2. 按如下方式配置其队列。
  3. 在“计算机管理”中检查队列是否已创建。
  4. 使用“组件服务”,通过指向 QCSolution.dll 文件,将新组件添加到 QCSolution。
  5. 在命令窗口中,将目录更改到 dll 所在的位置并执行
    gacutil -i QCSolution.dll 注册该组件,以便其他组件(如独立的 OrderManagement 组件)可以使用 Shipping 服务和 QCTransferObject 类。
  6. 通过执行 Tester 类来测试应用程序。

客户端成功执行后,检查“计算机管理”中的应用程序日志。其中一个事件应该显示此内容:

如果您没有看到此消息,请检查队列。其中一个重试队列将包含您的消息,该消息最终会进入死信队列。

在重新构建和重试时,我注意到了一些问题。请确保 QCSolution COM+ 应用程序已停止运行。在再次注册之前,使用 gacutil -u QCSolution 注销有问题的实现。有时甚至需要删除组件 QCSolution.ShipOrderRequestHandler 并重新注册它。

一个类型化的传输对象。

void shipOrderRequestGeneric(QCTransferObject to) 操作不是自文档化的,因为你需要查阅文档(大多数时候是代码 ;-))才能知道此操作仅适用于 OrderDTO 对象。实际上,情况更糟,因为你可以将任何类型的对象包装在 QCTransferObject 中发送到队列组件,这可能导致运行时意外或错误的结果。

一种解决方案是按如下方式更改接口:
void shipOrderRequest(OrderTransfer to)
并提供一个 OrderTransfer 的实现,该实现仅包装 OrderDTO 对象,并且在其他方面与 QCTransferObject 行为相同。

如下面的代码所示,这可以通过继承非常容易地完成。

// Transfer object, containing one OrderDTO 
// with aggregated OrderLineDTO objects,
// so it can be transferred to a COM+ queued component.
[Guid("12e0bcf4-21a5-4a47-bf02-c010cc2a30ba")]
public class OrderTransfer : QCTransferObject {
  public OrderTransfer() {}
    
  // This property makes it typed.
  public OrderDTO orderDto {
    get { return (OrderDTO)payload; }
    set { payload = value; }
  }

  // Every QC Transfer Object must have its own GUID. See also
  // [Guid] attribute higher up.
  private Guid m_Guid = new Guid(
   "12e0bcf4-21a5-4a47-bf02-c010cc2a30ba");
  protected override Guid guid {
    get { 
      Debug.WriteLine(@"OrderTransfer::guid="+m_Guid+"\n");
      return m_Guid; 
    }
  }
}

只需确保使用 uuidgen 为您自己的类生成唯一的 GUID。

© . All rights reserved.