65.9K
CodeProject 正在变化。 阅读更多。
Home

任务并行库:n 之 5

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.93/5 (57投票s)

2011年3月26日

CPOL

9分钟阅读

viewsIcon

151324

downloadIcon

1748

探究任务并行库的使用。

演示代码源:TasksArticle5.zip

引言

这是我提出的TPL系列文章的第五部分。上次我介绍了Parallel For和Foreach,并涵盖了以下内容:

  • PLinq 简介
  • 有用的扩展方法
  • 简单的 PLinq 示例
  • 排序
  • 使用范围
  • 处理异常
  • 取消 PLinq 查询
  • 分区以获得可能更好的性能
  • 使用自定义聚合

这次我们将探讨如何使用一些新类,尽管我不会将其正式归类为 TPL 的一部分,但在使用 TPL 时很可能会用到它们。这些新类位于 System.Collections.Concurrent 命名空间中,以下是 .NET 4 中引入的该命名空间中的类列表:

类名 描述
BlockingCollection<T> 为实现 IProducerConsumerCollection 的线程安全集合提供阻塞和边界功能.
ConcurrentBag<T> 表示对象的线程安全、无序集合。
ConcurrentDictionary<TKey, TValue> 表示一个线程安全的键值对集合,可以由多个线程并发访问。
ConcurrentQueue<T> 表示一个线程安全的先进先出 (FIFO) 集合。
ConcurrentStack<T> 表示一个线程安全的后进先出 (LIFO) 集合。
OrderablePartitioner<TSource> 表示将可排序数据源拆分为多个分区的特定方式。
Partitioner 为数组、列表和可枚举对象提供常见的分区策略。
Partitioner<TSource> 表示将数据源拆分为多个分区的特定方式。

从这张表中可以看出,我们之前已经见过其中一些类,例如 OrderedPartitioner<T>Partitioner<T>/Partitioner

 

 

文章系列路线图

这是可能包含 6 篇文章中的第 5 篇,我希望大家会喜欢。下面是我想要涵盖的大致内容。

  1. 启动任务/触发操作/异常处理/取消/UI同步
  2. 延续 / 取消链式任务
  3. 并行 For / 自定义分区器 / 聚合操作
  4. 并行 LINQ
  5. 管道(本文)
  6. 高级场景 / 任务的 v.Next

现在我意识到有些人会简单地阅读这篇文章,并表示它与 MSDN 上目前可用的内容相似,我部分同意这一点,但是,我选择继续撰写这些文章有几个原因,如下所示:

  • 只有前几篇文章会展示与MSDN相似的想法,之后我认为我将深入探讨的材料将不会出现在MSDN上,并且将是我代表我进行的一些TPL研究的结果,我将在文章中概述这些研究,因此您将受益于我的研究,您可以直接阅读...是的,很棒
  • 这里会有实时输出的屏幕截图,这是MSDN上没有多少的,这可能有助于一些读者加强文章的文本内容。
  • 这里可能有一些读者甚至从未听说过任务并行库,因此不会在MSDN中遇到它,你知道那个老故事,你必须首先知道你在寻找什么。
  • 我喜欢线程文章,所以喜欢做,所以我做了,会做,已经做了,并将继续做。

话虽如此,如果读者阅读了这篇文章后,真的认为它与MSDN太过相似(我仍然希望不会如此),也请告诉我,我会尝试调整即将发布的文章以作修改。

 

目录 

正如我在本文介绍中所说,在 System.Collections.Concurrent 命名空间中我们可以使用一些新类,在本文中我将重点介绍 BlockingCollection 的使用。

 

BlockingCollection

MSDN 对 BlockingCollection<T> 的描述如下:

下面的部分摘自 http://msdn.microsoft.com/en-us/library/dd997371.aspx,截至2011年3月25日

BlockingCollection<T> 是一个线程安全集合类,它提供以下功能:

  • 生产者-消费者模式的实现。
  • 多个线程并发添加和取出项目。
  • 可选的最大容量。
  • 当集合为空或满时,插入和删除操作会阻塞。
  • 插入和删除“尝试”操作,不会阻塞或阻塞到指定时间段。
  • 封装任何实现 IProducerConsumerCollection<T> 的集合类型
  • 使用取消令牌进行取消。
  • 两种使用 foreach(Visual Basic 中的 For Each)进行枚举的方式
    • 只读枚举。
    • 枚举时移除项目。

使用 BlockingCollection<T>,我们可以创建我称之为“管道”的东西,它包含多个阶段,每个阶段都利用生产者-消费者模式。因此,想象一下我们有一个三阶段的管道,第一个阶段会生成项目,第二个阶段会消耗这些项目,第二个阶段会消耗第一个阶段的输出,并为第三个阶段提供数据...你明白了吧。

那么让我们看一些示例管道。

 

BlockingCollection 基础

正如我所说,本文将重点关注 BlockingCollection<T>,它利用了生产者-消费者模式。那么这是如何发生的呢?我们来看看吧。

生产者

生产者负责创建项目,并且应该告知消费者生产已完成。一个非常简单的示例如下所示:

static void CreateInitialRange(BlockingCollection<int> output)
{
    try
    {
        for (int i = 1; i < 10; i++)
        {
            output.Add(i);
        }
    }
    finally
    {
        output.CompleteAdding();
    }
}

 

从上面的代码我们可以看出,我们向一个阻塞的 BlockingCollection<T> 中添加项目,当我们完成生产时,我们需要通过 BlockingCollection<T>.CompletedAdding 方法通知消费者不再有预期项目。

消费者

消费者显然是负责消费生产者产生的结果的那一方,那么它具体是如何做到的呢?实际上,它非常简单。我们真正需要做的就是使用 BlockingCollection<T>.GetConsumingEnumerable(),消费者可以使用它来消费生产者产生的值。以下是紧接上面代码片段的另一段代码片段:

static void DoubleTheRange(BlockingCollection<int> input, BlockingCollection<int> output)
{
    try
    {
        foreach (var number in input.GetConsumingEnumerable())
        {
            output.Add((int)(number * number));
        }
    }
    finally
    {
        output.CompleteAdding();
    }
}

 

简单管道

演示项目名称:SimplePipeline

现在我谈论了 BlockingCollection<T> 一点,让我们看看如何将这些生产者-消费者集合链接起来,形成更有意义的东西(我称之为管道)。

这是一个简单的控制台应用程序的完整代码列表,它执行以下管道:

  • 创建一个初始整数范围(生产阶段)
  • 将初始整数范围翻倍(来自生产者,也作为下一阶段的生产者)
  • 写入倍增阶段生产者的结果
class Program
{
    static void Main(string[] args)
    {
        var buffer1 = new BlockingCollection<int>(10);
        var buffer2 = new BlockingCollection<int>(10);

        var f = new TaskFactory(TaskCreationOptions.LongRunning,
                                TaskContinuationOptions.None);

        //Start the phases of the pipeline
        var stage1 = f.StartNew(() => CreateInitialRange(buffer1));
        var stage2 = f.StartNew(() => DoubleTheRange(buffer1, buffer2));
        var stage3 = f.StartNew(() => WriteResults(buffer2));
        //wait for the phases to complete
        Task.WaitAll(stage1, stage2, stage3);

        Console.ReadLine();
    }



    static void CreateInitialRange(BlockingCollection<int> output)
    {
        try
        {
            for (int i = 1; i < 10; i++)
            {
                output.Add(i);
                Console.WriteLine("CreateInitialRange {0}", i);
            }
        }
        finally
        {
            output.CompleteAdding();
        }
    }


    static void DoubleTheRange(
		BlockingCollection<int> input, 
		BlockingCollection<int> output)
    {
        try
        {
            foreach (var number in input.GetConsumingEnumerable())
            {
                output.Add((int)(number * number));
            }
        }
        finally
        {
            output.CompleteAdding();
        }
    }


    static void WriteResults(BlockingCollection<int> input)
    {
        foreach (var squaredNumber in input.GetConsumingEnumerable())
        {
            Console.WriteLine("Result is {0}", squaredNumber);
        }
    }

}

以下是运行此代码的结果:

 

更复杂的管道

演示项目名称:WPFImagePipeline

我还包含了一个更复杂的管道,它与上面的示例功能几乎相同,但使用WPF应用程序来显示最终消费者的结果。我认为这可能更容易让人们理解这个演示中发生了什么。

我应该指出,这个演示试图查找图像,这些图像可以是本地文件系统上的,也可以是基于网络的。所创建的管道类型取决于以下 App.Config 设置:

如果您连接到互联网,只需确保 LocalImageFolder 指向您电脑上有效的图片路径。

我不想过多地讨论这段代码,因为它与讨论关系不大,但我认为对这个演示的工作原理有个大致的了解可能会有所帮助,所以我们开始吧。

 

View

只有一个窗口,其中包含以下XAML(我犹豫了是否要展示,但最终我决定展示,因为文章中包含一个有用的线程控件,需要一些XAML才能使其工作)

<controls:AsyncHost AsyncState="{Binding Path=AsyncState, Mode=OneWay}">

    <Grid x:Name="mainGrid" Background="White" Margin="5"
            controls:AsyncHost.AsyncContentType="Content">


        <ListBox ItemsSource="{Binding ProcessedImages}" 
            ItemTemplate="{StaticResource ImageInfoDataTemplate}" 
            BorderThickness="0"
            BorderBrush="Transparent"
            ItemContainerStyle="{StaticResource ImageInfoListBoxItemStyle}">
            <ListBox.ItemsPanel>
                <ItemsPanelTemplate>
                    <controls:ScatterPanel Background="White">
                    </controls:ScatterPanel>
                </ItemsPanelTemplate>
            </ListBox.ItemsPanel>

        </ListBox>
    </Grid>
    <controls:AsyncBusyUserControl 
        controls:AsyncHost.AsyncContentType="Busy" 
        AsyncWaitText="{Binding Path=WaitText, Mode=OneWay}" 
        Visibility="Hidden" />
    <controls:AsyncFailedUserControl 
        controls:AsyncHost.AsyncContentType="Error" 
        Error="{Binding Path=ErrorMessage, Mode=OneWay}" 
        Visibility="Hidden" />
</controls:AsyncHost>

一般的想法是主窗口只是在 ListBox 中显示图像列表。这些图像可以通过管道从本地硬盘(参见 App.Config)或通过随机 Google 搜索获取。

线程组件助手被称为 AsyncHost,并绑定到此窗口使用的 ViewModel 公开的 AsyncState 属性。根据该绑定属性的值,AsyncHost 将显示以下三项之一:

  1. AsyncState = Content,将显示标记为附加属性设置为“Content”的元素,在此示例中为 ListBox
  2. AsyncState = Busy,将显示标记为附加属性设置为“Busy”的元素,在此示例中是名为 AyncBusyUserControl 的自定义控件。
  3. AsyncState = Error,将显示标记为附加属性设置为“Error”的元素,在此示例中是名为 AyncFailedUserControl 的自定义控件。

所有这些控件都在下载中,您可以随意查看这些控件,它们非常有用,我发现在任何WPF UI项目中,当我执行可能失败的长时间运行任务时,我经常使用它们。

这就是相关的 XAML

 

ViewModel

现在我们已经看到了视图中最相关的部分,让我们看看视图模型,它看起来像这样:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using WPFImagePipeline.Common;
using WPFImagePipeline.Services;
using WPFImagePipeline.Model;
using System.Configuration;
using System.IO;
using WPFImagePipeline.Controls;

namespace WPFImagePipeline.ViewModels
{
    public class MainWindowViewModel : INPCBase
    {
        private ILocalImagePipelineService localImagePipelineService;
        private IGoogleImagePipeLineService googleImagePipelineService;
        private IMessageBoxService messageBoxService;
        private List<ImageInfo> processedImages = new List<ImageInfo>();
        private bool useWebBasedImages = false;
        private string localImageFolder = "";
        private string defaultImagePath = @"C:\Users\Public\Pictures\Sample Pictures";


        private string waitText;
        private string errorMessage;
        private AsyncType asyncState = AsyncType.Content;


        public MainWindowViewModel(
            ILocalImagePipelineService imagePipelineService, 
            IGoogleImagePipeLineService googleSearchProvider,
            IMessageBoxService messageBoxService)
        {
            this.localImagePipelineService = imagePipelineService;
            this.googleImagePipelineService = googleSearchProvider;
            this.messageBoxService = messageBoxService;

            imagePipelineService.PipeLineCompleted += ImagePipelineService_PipeLineCompleted;
            googleSearchProvider.PipeLineCompleted += GooglePipelineService_PipeLineCompleted;

            AsyncState = AsyncType.Content;
            WaitText ="Fetching images";

        }




        public void DoIt()
        {
            AsyncState = AsyncType.Busy;
            bool result=false;
            if (Boolean.TryParse(
                ConfigurationManager.AppSettings["UseWebBasedImages"].ToString(), 
                out useWebBasedImages))
            {
                if (useWebBasedImages)
                {
                    googleImagePipelineService.StartPipeline();
                }
                else
                {
                    ShowUsingLocalImages();
                }
            }
            else
            {
                ShowUsingLocalImages();   
            }
        }


        public List<ImageInfo> ProcessedImages
        {
            get { return processedImages; }
            set
            {
                if (processedImages != value)
                {
                    processedImages = value;
                    NotifyPropertyChanged("ProcessedImages");
                }
            }
        }


        public AsyncType AsyncState
        {
            get { return asyncState; }
            set
            {
                if (asyncState != value)
                {
                    asyncState = value;
                    NotifyPropertyChanged("AsyncState");
                }
            }
        }



        public string WaitText
        {
            get { return waitText; }
            set
            {
                if (waitText != value)
                {
                    waitText = value;
                    NotifyPropertyChanged("WaitText");
                }
            }
        }


        public string ErrorMessage
        {
            get { return errorMessage; }
            set
            {
                if (errorMessage != value)
                {
                    errorMessage = value;
                    NotifyPropertyChanged("ErrorMessage");
                }
            }
        }




        private void ShowUsingLocalImages()
        {
            localImageFolder = ConfigurationManager.AppSettings["LocalImageFolder"].ToString();
            if (!String.IsNullOrEmpty(localImageFolder))
            {
                if (Directory.Exists(localImageFolder))
                {
                    localImagePipelineService.StartPipeline(localImageFolder);
                }
                else
                {
                    messageBoxService.ShowMessage(
			"The LocalImageFolder folder you specified does not exist");
                }
            }
            else
            {
                localImagePipelineService.StartPipeline(
			@"C:\Users\Public\Pictures\Sample Pictures");
            }
        }


        private void ImagePipelineService_PipeLineCompleted(object sender, ImagePipelineCompletedArgs e)
        {
            ProcessedImages = e.GatheredImages;
            AsyncState = AsyncType.Content;
        }

        private void GooglePipelineService_PipeLineCompleted(object sender, ImagePipelineCompletedArgs e)
        {
            ProcessedImages = e.GatheredImages;
            AsyncState = AsyncType.Content;
        }
       

    }
}

需要注意的是,MainWindowViewModel 使用了两个服务,一个用于本地图像管道,另一个用于 Google 图像管道。这两个服务如下所示。使用哪个服务取决于 App.Config 中的“UseWebBasedImages”设置。

 

LocalImagePipelineService:本地管道

此服务执行本地图像搜索管道。其思路是,当管道完成添加时,它将引发 PipeLineCompleted 事件,MainWindowViewModel 将监听此事件,然后获取图像并将其设置到 MainWindowViewModel 中的一个属性上,该属性将更新 XAML 中的 ListBox 将监听的绑定,然后图像就会显示出来。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using WPFImagePipeline.Model;
using System.Threading.Tasks;
using System.IO;

namespace WPFImagePipeline.Services
{

    public class ImagePipelineCompletedArgs : EventArgs
    {
        public List<ImageInfo> GatheredImages { get; private set; }

        public ImagePipelineCompletedArgs(List<ImageInfo> gatheredImages)
        {
            this.GatheredImages = gatheredImages;
        }
    }


    public interface ILocalImagePipelineService
    {
        void StartPipeline(string yourImageFolder);
        event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;

    }



    public class LocalImagePipelineService : ILocalImagePipelineService
    {
        private Object locker = new Object();


        private FileInfo[] GetAllMatchingImageFiles(string yourImageFolder)
        {
            string lookfor = "*.png;*.jpg;*.gif;*.tif";
            string[] extensions = lookfor.Split(new char[] { ';' });

            List<FileInfo> myfileinfos = new List<FileInfo>();
            DirectoryInfo di = new DirectoryInfo(yourImageFolder);

            foreach (string ext in extensions)
            {
                myfileinfos.AddRange(di.GetFiles(ext));
            }

            return myfileinfos.ToArray();
        }


        private void CreateImageUrls(string yourImageFolder, BlockingCollection<string> urls)
        {
            try
            {
                FileInfo[] files = GetAllMatchingImageFiles(yourImageFolder);
                Random rand = new Random();
                int added = 0;
                do
                {
                    int idx = rand.Next(0, files.Count());
                    urls.Add(files[idx].FullName);
                    ++added;
                } while (added < 100);
            }
            finally
            {
                urls.CompleteAdding();
            }
        }

        private void CreateImageInfos(BlockingCollection<string> urls, 
            BlockingCollection<ImageInfo> initialImageInfos)
        {
            try
            {
                foreach (string url in urls.GetConsumingEnumerable())
                {
                    int idx = url.LastIndexOf(@"\") + 1;
                    initialImageInfos.Add(new ImageInfo(url, url.Substring(idx,url.Length-idx)));
                }
            }
            finally
            {
                initialImageInfos.CompleteAdding();
            }
        }


        private void AlertViewModel(BlockingCollection<ImageInfo> initialImageInfos)
        {
            List<ImageInfo> localInfos = new List<ImageInfo>();

            try
            {
                foreach (ImageInfo imageInfo in initialImageInfos.GetConsumingEnumerable())
                {
                    lock (locker)
                    {
                        localInfos.Add(imageInfo);
                    }
                }
            }
            finally
            {
                OnPipeLineCompleted(new ImagePipelineCompletedArgs(localInfos));
            }
        }



        #region IImagePipelineService Members

        public void StartPipeline(string yourImageFolder)
        {
            BlockingCollection<string> buffer1 = new BlockingCollection<string>(100);
            BlockingCollection<ImageInfo> buffer2 = new BlockingCollection<ImageInfo>(100);

            TaskFactory factory = new TaskFactory(TaskCreationOptions.LongRunning, 
                TaskContinuationOptions.None);

            Task stage1 = factory.StartNew(() => CreateImageUrls(yourImageFolder,buffer1));
            Task stage2 = factory.StartNew(() => CreateImageInfos(buffer1, buffer2));
            Task stage3 = factory.StartNew(() => AlertViewModel(buffer2));

        }


        public event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;

        protected virtual void OnPipeLineCompleted(ImagePipelineCompletedArgs e)
        {
            if (PipeLineCompleted != null)
            {
                PipeLineCompleted(this, e);
            }
        }
        #endregion
    }
}

GoogleImagePipelineService:网络管道

此服务使用免费的 .NET Google 搜索 API Dll 执行网络图片搜索管道。和之前一样,其思路是当管道完成添加时,它将引发 PipeLineCompleted 事件,MainWindowViewModel 将监听此事件,然后获取图片并将其设置到 MainWindowViewModel 中的一个属性上,该属性将更新 XAML 中 ListBox 将监听的绑定,然后图片就会显示出来。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using WPFImagePipeline.Model;
using System.Threading;
using System.Threading.Tasks;
using Gapi.Search;
using System.Collections.Concurrent;

namespace WPFImagePipeline.Services
{
    #region IGoogleSearchProvider
    public interface IGoogleImagePipeLineService
    {
        void StartPipeline();
        event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;
    }
    #endregion

    #region GoogleSearchProvider
    public class GoogleImagePipeLineService : IGoogleImagePipeLineService
    {

        private List<string> randomKeyWords = new List<string>() 
            {   "pitbull", "shark", "dog", "parrot", "robot", 
                "cheerleader", "gun", "skull", "plane", "manga", 
                "bikini","model","snake","spider" 
            };
        private Random rand = new Random();
        private List<string> urls = new List<string>();
        private Object locker = new Object();



        private void CreateImageUrls(BlockingCollection<string> urls)
        {
            try
            {
                int added = 0;

                do
                {
                    string keyword = randomKeyWords[rand.Next(0, randomKeyWords.Count)];
                    SearchResults searchResults = Searcher.Search(SearchType.Image, keyword);

                    if (searchResults.Items.Count() > 0)
                    {
                        foreach (var searchResult in searchResults.Items)
                        {
                            urls.Add(searchResult.Url);
                            ++added;
                        }
                    }
                } while (added < 100);


            }
            finally
            {
                urls.CompleteAdding();
            }
        }

        private void CreateImageInfos(BlockingCollection<string> urls,
            BlockingCollection<ImageInfo> initialImageInfos)
        {
            try
            {
                foreach (string url in urls.GetConsumingEnumerable())
                {
                    int idx = url.LastIndexOf(@"\") + 1;
                    initialImageInfos.Add(new ImageInfo(url, url.Substring(idx, url.Length - idx)));
                }
            }
            finally
            {
                initialImageInfos.CompleteAdding();
            }
        }


        private void AlertViewModel(BlockingCollection<ImageInfo> initialImageInfos)
        {
            List<ImageInfo> localInfos = new List<ImageInfo>();

            try
            {
                foreach (ImageInfo imageInfo in initialImageInfos.GetConsumingEnumerable())
                {
                    lock (locker)
                    {
                        localInfos.Add(imageInfo);
                    }
                }
            }
            finally
            {
                OnPipeLineCompleted(new ImagePipelineCompletedArgs(localInfos));
            }
        }




        #region IImagePipelineService Members

        public void StartPipeline()
        {
            BlockingCollection<string> buffer1 = new BlockingCollection<string>(100);
            BlockingCollection<ImageInfo> buffer2 = new BlockingCollection<ImageInfo>(100);

            TaskFactory factory = new TaskFactory(TaskCreationOptions.LongRunning,
                TaskContinuationOptions.None);

            Task stage1 = factory.StartNew(() => CreateImageUrls(buffer1));
            Task stage2 = factory.StartNew(() => CreateImageInfos(buffer1, buffer2));
            Task stage3 = factory.StartNew(() => AlertViewModel(buffer2));

        }


        public event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;

        protected virtual void OnPipeLineCompleted(ImagePipelineCompletedArgs e)
        {
            if (PipeLineCompleted != null)
            {
                PipeLineCompleted(this, e);
            }
        }
        #endregion
    }
    #endregion

}

可以看出,这两个服务都使用了你之前见过的 TPL 概念以及本文中的 BlockingCollection<T> 相关内容。

 

外观

这是管道末端生成的内容,它是一个用于 WPF 的类表面面板,它利用了我同事的 SurfacePanel。这是可能的,因为我简单地将标准的 WPF ListBox 面板替换为我同事的面板。所以,是的,你下面看到的就是一个 ListBox

代码是了解这一切如何运作的地方。

 

 

 

暂时就这些

这就是我在这篇文章中想说的全部内容。我希望你喜欢它,并想要更多。如果你喜欢这篇文章并想要更多,能否抽出一些时间留下评论和投票。非常感谢。

希望,下次再见。

© . All rights reserved.