65.9K
CodeProject 正在变化。 阅读更多。
Home

并行容器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.62/5 (10投票s)

2011年5月14日

CPOL

5分钟阅读

viewsIcon

32783

一篇描述并发集合的文章。

入门概念

在编程中,队列是一种先进先出(First-In, First-Out)的数据结构。队列用于存放诸如系统中当前正在执行的进程、待处理的数据库事务列表或通过互联网 TCP/IP 网络连接传输的数据包等。生产者/消费者场景常常需要阻塞队列和有界队列。这些队列会在消费者从空队列中出队时阻塞,并在生产者向满队列中入队时阻塞。举个粗略的例子,当一个运行缓慢的硬件设备必须执行一个漫长而耗时的 I/O 例程时,其他操作会被暂停,并被放入一个编织状的等待列表中。为了防止 I/O 冲突,必须协调任务。一组任务(生产者)创建被另一组任务(消费者)使用的数据项。

BlockingCollection<T>

从生产者到消费者的工作流由一个集合来协调,通常是一个队列:生产者将工作项放入集合,以便消费者可以移除并处理它们。在任何给定时刻,集合的内容代表着待处理的工作项。也就是说,那些已经被生产但尚未被消费的项。使用同步原语,生产者可以在有可处理的工作项时通知消费者。集合和原语的组合能够将项的生产与消费解耦。其思想是根据生产或消耗一个项所花费的相对时间来改变生产者和消费者任务的比例。如果比例很高,可以将情况扩展为多阶段,形成一个管道。如果消费滞后,我们可以使用待处理项的数量来调节生产。我们可以使用集合来平滑处理方程一侧或另一侧的峰谷效应。.NET 并行扩展提供了一个名为 BlockingCollection<t> 的集合,它支持阻塞队列和有界队列。

public class BlockingCollection<T> : IEnumerable<T>, ICollection,
             IEnumerable, IDisposable {
    public BlockingCollection(IProducerConsumerCollection<T> collection,
    Int32 boundedCapacity);
    public void Add(T item);
    public Boolean TryAdd(T item, Int32 msTimeout, CancellationToken cancellationToken);
    public void CompleteAdding();
    public T Take();
    public Boolean TryTake(out T item, Int32 msTimeout, CancellationToken cancellationToken);
    public Int32 BoundedCapacity { get; }
    public Int32 Count { get; }
    public Boolean IsAddingCompleted { get; } // true if AddingComplete is called
    public Boolean IsCompleted { get; } // true if IsAddingComplete and Count==0
    public IEnumerable<t> GetConsumingEnumerable(CancellationToken cancellationToken);
    public void CopyTo(T[] array, int index);
    public T[] ToArray();
    public void Dispose();
}

在建立并行模式之前,让我们先检查一下这段代码

using System;
using System.Threading;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;

public static class Program  {
    public static void Main() {
      
     var bl = new BlockingCollection<Int32>(new ConcurrentQueue<Int32>());

        // A thread pool thread will do the consuming
        ThreadPool.QueueUserWorkItem(ConsumeItems, bl);

        // Add 5 items to the collection
        for (Int32 item = 0; item < 5; item++) {
            Console.WriteLine("Producing: " + item);
            bl.Add(item);
        }

        // Tell the consuming thread(s) that no more
        // items will be added to the collection
        bl.CompleteAdding();

        Console.ReadLine();  
    }

    private static void ConsumeItems(Object o) {
        var bl = (BlockingCollection<Int32>)o;

        // Block until an item shows up, then process it
        foreach (var item in bl.GetConsumingEnumerable()) {
            Console.WriteLine("Consuming: " + item);
        }

        // The collection is empty and no more items are going into it
        Console.WriteLine("All items have been consumed");
        Console.WriteLine("Press <enter> to finish...");   
    }
}

编译这段代码会得到以下结果

Producing: 0
Producing: 1
Producing: 2
Producing: 3
Producing: 4
Consuming: 0
Consuming: 1
Consuming: 2
Consuming: 3
Consuming: 4
All items have been consumed
Press <enter> to finish...

下面的代码首先创建了两个小型类:BankAccount 类和 Deposit 类。在程序的 main 方法中,我们实例化了一个 BlockingCollection。接着,我们创建并启动生产者,它们将生成存款并将它们放入集合中。在存款存入账户后,我们还创建了一个多对一的延续,它将向消费者发出生产结束的信号。生产停止后,我们创建银行账户,然后创建消费者(它将根据存款更新余额)。

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class BankAccount {
        public Int32 Balance  { get;  set; }
        }

 public  class Deposit {
        public Int32 Amount { get; set; }
    }

 public class Program {

   public  static void Main() {
        BlockingCollection<Deposit> blockingCollection
                = new BlockingCollection<Deposit>();

            Task[] producers = new Task[3];
            for (Int32 i = 0; i < 3; i++) {
                producers[i] = Task.Factory.StartNew(() => {
                    // create a series of deposits
                    for (Int32 j = 0; j < 20; j++) {
                        // create the transfer
                        Deposit deposit = new Deposit { Amount = 100 };
                        // place the transfer in the collection
                        blockingCollection.Add(deposit);
                    }
                });
            };

            Task.Factory.ContinueWhenAll(producers, antecedents => {
                // signal that production has ended
                Console.WriteLine("Signaling production end");
                blockingCollection.CompleteAdding();
            });
           
            BankAccount account = new BankAccount();

            Task consumer = Task.Factory.StartNew(() => {
                while (!blockingCollection.IsCompleted) {
                    Deposit deposit;
                    // try to take the next item 
                    if (blockingCollection.TryTake(out deposit)) {
                        // update the balance with the transfer amount
                        account.Balance += deposit.Amount;
                    }
                }
                // print out the final balance
                Console.WriteLine("Final Balance: {0}", account.Balance);
            });

            // wait for the consumer to finish
            consumer.Wait();

            // wait for input before exiting
            Console.WriteLine("Press <Enter> to finish");
            Console.ReadLine();
        }
    }

编译这段代码会得到以下结果

Signaling production end
Final Balance: 6000
Press <Enter> to finish

现在让我们检查一些非阻塞集合。

ConcurrentQueue<T>:一个非阻塞集合

ConcurrentQueue 类实现了一个先进先出(FIFO)队列,这意味着当你从队列中取出项时,它们的顺序与添加的顺序相同。要将项放入 ConcurrentQueue,请调用 Enqueue() 方法。要取出队列中的第一个项,请调用 TryDequeue();要获取队列中的第一个项而不将其取出,请调用 TryPeek()TryDequeue()TryPeek() 接受一个由 out 关键字修饰的集合类型参数,并返回一个 bool 值。如果结果为 true,则参数将包含数据项。如果为 false,则无法获取任何数据项。

using System;;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program {

   public static void Main() {

        // create a shared collection 
        ConcurrentQueue<Int32> sharedQueue = new ConcurrentQueue<Int32>();

        // populate the collection with items to process
        for (Int32 i = 0; i < 1000; i++) {
            sharedQueue.Enqueue(i);
        }

        // define a counter for the number of processed items
        Int32 itemCount = 0;

        // create tasks to process the list
        Task[] tasks = new Task[10];
        for (Int32 i = 0; i < tasks.Length; i++) {
            // create the new task
            tasks[i] = new Task(() => {

                while (sharedQueue.Count > 0) {
                    // define a variable for the dequeue requests
                    Int32 queueElement;
                    // take an item from the queue
                    bool gotElement = sharedQueue.TryDequeue(out queueElement);
                    // increment the count of items processed
                    if (gotElement) {
                        Interlocked.Increment(ref itemCount);
                    }
                }

            });
            // start the new task
            tasks[i].Start();
        }

        // wait for the tasks to complete
        Task.WaitAll(tasks);

        // report on the number of items processed
        Console.WriteLine("Items processed: {0}", itemCount);

        // wait for input before exiting
        Console.WriteLine("Press <Enter> to finish....");
        Console.ReadLine();
    }
}

产生以下结果

Items processed: 1000
Press <Enter> to finish....

ConcurrentStack<T>:同样是非阻塞的

System.Collections.Concurrent.ConcurrentStack 类实现了一个后进先出(LIFO)队列,从队列中取出一个项会返回最近添加的项。使用 Push()PushRange() 方法将项添加到堆栈,使用 TryPeek()TryPop()TryPopRange() 方法进行检查和检索。这个类的设计几乎与队列数据类型等效。您可以使用 Push 从堆栈的头部取走元素。如前所述,有一个 TryPeek 方法可以在不修改的情况下返回当前头部元素。堆栈还提供了一个 Clear 方法来清除其内容。这段代码示例与上面的示例类似。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program {

     public static void Main() {

        // create a shared collection 
        ConcurrentStack<int> sharedStack = new ConcurrentStack<int>();

        // populate the collection with items to process
        for (int i = 0; i < 1000; i++) {
            sharedStack.Push(i);
        }

        // define a counter for the number of processed items
        int itemCount = 0;

        // create tasks to process the list
        Task[] tasks = new Task[10];
        for (int i = 0; i < tasks.Length; i++) {
            // create the new task
            tasks[i] = new Task(() => {

                while (sharedStack.Count > 0) {
                    // define a variable for the dequeue requests
                    int queueElement;
                    // take an item from the queue
                    bool gotElement = sharedStack.TryPop(out queueElement);
                    // increment the count of items processed
                    if (gotElement) {
                        Interlocked.Increment(ref itemCount);
                    }
                }

            });
            // start the new task
            tasks[i].Start();
        }

        // wait for the tasks to complete
        Task.WaitAll(tasks);

        // report on the number of items processed
        Console.WriteLine("Items processed: {0}", itemCount);

        // wait for input before exiting
        Console.WriteLine("Press <enter> to finish...");
        Console.ReadLine();
    }
}

编译这段代码会产生这个结果

Items processed: 1000
Press <Enter> to finish...

ConcurrentDictionary:非阻塞

ConcurrentDictionary 类实现了一个键值对集合,是 System.Collections.IDictionary 实现的并发版本。您可以定义所需的并发级别(将更新字典的任务或线程的最大数量)、其首选初始容量以及用于比较键的 IEqualityComparer<tkey> 实现。您可以使用 Item 属性来获取或设置与指定键关联的值。此属性的作用类似于 TKey 的索引器,并返回一个 TValue

using System;
using System.Threading.Tasks;
using System.Collections.Concurrent;

public  class MyParallel<TKey,TValue> {
    private ConcurrentDictionary<TKey, Lazy<TValue>> dictionary;
    private Func<TKey, TValue> valueFactory;

    public MyParallel(Func<TKey, TValue> factory) {
        // set the factory instance variable
        valueFactory = factory;
        // initialize the dictionary
        dictionary = new ConcurrentDictionary<TKey,Lazy<TValue>>();
    }

    public TValue GetValue(TKey key) {
        return dictionary.GetOrAdd(key, 
            new Lazy<TValue>(() => valueFactory(key))).Value;
    }
}

public static class Program {
public  static void Main() {

        // create the cache
        MyParallel<int, double> cache
            = new MyParallel<int, double>(key => {
                Console.WriteLine("Created value for key {0}", key);
                return Math.Pow(key, 2);
            });

        for (int i = 0; i < 10; i++) {
            Task.Factory.StartNew(() => {
                for (int j = 0; j < 20; j++) {
                    Console.WriteLine(
                        "Task {0} got value {1} for key {2}",
                        Task.CurrentId, cache.GetValue(j), j);
                }
            });
        }

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }
}

编译这段代码会得到以下结果

Press enter to finish
Created value for key 0
Task 1 got value 0 for key 0
Created value for key 1
Task 1 got value 1 for key 1
Created value for key 2
Task 1 got value 4 for key 2
Created value for key 3
Task 1 got value 9 for key 3
Created value for key 4
Task 1 got value 16 for key 4
Created value for key 5
Task 1 got value 25 for key 5
Created value for key 6
Task 1 got value 36 for key 6
Created value for key 7
Task 1 got value 49 for key 7
Created value for key 8
Task 1 got value 64 for key 8
Created value for key 9
Task 1 got value 81 for key 9
Created value for key 10
Task 1 got value 100 for key 10
Created value for key 11
Task 1 got value 121 for key 11
Task 2 got value 0 for key 0
Task 2 got value 1 for key 1
Task 2 got value 4 for key 2
Task 2 got value 9 for key 3
Task 2 got value 16 for key 4
Task 2 got value 25 for key 5
Task 2 got value 36 for key 6
Task 2 got value 49 for key 7
Task 2 got value 64 for key 8
Task 2 got value 81 for key 9
Task 2 got value 100 for key 10
Task 2 got value 121 for key 11
Created value for key 12
Task 1 got value 144 for key 12
Created value for key 13
Task 1 got value 169 for key 13
Created value for key 14
Task 1 got value 196 for key 14
Created value for key 15
Task 1 got value 225 for key 15
Created value for key 16
Task 1 got value 256 for key 16
Task 2 got value 144 for key 12
Created value for key 17
Task 2 got value 169 for key 13
. . . . . . 
etc ....

映射到键的值是键的数字或 ID 的平方。

ConcurrentBag

根据 Gaston C. Hillar(《Professional Parallel Programming with C#》的作者)的说法,ConcurrentBag 对于同一线程添加元素(生产)和删除元素(消费)的某些场景来说是一个非常高效的集合。它使用了许多不同的机制来最小化对同步及其相关开销的需求。然而,有时它需要锁定,并且在生产者线程与消费者线程完全分离的场景中效率相当低。ConcurrentBag 为访问该包的每个线程维护一个本地队列,并在可能的情况下,以无锁的方式访问本地队列,几乎没有或根本没有争用。ConcurrentBag 代表一个包(bag),它是一个无序的、支持重复项的集合。因此,当排序不重要时,ConcurrentBag 可用于存储和访问对象。这里引用了他作品中的代码(旨在阐明 ConcurrentBag 类)。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Security.Cryptography;
using System.Threading.Tasks;
using System.Collections.Concurrent;

public class Program
{
    private static string RemoveLetters(char[] letters, string sentence)
    {
        var sb = new StringBuilder();
        bool match = false;

        for (int i = 0; i < sentence.Length; i++)
        {
            for (int j = 0; j < letters.Length; j++)
            {
                if (sentence[i] == letters[j])
                {
                    match = true;
                    break;
                }
            }
            if (!match)
            {
                sb.Append(sentence[i]);
            }
            match = false;
        }
        return sb.ToString();
    }

    private static string CapitalizeWords(char[] delimiters, 
                   string sentence, char newDelimiter)
    {
        string[] words = sentence.Split(delimiters);
        var sb = new StringBuilder();
        for (int i = 0; i < words.Length; i++)
        {
            if (words[i].Length > 1)
            {
                sb.Append(words[i][0].ToString().ToUpper());
                sb.Append(words[i].Substring(1).ToLower());
            }
            else
            {
                // Word with just 1 letter must be lowercase
                sb.Append(words[i].ToLower());
            }
            sb.Append(newDelimiter);
        }
        return sb.ToString();
    }

    private const int NUM_SENTENCES = 2000000;
    private static ConcurrentBag<string> _sentencesBag;
    private static ConcurrentBag<string> _capWordsInSentencesBag;
    private static ConcurrentBag<string> _finalSentencesBag;
    private static volatile bool _producingSentences = false;
    private static volatile bool _capitalizingWords = false;

    private static void ProduceSentences()
    {

        string[] possibleSentences = 
        { 
           "ConcurrentBag is included in the System.Collections.Concurrent namespace.",
           "Is parallelism important for cloud-computing?",
           "Parallelism is very important for cloud-computing!",
           "ConcurrentQueue is one of the new concurrent " + 
               "collections added in .NET Framework 4",
           "ConcurrentStack is a concurrent collection that represents a LIFO collection",
           "ConcurrentQueue is a concurrent collection that represents a FIFO collection" 
        };

        try
        {
            var rnd = new Random();

            for (int i = 0; i < NUM_SENTENCES; i++)
            {
                var sb = new StringBuilder();
                for (int j = 0; j < possibleSentences.Length; j++)
                {
                    if (rnd.Next(2) > 0)
                    {
                        sb.Append(possibleSentences[rnd.Next(possibleSentences.Length)]);
                        sb.Append(' ');
                    }
                }
                if (rnd.Next(20) > 15)
                {
                    _sentencesBag.Add(sb.ToString());
                }
                else
                {
                    _sentencesBag.Add(sb.ToString().ToUpper());
                }
            }
        }
        finally
        {
            _producingSentences = false;
        }
    }

    private static void CapitalizeWordsInSentences()
    {
        char[] delimiterChars = { ' ', ',', '.', ':', ';', '(', ')', 
               '[', ']', '{', '}', '/', '?', '@', '\t', '"' };

        // Start after Produce sentences began working
        System.Threading.SpinWait.SpinUntil(() => _producingSentences);

        try
        {
            _capitalizingWords = true;
            // This condition running in a loop (spinning) is very inefficient
            // This example uses this spinning for educational purposes
            // It isn't a best practice
            // Subsequent sections and chapters explain an improved version
            while ((!_sentencesBag.IsEmpty) || (_producingSentences))
            {
                string sentence;
                if (_sentencesBag.TryTake(out sentence))
                {
                    _capWordsInSentencesBag.Add(
                       CapitalizeWords(delimiterChars, sentence, '\\'));
                }
            }
        }
        finally
        {
            _capitalizingWords = false;
        }
    }

    private static void RemoveLettersInSentences()
    {
        char[] letterChars = { 'A', 'B', 'C', 'e', 'i', 'j', 'm', 'X', 'y', 'Z' };

        // Start after CapitalizedWordsInsentences began working
        System.Threading.SpinWait.SpinUntil(() => _capitalizingWords);
        // This condition running in a loop (spinning) is very inefficient
        // This example uses this spinning for educational purposes
        // It isn't a best practice
        // Subsequent sections and chapters explain an improved version
        while ((!_capWordsInSentencesBag.IsEmpty) || (_capitalizingWords))
        {
            string sentence;
            if (_capWordsInSentencesBag.TryTake(out sentence))
            {
                _finalSentencesBag.Add(RemoveLetters(letterChars, sentence));
            }
        }
    }

    static void Main(string[] args)
    {
        var sw = Stopwatch.StartNew();

        _sentencesBag = new ConcurrentBag<string>();
        _capWordsInSentencesBag = new ConcurrentBag<string>();
        _finalSentencesBag = new ConcurrentBag<string>();

        
        _producingSentences = true;

        Parallel.Invoke(
            () => ProduceSentences(),
            () => CapitalizeWordsInSentences(),
            () => RemoveLettersInSentences()
            );

        Console.WriteLine(
            "Number of sentences with capitalized words in the bag: {0}", 
            _capWordsInSentencesBag.Count);
        Console.WriteLine(
            "Number of sentences with removed letters in the bag: {0}",
            _finalSentencesBag.Count);

        Console.WriteLine(sw.Elapsed.ToString());
        //  Display the results and wait for the user to press a key
        Console.WriteLine("Finished!");
        Console.ReadLine();
    }
}

这段代码可以快速编译,但运行程序大约需要 30 秒才能产生以下结果

Number of sentences with capitalized words in the bag: 0
Number of sentences with removed letters in the bag: 2000000
00:01:04.2583357
Finished!

Parallel 类 Invoke 方法执行的方法展示了生产者/消费者关系。

1.jpg

参考文献

  • CLR via C#, 第 3 版 Jeffrey Richter
  • Parallel Programming with C#, Gaston C. Hillar, Wrox Publishing
© . All rights reserved.