任务并行库:n 之 5






4.93/5 (57投票s)
探究任务并行库的使用。
演示代码源:TasksArticle5.zip
引言
这是我提出的TPL系列文章的第五部分。上次我介绍了Parallel For和Foreach,并涵盖了以下内容:
- PLinq 简介
- 有用的扩展方法
- 简单的 PLinq 示例
- 排序
- 使用范围
- 处理异常
- 取消 PLinq 查询
- 分区以获得可能更好的性能
- 使用自定义聚合
这次我们将探讨如何使用一些新类,尽管我不会将其正式归类为 TPL 的一部分,但在使用 TPL 时很可能会用到它们。这些新类位于 System.Collections.Concurrent 命名空间中,以下是 .NET 4 中引入的该命名空间中的类列表:
类名 | 描述 |
BlockingCollection<T> | 为实现 IProducerConsumerCollection 的线程安全集合提供阻塞和边界功能 |
ConcurrentBag<T> | 表示对象的线程安全、无序集合。 |
ConcurrentDictionary<TKey, TValue> | 表示一个线程安全的键值对集合,可以由多个线程并发访问。 |
ConcurrentQueue<T> | 表示一个线程安全的先进先出 (FIFO) 集合。 |
ConcurrentStack<T> | 表示一个线程安全的后进先出 (LIFO) 集合。 |
OrderablePartitioner<TSource> | 表示将可排序数据源拆分为多个分区的特定方式。 |
Partitioner | 为数组、列表和可枚举对象提供常见的分区策略。 |
Partitioner<TSource> | 表示将数据源拆分为多个分区的特定方式。 |
从这张表中可以看出,我们之前已经见过其中一些类,例如 OrderedPartitioner<T>
和 Partitioner<T>/Partitioner
。
文章系列路线图
这是可能包含 6 篇文章中的第 5 篇,我希望大家会喜欢。下面是我想要涵盖的大致内容。
现在我意识到有些人会简单地阅读这篇文章,并表示它与 MSDN 上目前可用的内容相似,我部分同意这一点,但是,我选择继续撰写这些文章有几个原因,如下所示:
- 只有前几篇文章会展示与MSDN相似的想法,之后我认为我将深入探讨的材料将不会出现在MSDN上,并且将是我代表我进行的一些TPL研究的结果,我将在文章中概述这些研究,因此您将受益于我的研究,您可以直接阅读...是的,很棒
- 这里会有实时输出的屏幕截图,这是MSDN上没有多少的,这可能有助于一些读者加强文章的文本内容。
- 这里可能有一些读者甚至从未听说过任务并行库,因此不会在MSDN中遇到它,你知道那个老故事,你必须首先知道你在寻找什么。
- 我喜欢线程文章,所以喜欢做,所以我做了,会做,已经做了,并将继续做。
话虽如此,如果读者阅读了这篇文章后,真的认为它与MSDN太过相似(我仍然希望不会如此),也请告诉我,我会尝试调整即将发布的文章以作修改。
目录
正如我在本文介绍中所说,在 System.Collections.Concurrent 命名空间中我们可以使用一些新类,在本文中我将重点介绍 BlockingCollection 的使用。
BlockingCollection
MSDN 对 BlockingCollection<T>
的描述如下:
下面的部分摘自 http://msdn.microsoft.com/en-us/library/dd997371.aspx,截至2011年3月25日
BlockingCollection<T>
是一个线程安全集合类,它提供以下功能:
- 生产者-消费者模式的实现。
- 多个线程并发添加和取出项目。
- 可选的最大容量。
- 当集合为空或满时,插入和删除操作会阻塞。
- 插入和删除“尝试”操作,不会阻塞或阻塞到指定时间段。
- 封装任何实现
IProducerConsumerCollection<T>
的集合类型 - 使用取消令牌进行取消。
- 两种使用 foreach(Visual Basic 中的 For Each)进行枚举的方式
- 只读枚举。
- 枚举时移除项目。
使用 BlockingCollection<T>
,我们可以创建我称之为“管道”的东西,它包含多个阶段,每个阶段都利用生产者-消费者模式。因此,想象一下我们有一个三阶段的管道,第一个阶段会生成项目,第二个阶段会消耗这些项目,第二个阶段会消耗第一个阶段的输出,并为第三个阶段提供数据...你明白了吧。
那么让我们看一些示例管道。
BlockingCollection 基础
正如我所说,本文将重点关注 BlockingCollection<T>
,它利用了生产者-消费者模式。那么这是如何发生的呢?我们来看看吧。
生产者
生产者负责创建项目,并且应该告知消费者生产已完成。一个非常简单的示例如下所示:
static void CreateInitialRange(BlockingCollection<int> output)
{
try
{
for (int i = 1; i < 10; i++)
{
output.Add(i);
}
}
finally
{
output.CompleteAdding();
}
}
从上面的代码我们可以看出,我们向一个阻塞的 BlockingCollection<T>
中添加项目,当我们完成生产时,我们需要通过 BlockingCollection<T>.CompletedAdding
方法通知消费者不再有预期项目。
消费者
消费者显然是负责消费生产者产生的结果的那一方,那么它具体是如何做到的呢?实际上,它非常简单。我们真正需要做的就是使用 BlockingCollection<T>.GetConsumingEnumerable()
,消费者可以使用它来消费生产者产生的值。以下是紧接上面代码片段的另一段代码片段:
static void DoubleTheRange(BlockingCollection<int> input, BlockingCollection<int> output)
{
try
{
foreach (var number in input.GetConsumingEnumerable())
{
output.Add((int)(number * number));
}
}
finally
{
output.CompleteAdding();
}
}
简单管道
演示项目名称:SimplePipeline
现在我谈论了 BlockingCollection<T>
一点,让我们看看如何将这些生产者-消费者集合链接起来,形成更有意义的东西(我称之为管道)。
这是一个简单的控制台应用程序的完整代码列表,它执行以下管道:
- 创建一个初始整数范围(生产阶段)
- 将初始整数范围翻倍(来自生产者,也作为下一阶段的生产者)
- 写入倍增阶段生产者的结果
class Program
{
static void Main(string[] args)
{
var buffer1 = new BlockingCollection<int>(10);
var buffer2 = new BlockingCollection<int>(10);
var f = new TaskFactory(TaskCreationOptions.LongRunning,
TaskContinuationOptions.None);
//Start the phases of the pipeline
var stage1 = f.StartNew(() => CreateInitialRange(buffer1));
var stage2 = f.StartNew(() => DoubleTheRange(buffer1, buffer2));
var stage3 = f.StartNew(() => WriteResults(buffer2));
//wait for the phases to complete
Task.WaitAll(stage1, stage2, stage3);
Console.ReadLine();
}
static void CreateInitialRange(BlockingCollection<int> output)
{
try
{
for (int i = 1; i < 10; i++)
{
output.Add(i);
Console.WriteLine("CreateInitialRange {0}", i);
}
}
finally
{
output.CompleteAdding();
}
}
static void DoubleTheRange(
BlockingCollection<int> input,
BlockingCollection<int> output)
{
try
{
foreach (var number in input.GetConsumingEnumerable())
{
output.Add((int)(number * number));
}
}
finally
{
output.CompleteAdding();
}
}
static void WriteResults(BlockingCollection<int> input)
{
foreach (var squaredNumber in input.GetConsumingEnumerable())
{
Console.WriteLine("Result is {0}", squaredNumber);
}
}
}
以下是运行此代码的结果:
更复杂的管道
演示项目名称:WPFImagePipeline
我还包含了一个更复杂的管道,它与上面的示例功能几乎相同,但使用WPF应用程序来显示最终消费者的结果。我认为这可能更容易让人们理解这个演示中发生了什么。
我应该指出,这个演示试图查找图像,这些图像可以是本地文件系统上的,也可以是基于网络的。所创建的管道类型取决于以下 App.Config 设置:
如果您未连接到互联网,只需确保 LocalImageFolder 指向您电脑上有效的图片路径。
我不想过多地讨论这段代码,因为它与讨论关系不大,但我认为对这个演示的工作原理有个大致的了解可能会有所帮助,所以我们开始吧。
View
只有一个窗口,其中包含以下XAML(我犹豫了是否要展示,但最终我决定展示,因为文章中包含一个有用的线程控件,需要一些XAML才能使其工作)
<controls:AsyncHost AsyncState="{Binding Path=AsyncState, Mode=OneWay}">
<Grid x:Name="mainGrid" Background="White" Margin="5"
controls:AsyncHost.AsyncContentType="Content">
<ListBox ItemsSource="{Binding ProcessedImages}"
ItemTemplate="{StaticResource ImageInfoDataTemplate}"
BorderThickness="0"
BorderBrush="Transparent"
ItemContainerStyle="{StaticResource ImageInfoListBoxItemStyle}">
<ListBox.ItemsPanel>
<ItemsPanelTemplate>
<controls:ScatterPanel Background="White">
</controls:ScatterPanel>
</ItemsPanelTemplate>
</ListBox.ItemsPanel>
</ListBox>
</Grid>
<controls:AsyncBusyUserControl
controls:AsyncHost.AsyncContentType="Busy"
AsyncWaitText="{Binding Path=WaitText, Mode=OneWay}"
Visibility="Hidden" />
<controls:AsyncFailedUserControl
controls:AsyncHost.AsyncContentType="Error"
Error="{Binding Path=ErrorMessage, Mode=OneWay}"
Visibility="Hidden" />
</controls:AsyncHost>
一般的想法是主窗口只是在 ListBox
中显示图像列表。这些图像可以通过管道从本地硬盘(参见 App.Config)或通过随机 Google 搜索获取。
线程组件助手被称为 AsyncHost
,并绑定到此窗口使用的 ViewModel 公开的 AsyncState
属性。根据该绑定属性的值,AsyncHost
将显示以下三项之一:
AsyncState
=Content
,将显示标记为附加属性设置为“Content
”的元素,在此示例中为ListBox
。AsyncState
=Busy
,将显示标记为附加属性设置为“Busy
”的元素,在此示例中是名为AyncBusyUserControl
的自定义控件。AsyncState
=Error
,将显示标记为附加属性设置为“Error
”的元素,在此示例中是名为AyncFailedUserControl
的自定义控件。
所有这些控件都在下载中,您可以随意查看这些控件,它们非常有用,我发现在任何WPF UI项目中,当我执行可能失败的长时间运行任务时,我经常使用它们。
这就是相关的 XAML
ViewModel
现在我们已经看到了视图中最相关的部分,让我们看看视图模型,它看起来像这样:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using WPFImagePipeline.Common;
using WPFImagePipeline.Services;
using WPFImagePipeline.Model;
using System.Configuration;
using System.IO;
using WPFImagePipeline.Controls;
namespace WPFImagePipeline.ViewModels
{
public class MainWindowViewModel : INPCBase
{
private ILocalImagePipelineService localImagePipelineService;
private IGoogleImagePipeLineService googleImagePipelineService;
private IMessageBoxService messageBoxService;
private List<ImageInfo> processedImages = new List<ImageInfo>();
private bool useWebBasedImages = false;
private string localImageFolder = "";
private string defaultImagePath = @"C:\Users\Public\Pictures\Sample Pictures";
private string waitText;
private string errorMessage;
private AsyncType asyncState = AsyncType.Content;
public MainWindowViewModel(
ILocalImagePipelineService imagePipelineService,
IGoogleImagePipeLineService googleSearchProvider,
IMessageBoxService messageBoxService)
{
this.localImagePipelineService = imagePipelineService;
this.googleImagePipelineService = googleSearchProvider;
this.messageBoxService = messageBoxService;
imagePipelineService.PipeLineCompleted += ImagePipelineService_PipeLineCompleted;
googleSearchProvider.PipeLineCompleted += GooglePipelineService_PipeLineCompleted;
AsyncState = AsyncType.Content;
WaitText ="Fetching images";
}
public void DoIt()
{
AsyncState = AsyncType.Busy;
bool result=false;
if (Boolean.TryParse(
ConfigurationManager.AppSettings["UseWebBasedImages"].ToString(),
out useWebBasedImages))
{
if (useWebBasedImages)
{
googleImagePipelineService.StartPipeline();
}
else
{
ShowUsingLocalImages();
}
}
else
{
ShowUsingLocalImages();
}
}
public List<ImageInfo> ProcessedImages
{
get { return processedImages; }
set
{
if (processedImages != value)
{
processedImages = value;
NotifyPropertyChanged("ProcessedImages");
}
}
}
public AsyncType AsyncState
{
get { return asyncState; }
set
{
if (asyncState != value)
{
asyncState = value;
NotifyPropertyChanged("AsyncState");
}
}
}
public string WaitText
{
get { return waitText; }
set
{
if (waitText != value)
{
waitText = value;
NotifyPropertyChanged("WaitText");
}
}
}
public string ErrorMessage
{
get { return errorMessage; }
set
{
if (errorMessage != value)
{
errorMessage = value;
NotifyPropertyChanged("ErrorMessage");
}
}
}
private void ShowUsingLocalImages()
{
localImageFolder = ConfigurationManager.AppSettings["LocalImageFolder"].ToString();
if (!String.IsNullOrEmpty(localImageFolder))
{
if (Directory.Exists(localImageFolder))
{
localImagePipelineService.StartPipeline(localImageFolder);
}
else
{
messageBoxService.ShowMessage(
"The LocalImageFolder folder you specified does not exist");
}
}
else
{
localImagePipelineService.StartPipeline(
@"C:\Users\Public\Pictures\Sample Pictures");
}
}
private void ImagePipelineService_PipeLineCompleted(object sender, ImagePipelineCompletedArgs e)
{
ProcessedImages = e.GatheredImages;
AsyncState = AsyncType.Content;
}
private void GooglePipelineService_PipeLineCompleted(object sender, ImagePipelineCompletedArgs e)
{
ProcessedImages = e.GatheredImages;
AsyncState = AsyncType.Content;
}
}
}
需要注意的是,MainWindowViewModel
使用了两个服务,一个用于本地图像管道,另一个用于 Google 图像管道。这两个服务如下所示。使用哪个服务取决于 App.Config 中的“UseWebBasedImages”设置。
LocalImagePipelineService:本地管道
此服务执行本地图像搜索管道。其思路是,当管道完成添加时,它将引发 PipeLineCompleted
事件,MainWindowViewModel
将监听此事件,然后获取图像并将其设置到 MainWindowViewModel
中的一个属性上,该属性将更新 XAML 中的 ListBox
将监听的绑定,然后图像就会显示出来。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using WPFImagePipeline.Model;
using System.Threading.Tasks;
using System.IO;
namespace WPFImagePipeline.Services
{
public class ImagePipelineCompletedArgs : EventArgs
{
public List<ImageInfo> GatheredImages { get; private set; }
public ImagePipelineCompletedArgs(List<ImageInfo> gatheredImages)
{
this.GatheredImages = gatheredImages;
}
}
public interface ILocalImagePipelineService
{
void StartPipeline(string yourImageFolder);
event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;
}
public class LocalImagePipelineService : ILocalImagePipelineService
{
private Object locker = new Object();
private FileInfo[] GetAllMatchingImageFiles(string yourImageFolder)
{
string lookfor = "*.png;*.jpg;*.gif;*.tif";
string[] extensions = lookfor.Split(new char[] { ';' });
List<FileInfo> myfileinfos = new List<FileInfo>();
DirectoryInfo di = new DirectoryInfo(yourImageFolder);
foreach (string ext in extensions)
{
myfileinfos.AddRange(di.GetFiles(ext));
}
return myfileinfos.ToArray();
}
private void CreateImageUrls(string yourImageFolder, BlockingCollection<string> urls)
{
try
{
FileInfo[] files = GetAllMatchingImageFiles(yourImageFolder);
Random rand = new Random();
int added = 0;
do
{
int idx = rand.Next(0, files.Count());
urls.Add(files[idx].FullName);
++added;
} while (added < 100);
}
finally
{
urls.CompleteAdding();
}
}
private void CreateImageInfos(BlockingCollection<string> urls,
BlockingCollection<ImageInfo> initialImageInfos)
{
try
{
foreach (string url in urls.GetConsumingEnumerable())
{
int idx = url.LastIndexOf(@"\") + 1;
initialImageInfos.Add(new ImageInfo(url, url.Substring(idx,url.Length-idx)));
}
}
finally
{
initialImageInfos.CompleteAdding();
}
}
private void AlertViewModel(BlockingCollection<ImageInfo> initialImageInfos)
{
List<ImageInfo> localInfos = new List<ImageInfo>();
try
{
foreach (ImageInfo imageInfo in initialImageInfos.GetConsumingEnumerable())
{
lock (locker)
{
localInfos.Add(imageInfo);
}
}
}
finally
{
OnPipeLineCompleted(new ImagePipelineCompletedArgs(localInfos));
}
}
#region IImagePipelineService Members
public void StartPipeline(string yourImageFolder)
{
BlockingCollection<string> buffer1 = new BlockingCollection<string>(100);
BlockingCollection<ImageInfo> buffer2 = new BlockingCollection<ImageInfo>(100);
TaskFactory factory = new TaskFactory(TaskCreationOptions.LongRunning,
TaskContinuationOptions.None);
Task stage1 = factory.StartNew(() => CreateImageUrls(yourImageFolder,buffer1));
Task stage2 = factory.StartNew(() => CreateImageInfos(buffer1, buffer2));
Task stage3 = factory.StartNew(() => AlertViewModel(buffer2));
}
public event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;
protected virtual void OnPipeLineCompleted(ImagePipelineCompletedArgs e)
{
if (PipeLineCompleted != null)
{
PipeLineCompleted(this, e);
}
}
#endregion
}
}
GoogleImagePipelineService:网络管道
此服务使用免费的 .NET Google 搜索 API Dll 执行网络图片搜索管道。和之前一样,其思路是当管道完成添加时,它将引发 PipeLineCompleted
事件,MainWindowViewModel
将监听此事件,然后获取图片并将其设置到 MainWindowViewModel
中的一个属性上,该属性将更新 XAML 中 ListBox
将监听的绑定,然后图片就会显示出来。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using WPFImagePipeline.Model;
using System.Threading;
using System.Threading.Tasks;
using Gapi.Search;
using System.Collections.Concurrent;
namespace WPFImagePipeline.Services
{
#region IGoogleSearchProvider
public interface IGoogleImagePipeLineService
{
void StartPipeline();
event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;
}
#endregion
#region GoogleSearchProvider
public class GoogleImagePipeLineService : IGoogleImagePipeLineService
{
private List<string> randomKeyWords = new List<string>()
{ "pitbull", "shark", "dog", "parrot", "robot",
"cheerleader", "gun", "skull", "plane", "manga",
"bikini","model","snake","spider"
};
private Random rand = new Random();
private List<string> urls = new List<string>();
private Object locker = new Object();
private void CreateImageUrls(BlockingCollection<string> urls)
{
try
{
int added = 0;
do
{
string keyword = randomKeyWords[rand.Next(0, randomKeyWords.Count)];
SearchResults searchResults = Searcher.Search(SearchType.Image, keyword);
if (searchResults.Items.Count() > 0)
{
foreach (var searchResult in searchResults.Items)
{
urls.Add(searchResult.Url);
++added;
}
}
} while (added < 100);
}
finally
{
urls.CompleteAdding();
}
}
private void CreateImageInfos(BlockingCollection<string> urls,
BlockingCollection<ImageInfo> initialImageInfos)
{
try
{
foreach (string url in urls.GetConsumingEnumerable())
{
int idx = url.LastIndexOf(@"\") + 1;
initialImageInfos.Add(new ImageInfo(url, url.Substring(idx, url.Length - idx)));
}
}
finally
{
initialImageInfos.CompleteAdding();
}
}
private void AlertViewModel(BlockingCollection<ImageInfo> initialImageInfos)
{
List<ImageInfo> localInfos = new List<ImageInfo>();
try
{
foreach (ImageInfo imageInfo in initialImageInfos.GetConsumingEnumerable())
{
lock (locker)
{
localInfos.Add(imageInfo);
}
}
}
finally
{
OnPipeLineCompleted(new ImagePipelineCompletedArgs(localInfos));
}
}
#region IImagePipelineService Members
public void StartPipeline()
{
BlockingCollection<string> buffer1 = new BlockingCollection<string>(100);
BlockingCollection<ImageInfo> buffer2 = new BlockingCollection<ImageInfo>(100);
TaskFactory factory = new TaskFactory(TaskCreationOptions.LongRunning,
TaskContinuationOptions.None);
Task stage1 = factory.StartNew(() => CreateImageUrls(buffer1));
Task stage2 = factory.StartNew(() => CreateImageInfos(buffer1, buffer2));
Task stage3 = factory.StartNew(() => AlertViewModel(buffer2));
}
public event EventHandler<ImagePipelineCompletedArgs> PipeLineCompleted;
protected virtual void OnPipeLineCompleted(ImagePipelineCompletedArgs e)
{
if (PipeLineCompleted != null)
{
PipeLineCompleted(this, e);
}
}
#endregion
}
#endregion
}
可以看出,这两个服务都使用了你之前见过的 TPL 概念以及本文中的 BlockingCollection<T>
相关内容。
外观
这是管道末端生成的内容,它是一个用于 WPF 的类表面面板,它利用了我同事的 SurfacePanel。这是可能的,因为我简单地将标准的 WPF ListBox
面板替换为我同事的面板。所以,是的,你下面看到的就是一个 ListBox
。
代码是了解这一切如何运作的地方。
暂时就这些
这就是我在这篇文章中想说的全部内容。我希望你喜欢它,并想要更多。如果你喜欢这篇文章并想要更多,能否抽出一些时间留下评论和投票。非常感谢。
希望,下次再见。