65.9K
CodeProject 正在变化。 阅读更多。
Home

用于进程间同步和通信的 C# 框架

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.95/5 (54投票s)

2004 年 8 月 3 日

16分钟阅读

viewsIcon

306758

downloadIcon

8158

如何共享资源并实现线程和进程之间丰富的消息/数据传递架构(SOA)

0. 引言

Microsoft 提供了 .NET Framework 中各种有用的线程同步原语,从监视器到读写锁。在进程间层面,缺少的是类似的机制,以及简单的消息传递机制,这些机制在服务(客户端/服务器或 SOA)和生产者/消费者模式下,在线程和进程层面都非常有用。我试图通过一个简单、自包含的框架来填补这一空白,该框架使用信号量、邮箱、内存映射文件、阻塞通道和简单的消息流控制器等结构,用于线程间和进程间的同步与通信(IPC)。本文提供的类集也作为一个库项目(开源,BSD 许可证)提供,托管在 www.cdrnet.net/projects/threadmsg/

此框架的雄心在于简短而简单

  1. 抽象:消息处理线程不应关心其消息是发送到下一个线程,还是通过 MSMQ 消息队列发送到另一台计算机。
  2. 简洁:只需一次方法调用即可将消息传递给其他进程。

注意:为节省空间,我删除了文章中代码示例的所有 XML 注释 - 如果您需要有关方法及其参数的更多详细信息,请查看随附的源代码。

1. 从一个示例开始

为了演示进程间消息传递可以多么简单,我将从一个小示例开始:一个控制台应用程序,可以根据命令行参数以读取器或写入器的身份启动。在写入器进程中,您可以输入一些文本并将其发送到邮箱(按回车键),读取器显示从邮箱接收到的所有消息。您可以启动任意数量的写入器和读取器,但每条消息只会显示在一个读取器上。

[Serializable]
struct Message
{
  public string Text;
}

class Test
{
  IMailBox mail;

  public Test()
  {
    mail = new ProcessMailBox("TMProcessTest",1024);
  }

  public void RunWriter()
  {
    Console.WriteLine("Writer started");
    Message msg;
    while(true)
    {
      msg.Text = Console.ReadLine();
      if(msg.Text.Equals("exit"))
        break;
      mail.Content = msg;
    }
  }

  public void RunReader()
  {
    Console.WriteLine("Reader started");
    while(true)
    {
      Message msg = (Message)mail.Content;
      Console.WriteLine(msg.Text);
    }
  }

  [STAThread]
  static void Main(string[] args)
  {
    Test test = new Test();
    if(args.Length > 0)
      test.RunWriter();
    else
      test.RunReader();
  }
}

一旦使用构造函数创建了邮箱(此处为 ProcessMailBox),接收消息就像获取 Content 属性一样容易,发送消息就像设置它一样容易。如果没有可用的数据,getter 会阻塞线程;如果已存在读者未请求的内容,setter 会阻塞。得益于这种阻塞,应用程序完全是中断驱动的,不会以任何方式占用 CPU(无轮询或忙等待)。只要它们用 SerializableAttribute 装饰,就可以允许任何类型的消息/对象。

然而,幕后发生的事情要复杂一些:消息通过进程间共享内存的少数几种方式之一进行传输:内存映射文件(MMF),在本例中是仅存在于系统页面文件中的虚拟文件。对该文件的访问使用两个 Win32 信号量进行同步。消息被二进制序列化写入文件,这就是为什么需要 SerializableAttribute。MMF 和 Win32 信号量都需要直接的 NT 内核系统调用,但由于 .NET Framework 提供的方便的 Marshal 类,因此不需要 unsafe 代码。本文稍后将讨论更多详细信息。

2. .NET 世界中的线程间和进程间同步

线程和进程之间的通信需要共享内存或用于将数据传入和传出进程/线程的内置机制。在共享内存的情况下,还需要一套同步原语来允许并发访问。

单个进程中的所有线程共享一个通用的逻辑地址空间(堆),但从 Windows 2000 开始,没有办法在进程之间共享内存。但是,进程可以读取和写入同一个文件,WinAPI 提供了各种系统调用来简化将文件映射到进程的地址空间以及使用仅作为内核对象(“节”)存在的虚拟文件,这些文件指向系统页面文件中的内存块。对于线程间共享堆和进程间共享文件,并发访问可能导致数据不一致。我们简要讨论了几种确保协作进程或线程有序执行并保持数据一致性的机制。

2.1 线程同步

.NET Framework 和 C# 使用 Monitorlock 语句提供了非常简单直接的线程同步机制(本文不讨论 .NET Framework 的 Mutex 类型)。无论本文提供其他什么原语,lock 都是推荐的线程同步方法。

void Work1()
{
  NonCriticalSection1();
  Monitor.Enter(this);
  try
  {
    CriticalSection();
  }
  finally
  {
    Monitor.Exit(this);
  }
  NonCriticalSection2();
}
void Work2()
{
  NonCriticalSection1();
  lock(this)
  {
    CriticalSection();
  }
  NonCriticalSection2();
}

Work1Work2 是等效的。在 C# 中,第二个示例更受欢迎,因为它更短且不易出错。

2.2 线程间信号量

经典的同步原语之一(由 Edsger Dijkstra引入)是计数信号量。信号量是带有计数器和两个操作的对象:Acquire(也称为 'P' 或 'Wait')和 Release(也称为 'V' 或 'Signal')。信号量在 Acquire 时递减计数器并阻塞(可选超时),如果计数器为零(在递减之前),而在 Release 时递增计数器而不阻塞。虽然信号量是简单的结构,但实现起来有些棘手。幸运的是,内置 Monitor 的阻塞行为会有所帮助。

public sealed class ThreadSemaphore : ISemaphore
{
  private int counter;
  private readonly int max;

  public ThreadSemaphore() : this(0, int.Max) {}
  public ThreadSemaphore(int initial) : this(initial, int.Max) {}
  public ThreadSemaphore(int initial, int max)
  {
    this.counter = Math.Min(initial,max);
    this.max = max;
  }

  public void Acquire()
  {
    lock(this)
    {
      counter--;
      if(counter < 0 && !Monitor.Wait(this))
        throw new SemaphoreFailedException();
    }
  }

  public void Acquire(TimeSpan timeout)
  {
    lock(this)
    {
      counter--;
      if(counter < 0 && !Monitor.Wait(this,timeout))
        throw new SemaphoreFailedException();
    }
  }

  public void Release()
  {
    lock(this)
    {
      if(counter >= max)
        throw new SemaphoreFailedException();
      if(counter < 0)
        Monitor.Pulse(this);
      counter++;
    }
  }
}

信号量对于更复杂的阻塞场景很有用,例如我们稍后将讨论的通道。您也可以使用信号量来互斥锁定关键部分(Work3,见下文),但强烈建议使用上面 Work2 中所示的内置 lock 关键字。

请注意,计数信号量如果使用不当,可能会成为危险的对象。只要遵循一个基本规则,您就处于安全的一方:切勿在 Acquire 失败时调用 Release,而是在成功后调用它,无论其后发生什么。Work3 中的 finally 语句是强制执行此规则的一种方法,但请注意,Acquire 调用必须在 try 语句之外,因为如果在 Acquire 失败时不能调用 Release

ThreadSemaphore s = new ThreadSemaphore(1);
void Work3()
{
  NonCriticalSection1();
  s.Acquire();
  try
  {
    CriticalSection();
  }
  finally
  {
    s.Release();
  }
  NonCriticalSection2();
}

2.3 进程间信号量

为了同步进程间的资源访问,我们需要进程级别的前述原语。不幸的是,.NET Framework 中没有进程级别的 Monitor 类。但是,Win32 API 提供了信号量内核对象,可用于同步进程间的访问。Robin Galloway-Lunn 在《Using Win32 Semaphores in C#》中介绍了如何将 Win32 信号量映射到 .NET 世界。我们的实现看起来相似。

[DllImport("kernel32",EntryPoint="CreateSemaphore",
     SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint CreateSemaphore(
  SecurityAttributes auth, int initialCount,
    int maximumCount, string name);

[DllImport("kernel32",EntryPoint="WaitForSingleObject",
 SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint WaitForSingleObject(
 uint hHandle, uint dwMilliseconds);

[DllImport("kernel32",EntryPoint="ReleaseSemaphore",
 SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool ReleaseSemaphore(
  uint hHandle, int lReleaseCount, out int lpPreviousCount);
    
[DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true,
  CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool CloseHandle(uint hHandle);
public class ProcessSemaphore : ISemaphore, IDisposable
{
  private uint handle;
  private readonly uint interruptReactionTime;

  public ProcessSemaphore(string name) : this(
   name,0,int.MaxValue,500) {}
  public ProcessSemaphore(string name, int initial) : this(
   name,initial,int.MaxValue,500) {}
  public ProcessSemaphore(string name, int initial,
   int max, int interruptReactionTime)
  {       
    this.interruptReactionTime = (uint)interruptReactionTime;
    this.handle = NTKernel.CreateSemaphore(null, initial, max, name);
    if(handle == 0)
      throw new SemaphoreFailedException();
  }

  public void Acquire()
  {
    while(true)
    { //looped 0.5s timeout to make NT-blocked threads interruptable.
      uint res = NTKernel.WaitForSingleObject(handle, 
       interruptReactionTime);
      try {System.Threading.Thread.Sleep(0);} 
      catch(System.Threading.ThreadInterruptedException e)
      {
        if(res == 0)
        { //Rollback 
          int previousCount;
          NTKernel.ReleaseSemaphore(handle,1,out previousCount);
        }
        throw e;
      }
      if(res == 0)
        return;
      if(res != 258)
        throw new SemaphoreFailedException();
    }
  }

  public void Acquire(TimeSpan timeout)
  {
    uint milliseconds = (uint)timeout.TotalMilliseconds;
    if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0)
      throw new SemaphoreFailedException();  
  }

  public void Release()
  {
    int previousCount;
    if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount))
      throw new SemaphoreFailedException();  
  }

  #region IDisposable Member
  public void Dispose()
  {
    if(handle != 0)
    {
      if(NTKernel.CloseHandle(handle))
        handle = 0;
    }
  }
  #endregion
}

重要的是该信号量被命名。这使得其他进程可以通过输入相同的名称来创建同一个信号量的句柄。为了使阻塞线程可中断,我们使用超时和 Sleep(0) 进行(脏)的变通。我们需要中断支持来安全地关闭线程。但是,建议在没有更多线程被阻塞之前释放信号量,以允许应用程序干净退出。

您可能还注意到,线程间和进程间信号量共享相同的接口。这种模式在所有类上都实现了,从而实现了引言中提到的抽象。但请注意,为了性能起见,您不应将进程间实现用于线程间场景,也不应将线程间实现用于单线程场景。

3. 进程间共享内存:内存映射文件

我们已经看到了如何同步线程和进程的共享资源访问。对于传递消息,缺少的是共享资源本身。对于线程来说,这就像声明一个类成员变量一样简单,但对于进程,我们需要一种称为内存映射文件(MMF)的技术,该技术由 Win32 API 提供。使用 MMF 的难度并不比使用上面讨论的 Win32 信号量大多少。我们首先需要一个此类映射文件的句柄,使用 CreateFileMapping 系统调用。

[DllImport("Kernel32.dll",EntryPoint="CreateFileMapping",
     SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr CreateFileMapping(uint hFile, 
 SecurityAttributes lpAttributes, uint flProtect,
  uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName);
    
[DllImport("Kernel32.dll",EntryPoint="MapViewOfFile",
 SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject, 
  uint dwDesiredAccess, uint dwFileOffsetHigh,
  uint dwFileOffsetLow, uint dwNumberOfBytesToMap);
    
[DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile",
 SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress);
public static MemoryMappedFile CreateFile(string name, 
     FileAccess access, int size)
{
  if(size < 0)
    throw new ArgumentException("Size must not be negative","size");

  IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null,
   (uint)access,0,(uint)size,name);
  if(fileMapping == IntPtr.Zero)
    throw new MemoryMappingFailedException();

  return new MemoryMappedFile(fileMapping,size,access);
}

我们更喜欢直接在系统页面文件中的虚拟文件,因此我们提供 -1 (0xFFFFFFFF) 作为文件句柄来创建我们的映射文件句柄。我们还指定了所需的字节大小和名称,以允许其他进程同时访问同一个文件。拥有这样的文件后,我们可以将该文件的多个部分(以字节为单位的偏移量和大小指定)映射到我们的本地地址空间。我们使用 MapViewOfFile 系统调用来执行此操作。

public MemoryMappedFileView CreateView(int offset, int size,
      MemoryMappedFileView.ViewAccess access)
{
  if(this.access == FileAccess.ReadOnly && access == 
    MemoryMappedFileView.ViewAccess.ReadWrite)
    throw new ArgumentException(
     "Only read access to views allowed on files without write access",
     "access");
  if(offset < 0)
    throw new ArgumentException("Offset must not be negative","size");
  if(size < 0)
    throw new ArgumentException("Size must not be negative","size");
  IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping,
   (uint)access,0,(uint)offset,(uint)size);
  return new MemoryMappedFileView(mappedView,size,access);
}

在 unsafe 代码中,我们可以直接获取返回的指针(mappedView)并将其转换为我们的数据结构。尽管如此,由于我们不希望使用 unsafe 代码,因此我们使用 Marshal 类向其读取和写入字节和整数。offset 参数用于指定相对于视图偏移量开始读取或写入映射文件的位置。

public byte ReadByte(int offset)
{
  return Marshal.ReadByte(mappedView,offset);
}
public void WriteByte(byte data, int offset)
{
  Marshal.WriteByte(mappedView,offset,data);
}

public int ReadInt32(int offset)
{
  return Marshal.ReadInt32(mappedView,offset);
}
public void WriteInt32(int data, int offset)
{
  Marshal.WriteInt32(mappedView,offset,data);
}
public void ReadBytes(byte[] data, int offset)
{
  for(int i=0;i<data.Length;i++)
    data[i] = Marshal.ReadByte(mappedView,offset+i);
}
public void WriteBytes(byte[] data, int offset)
{
  for(int i=0;i<data.Length;i++)
    Marshal.WriteByte(mappedView,offset+i,data[i]);
}

但是,我们希望将整个对象树写入文件并从文件中读取,因此我们需要更高级的访问器,并支持自动二进制序列化。

public object ReadDeserialize(int offset, int length)
{
  byte[] binaryData = new byte[length];
  ReadBytes(binaryData,offset);
  System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
    = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
  System.IO.MemoryStream ms = new System.IO.MemoryStream(
   binaryData,0,length,true,true);
  object data = formatter.Deserialize(ms);
  ms.Close();
  return data;
}
public void WriteSerialize(object data, int offset, int length)
{
  System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
    = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
  byte[] binaryData = new byte[length];
  System.IO.MemoryStream ms = new System.IO.MemoryStream(
   binaryData,0,length,true,true);
  formatter.Serialize(ms,data);
  ms.Flush();
  ms.Close();
  WriteBytes(binaryData,offset);
}

请注意,对象的序列化大小不应超过视图大小。序列化大小始终大于对象本身的大小。我没有尝试直接将内存流绑定到映射视图而不是字节数组,但这应该有效,甚至可能带来轻微的性能提升。

4. 邮箱:在线程和进程之间传递消息

邮箱与电子邮件或 NT Mailslots 无关。它是一种安全的共享内存结构,只能保存一个对象。内容通过属性进行读写。如果邮箱不包含对象,读取内容的线程将阻塞,直到另一个线程写入某些内容。如果它已经包含内容,试图写入它的线程将阻塞,直到另一个线程首先读取内容。内容只能读取一次——读取后其引用会自动删除。我们已经开发了构建此类邮箱所需的所有内容。

4.1 线程间邮箱

构建邮箱非常简单,使用两个信号量:一个在邮箱为空时发出信号,另一个在邮箱已满时发出信号。要从邮箱读取,首先等待直到邮箱已满,然后在读取后发出空信号量。要写入,需要等待直到邮箱为空,然后在写入后发出满信号量。请注意,空信号量在开始时发出信号。

public sealed class ThreadMailBox : IMailBox
{
  private object content;
  private ThreadSemaphore empty, full;

  public ThreadMailBox()
  {
    empty = new ThreadSemaphore(1,1);
    full = new ThreadSemaphore(0,1);
  }

  public object Content
  {
    get
    {
      full.Acquire();
      object item = content;
      empty.Release();
      return item;
    }
    set 
    {
      empty.Acquire();
      content = value;
      full.Release();
    }
  }
}

4.2 进程间邮箱

进程间版本几乎和线程间实现一样简单。唯一的区别是我们现在使用进程间信号量,并且我们读取和写入内存映射文件而不是简单的类成员变量。由于序列化可能会失败,我们提供了一个小的回滚异常处理程序来撤销对邮箱状态所做的任何更改。有许多可能的错误源(无效句柄、访问被拒绝、文件大小、缺少 SerializableAttribute…)。

public sealed class ProcessMailBox : IMailBox, IDisposable
{
  private MemoryMappedFile file;
  private MemoryMappedFileView view;
  private ProcessSemaphore empty, full;

  public ProcessMailBox(string name,int size)
  {
    empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1);
    full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1);
    file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox",
      MemoryMappedFile.FileAccess.ReadWrite,size);
    view = file.CreateView(0,size,
     MemoryMappedFileView.ViewAccess.ReadWrite);
  }

  public object Content
  {
    get
    {
      full.Acquire();
      object item;
      try {item = view.ReadDeserialize();}
      catch(Exception e)
      {  //Rollback
        full.Release();
        throw e;
      }
      empty.Release();
      return item;
    }

    set 
    {
      empty.Acquire();
      try {view.WriteSerialize(value);}
      catch(Exception e)
      {  //Rollback
        empty.Release();
        throw e;
      }
      full.Release();
    }
  }

  #region IDisposable Member
  public void Dispose()
  {
    view.Dispose();
    file.Dispose();
    empty.Dispose();
    full.Dispose();
  }
  #endregion
}

现在我们拥有了文章开头 IPC 消息传递示例所需的所有工具。您可能希望滚动回示例,因为它演示了如何使用 ProcessMailBox

5. 通道:排队消息传输

邮箱的一个重要问题是它们一次只能容纳一个对象。如果长处理链(通过邮箱连接)中的某个工作者对某个特定命令的处理时间比平时长一点,整个链条会立即被阻塞。通常,具有缓冲消息传递通道更有利,您可以在有时间时提取传入消息,而不会阻塞发送者。通道提供这种缓冲,是比简单邮箱更好的替代方案。同样,我们将讨论线程间和进程间的实现。

5.1 可靠性

邮箱和通道之间的另一个重要区别是通道具有一些可靠性功能,例如自动将发送失败的消息(因为在等待锁时线程被中断)转储到内部转储容器。这意味着通道处理线程可以安全关闭而不会丢失任何消息。这由两个抽象类 ThreadReliabilityProcessReliability 维护——每个通道实现都派生自其中之一。

5.2 线程间通道

线程间通道基于邮箱,但使用同步队列而不是变量作为消息缓冲区。得益于计数信号量模型,通道在队列为空时阻塞接收,在队列已满时阻塞发送。您不会遇到任何入队/出队失败。我们通过初始化空信号量为通道大小,满信号量为零来实现这一点。如果发送消息的线程在被阻塞在空信号量时被中断,我们会将消息复制到转储容器,并让异常传播。在接收方法中不需要转储,因为您在被中断时不会丢失任何消息。请注意,线程只能在被阻塞时(即调用信号量的 Acquire() 时)被中断。

public sealed class ThreadChannel : ThreadReliability, IChannel
{
  private Queue queue;
  private ThreadSemaphore empty, full;

  public ThreadChannel(int size)
  {
    queue = Queue.Synchronized(new Queue(size));
    empty = new ThreadSemaphore(size,size);
    full = new ThreadSemaphore(0,size);
  }

  public void Send(object item)
  {
    try {empty.Acquire();}
    catch(System.Threading.ThreadInterruptedException e)
    {
      DumpItem(item);
      throw e;
    }
    queue.Enqueue(item);
    full.Release();
  }

  public void Send(object item, TimeSpan timeout)
  {
    try {empty.Acquire(timeout);}
    ...
  }

  public object Receive()
  {
    full.Acquire();
    object item = queue.Dequeue();
    empty.Release();
    return item;
  }

  public object Receive(TimeSpan timeout)
  {
    full.Acquire(timeout);
    ...
  }
  
  protected override void DumpStructure()
  {
    lock(queue.SyncRoot)
    {
      foreach(object item in queue)
        DumpItem(item);
      queue.Clear();
    }
  }
}

5.3 进程间通道

构建进程间通道有点困难,因为您首先需要一种提供缓冲区的方法。可能的解决方案是使用进程间邮箱,并根据所需行为排队发送或接收方法。为了避免此解决方案的几个缺点,我们将直接在内存映射文件中实现一个队列。第一个类 MemoryMappedArray 将文件分成多个可以通过索引访问的块。第二个类 MemoryMappedQueue 在该数组周围构建一个经典的环形缓冲区(有关更多详细信息,请参阅随附的源代码)。为了允许直接字节/整数访问和二进制序列化,首先调用 Enqueue/Dequeue,然后根据需要使用读/写方法(队列会自动将它们放在正确的位置)。数组和队列实现都不是线程安全或进程安全的,因此我们必须使用模拟互斥量的进程间信号量来控制互斥访问(我们也可以封装 Win32 Mutex)。除了这两个类之外,通道实现与进程间邮箱基本相同。同样,我们必须在 Send() 方法中处理线程中断以及可能的序列化失败。

public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable
{
  private MemoryMappedFile file;
  private MemoryMappedFileView view;
  private MemoryMappedQueue queue;
  private ProcessSemaphore empty, full, mutex;

  public ProcessChannel( int size, string name, int maxBytesPerEntry)
  {
    int fileSize = 64+size*maxBytesPerEntry;

    empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size);
    full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size);
    mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1);
    file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel",
      MemoryMappedFile.FileAccess.ReadWrite,fileSize);
    view = file.CreateView(0,fileSize,
     MemoryMappedFileView.ViewAccess.ReadWrite);
    queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0);
    if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry)
      throw new MemoryMappedArrayFailedException();
  }

  public void Send(object item)
  {
    try {empty.Acquire();}
    catch(System.Threading.ThreadInterruptedException e)
    {
      DumpItemSynchronized(item);
      throw e;
    }
    try {mutex.Acquire();}
    catch(System.Threading.ThreadInterruptedException e)
    {
      DumpItemSynchronized(item);
      empty.Release();
      throw e;
    }
    queue.Enqueue();
    try {queue.WriteSerialize(item,0);}
    catch(Exception e)
    {
      queue.RollbackEnqueue();
      mutex.Release();
      empty.Release();
      throw e;
    }
    mutex.Release();
    full.Release();
  }

  public void Send(object item, TimeSpan timeout)
  {
    try {empty.Acquire(timeout);}
    ...
  }

  public object Receive()
  {
    full.Acquire();
    mutex.Acquire();
    object item;
    queue.Dequeue();
    try {item = queue.ReadDeserialize(0);}
    catch(Exception e)
    {
      queue.RollbackDequeue();
      mutex.Release();
      full.Release();
      throw e;
    }
    mutex.Release();
    empty.Release();
    return item;
  }

  public object Receive(TimeSpan timeout)
  {
    full.Acquire(timeout);
    ...
  }
  
  protected override void DumpStructure()
  {
    mutex.Acquire();
    byte[][] dmp = queue.DumpClearAll();
    for(int i=0;i<dmp.Length;i++)
      DumpItemSynchronized(dmp[i]);
    mutex.Release();
  }

  #region IDisposable Member
  public void Dispose()
  {
    view.Dispose();
    file.Dispose();
    empty.Dispose();
    full.Dispose();
    mutex.Dispose();
  }
  #endregion
}

6. 消息路由

我们讨论了如何同步资源访问以及如何使用邮箱或通道在线程和进程之间传递消息。在使用阻塞通道时,您可能会遇到问题,例如当您需要在同一线程中监听多个通道时。为了解决这些情况,有一些小型类模块可用:通道转发器、多路复用器和解复用器以及通道事件网关。您可以使用由两个抽象类 SingleRunnableMultiRunnable 提供的简单 IRunnable 模式来定义您自己的通道处理器(有关更多详细信息,请参阅随附的源代码)。

6.1 通道转发器

通道转发器所做的只是监听一个通道并将接收到的消息转发到另一个通道。如果需要,转发器可以将每条接收到的消息放入一个用常量数字标记的信封中,然后再转发(此功能用于多路复用器,见下文)。

public class ChannelForwarder : SingleRunnable
{
  private IChannel source, target;
  private readonly int envelope;

  public ChannelForwarder(IChannel source, 
   IChannel target, bool autoStart, bool waitOnStop)
    : base(true,autoStart,waitOnStop)
  {
    this.source = source;
    this.target = target;
    this.envelope = -1;
  }
  public ChannelForwarder(IChannel source, IChannel target, 
   int envelope, bool autoStart, bool waitOnStop)
    : base(true,autoStart,waitOnStop)
  {
    this.source = source;
    this.target = target;
    this.envelope = envelope;
  } 

  protected override void Run()
  {  //NOTE: IChannel.Send is interrupt save and 
     //automatically dumps the argument. 
    if(envelope == -1)
      while(running)
        target.Send(source.Receive());
    else
    {
      MessageEnvelope env;
      env.ID = envelope;
      while(running)
      {
        env.Message = source.Receive();
        target.Send(env);
      }
    }
  }
}

6.2 通道多路复用器和解复用器

多路复用器监听多个输入通道,并将每条接收到的消息(包含一个信封以识别输入通道)转发到一个公共输出通道。这可用于同时监听多个通道。另一方面,解复用器监听一个公共输入通道,并根据消息信封将其转发到一个或多个输出通道。

public class ChannelMultiplexer : MultiRunnable
{
  private ChannelForwarder[] forwarders;

  public ChannelMultiplexer(IChannel[] channels, int[] ids, 
    IChannel output, bool autoStart, bool waitOnStop)
  {
    int count = channels.Length;
    if(count != ids.Length)
      throw new ArgumentException("Channel and ID count mismatch.","ids");

    forwarders = new ChannelForwarder[count];
    for(int i=0;i<count;i++)
      forwarders[i] = new ChannelForwarder(channels[i],
       output,ids[i],autoStart,waitOnStop);

    SetRunnables((SingleRunnable[])forwarders);
  }
}
public class ChannelDemultiplexer : SingleRunnable
{
  private HybridDictionary dictionary;
  private IChannel input;

  public ChannelDemultiplexer(IChannel[] channels, int[] ids, 
    IChannel input, bool autoStart, bool waitOnStop)
    : base(true,autoStart,waitOnStop)
  {
    this.input = input;

    int count = channels.Length;
    if(count != ids.Length)
      throw new ArgumentException("Channel and ID count mismatch.","ids");

    dictionary = new HybridDictionary(count,true);
    for(int i=0;i<count;i++)
      dictionary.add(ids[i],channels[i]);
  }

  protected override void Run()
  {  //NOTE: IChannel.Send is interrupt save and 
     //automatically dumps the argument.
    while(running)
    {
      MessageEnvelope env = (MessageEnvelope)input.Receive();
      IChannel channel = (IChannel)dictionary[env.ID];
      channel.send(env.Message);
    }
  }
}

6.3 通道事件网关

通道事件网关从通道接收消息,并为每条接收到的消息触发一个事件。此类可能对事件驱动的应用程序(如 GUI)很有用,或者使用系统 ThreadPool 初始化次要活动。但请记住,对于 WinForms,您不能直接从事件处理程序访问任何 UI 控件,而必须使用 Invoke(),因为处理程序在网关线程中执行,而不是在 UI 线程中执行。

public class ChannelEventGateway : SingleRunnable
{
  private IChannel source;
  public event MessageReceivedEventHandler MessageReceived;

  public ChannelEventGateway(IChannel source, bool autoStart,
   bool waitOnStop) : base(true,autoStart,waitOnStop)
  {
    this.source = source;
  }
  
  protected override void Run()
  {
    while(running)
    {
      object c = source.Receive();
      MessageReceivedEventHandler handler = MessageReceived;
      if(handler != null)
        handler(this,new MessageReceivedEventArgs(c));
    }
  }
}

7. 披萨外卖店演示

就是这样;我们讨论了框架最重要的结构和技术(本文忽略了 Rendezvous 和 Barrier 实现等其他类)。我们以开始文章的方式结束本文:通过演示。这次我们来看一个小型披萨外卖店模拟。本页顶部的屏幕截图显示了此模拟的运行情况:四个并行进程相互通信。下图显示了数据/消息如何使用进程间通道在四个进程之间流动,以及在进程内部使用更快的线程间通道和邮箱。

为了启动,顾客点了一个披萨和一杯饮料。他通过客户界面中的一个方法调用来完成此操作,该方法将 Order 消息发布到 CustomerOrders 通道。订单接收者(始终等待新订单)将两个厨师指令(一个用于披萨,一个用于饮料)发布到 CookInstruction 通道。同时,他通过 CashierOrder 队列将订单转发给收银员。收银员向定价服务器询问适当的价格,并向客户发送 Invoice,希望快速获得 Payment 回复。与此同时,厨师会注意到厨师指令,并将成品发送给打包员。打包员等待订单完成,然后将包裹转发给客户。

要运行此演示,请打开 4 个命令 shell(cmd.exe),使用“PizzaDemo.exe cook”启动任意数量的厨师,使用“PizzaDemo.exe backend”启动后端,以及使用“PizzaDemo.exe facade”启动具有客户界面的外观进程(将“PizzaDemo”替换为您的程序集名称)。请注意,一些线程(如厨师)每次会睡眠几秒钟以增加真实感。按回车键停止并退出进程。如果在操作过程中按 Enter 键,您会发现在最后转储报告中会转储一些消息。在实际应用程序中,转储容器会存储到磁盘。

该演示使用了本文介绍的几种机制。例如,收银员加载了一个 ChannelMultiplexer 来监听客户付款和订单,并为定价服务使用了两个邮箱。装运网关是 ChannelEventGateway,允许客户在食品包准备好后立即通过事件获得通知。所有进程也应该可以作为 Windows NT 服务和终端服务器安装程序运行。

8. 结论

我们讨论了如何构建面向服务的架构以及如何在 C# 中实现进程间通信和同步。但是,这并非解决此类问题的唯一方法。例如,使用如此多的线程在更大的项目中可能是一个严重的问题。完全缺少事务支持和替代通道/邮箱实现,如命名管道和 TCP 套接字。架构中可能还存在一些缺陷,请告知我。

9. 参考文献

10. 历史

  • 2004 年 7 月 28 日:初版文章
  • 2004 年 8 月 4 日:ThreadSemaphore 现在允许检查非法释放。如果检测到此类释放,则会抛出通常表示严重(但有时难以察觉)编程错误的异常。本文中的所有结构现在都使用检查。我还添加了一个小注释在第 2.2 节,以指出正确的信号量处理,并更新了两个下载。
© . All rights reserved.