使用匿名方法、异步处理和线程池提高性能
使用匿名方法、异步处理和线程池提高性能。
引言
我在新团队的第一周就被问到是否可以提高应用程序业务逻辑外观中某个方法中循环的性能。该循环的目的是在从数据层获取的自定义对象与业务层中相似但不兼容的自定义对象之间来回合成数据。我知道你在想什么……2001 年左右的相当标准的 OOP 内容,但你可能也知道,由于所有活动部件之间存在的共生关系,这需要良好的代码熟悉度。更困难的是,这些对象中的一些属性实际上返回了其他不兼容对象的集合,这些集合也需要来回合成。
Using the Code
我决定最好的行动方案可能是专注于循环本身。我知道循环迭代对象集合并将其转换为其他对象的集合,而且我知道集合可能很大,所以我认为通过简单地异步而非内联地填充循环中的对象,我可以立即获得一些性能改进。
我需要做的第一件事是基线性能计数。我需要获取循环本身完成所需的时间(以毫秒为单位),不包括获取操作,因为获取操作可能会使统计数据偏离多达 50%。在旧的 1.1 时代,我必须导入 Kernel32 库中的 QueryPerformanceCounter
类,并运行启动、停止和清除操作并计算总数:http://msdn2.microsoft.com/en-us/library/ms979201.aspx。
然而,现在我们可以使用 System.Diagnostics 库中的 Stopwatch
类,通过两个命令:Start
和 Stop
来完成同样的事情。不要忘记使用格式化字符串写入秒表输出,并对 ElapsedMiliseconds
属性进行 ToString()
以避免装箱操作。
让我们看一个代码模拟
public sealed class SolutionEntityMapper
{
public static FooDataCollection MapObject(FooEntityCollection fooEntityCollection)
{
FooDataCollection fooDataCollection = new FooDataCollection();
#if DEBUG
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
//Start the timer
sw.Start();
#endif
//Run the loop inline
foreach (FooEntityItem fooEntityItem in fooEntityCollection)
{
FooDataItem fooDataItem = new FooDataItem ();
fooDataItem.SomeProperty = fooEntityItem.SomeProperty;
//Do more long running tasks
fooDataCollection.Add(fooDataItem);
}
#if DEBUG
//Stop the timer
sw.Stop();
//Print out the elapsed time in miliseconds without boxing
System.Diagnostics.Debug.WriteLine(String.Format(
"Operation Executed in {0} miliseconds.", sw.ElapsedMilliseconds.ToString()));
#endif
//Return the collection
return fooDataCollection;
}
}
看起来很简单。启动计时器,运行循环,创建新对象,填充属性(请记住,某些属性返回集合,因此它们必须调用其他辅助函数来填充集合),然后停止计时器。为了准备集合,我创建了一个单元测试,该测试将使用 1000 个实体对象预填充集合。循环运行时间略低于 6000 毫秒;为了转换方便,我们称之为 6 秒。这对于一个小公司来说勉强可以接受,但对于一个服务 30,000+ 用户的公司来说,这根本无法很好地扩展。
好的,那我们异步试试看。我想实现的第一件事是让循环启动数据项,然后移动到下一个,启动它,然后等待所有完成并返回数据。合成数据的代码很长,我倾向于使用隔离行为的块状方法,所以我将代码移动到一个名为 ProcessEntity
的方法中,并专注于 MapObject
方法中的异步操作。接下来,我必须为 ProcessEntity
方法创建一个委托,以便我可以异步调用它。锁定共享方法
public sealed class SolutionEntityMapper
{
Static object myLock = new object();
delegate FooDataItem ProcessEntityDelegate (FooEntityItem fooEntityItem);
private static FooDataItem processEntity(FooEntityItem fooEntityItem)
{
lock (myLock);
{
FooDataItem fooDataItem = new FooDataItem();
fooDataItem.SomeProperty = fooEntityItem.SomeProperty;
//Do more long running tasks
return fooDataItem;}
}
}
}
现在我已经将方法分开了,让我们思考一下如何异步调用循环。我们知道,如果在循环中调用 BeginInvoke
然后调用 EndInvoke
,我们不如直接内联运行循环,因为 EndInvoke
会阻塞直到线程完成。异步运行循环并实现我们所需性能的唯一方法是使用回调方法。这带来了它自己的问题,因为我们还需要回调方法返回一个填充好的 fooDataItem
,然后将该项添加到集合中并将集合返回给调用方法。
我们可以使用 out
参数,但当它们异步返回时,我们如何将输出放入新集合中呢?我们可以声明一个 Member
集合并填充它,但在异步回调方法中跟踪我们填充的成员集合并不是我希望在未来版本的代码中做的事情。这就是 C# 匿名方法的优点所在。如果我们为回调方法创建一个匿名委托,我们可以在一个方法中访问数据项。如果我们为回调使用匿名方法,一个需要考虑的问题是确保在集合填充完成之前,我们不会将控制权返回给调用方法。为此,我创建了一个 while
循环,以确保返回的集合大小与处理的集合大小相同。我检查每个集合的 Count
属性,因为每次向集合添加成员时,Count
属性都会更新,并且成员会存储起来以备后用。这不是最好的选择,但我们将在帖子后面进行修复。
while (fooDataCollection.Count != fooEntityCollection.Count)
{
/*sit and spin until the thread finishes*/
}
现在我们来编写代码
public sealed class SolutionEntityMapper
{
Static object myLock = new object();
delegate FooDataItem ProcessEntityDelegate (FooEntityItem fooEntityItem);
delegate void callBackDelegate(IAsyncResult ar);
public static FooDataCollection MapObject(fooEntityCollection fooEntityCollection)
{
ProcessEntityDelegate processEntityDelegate =
new ProcessEntityDelegate SolutionEntityMapper.processEntity);
FooDataCollection fooDataCollection = new FooDataCollection();
#if DEBUG
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
//Start the timer
sw.Start();
#endif
//Create the Anonymous delegate method
callBackDelegate del = delegate(IAsyncResult ar)
{
ProcessEntityDelegate processSolution = ar.AsyncState as ProcessEntityDelegate;
//Pull the item out of the IAsynchResult
FooDataItem fooDataItem = processSolution.EndInvoke(ar);
//Add the item to the collection
fooDataCollection.Add(fooDataItem);
};
//Start the loop
foreach (fooEntityItem fooEntityItem in fooEntityCollection)
{
//Start Asynchronous operation
IAsyncResult result = processEntityDelegate.BeginInvoke(
fooEntityItem, new AsyncCallback(del), processEntityDelegate);
}
while (fooDataCollection.Count != fooEntityCollection.Count)
{ /*sit and spin until the thread finishes*/ }
#if DEBUG
/Stop the timer
sw.Stop();
//Print out the elapsed time in miliseconds without boxing
System.Diagnostics.Debug.WriteLine(String.Format(
"Operation Executed in {0} miliseconds.", sw.ElapsedMilliseconds.ToString()));
#endif
//Return the collection
return fooDataCollection;
}
private static FooDataItem processEntity(FooEntityItem fooEntityItem)
{
lock (myLock);
{
FooDataItem fooDataItem = new FooDataItem();
fooDataItem.SomeProperty = fooEntityItem.SomeProperty;
//Do more long running tasks
return fooDataItem;
}
}
}
那么,同步处理 1000 条记录与异步处理 1000 条记录的输出时间是多少?
- 原始循环:6000 毫秒或 6 秒。
- 异步循环:194 毫秒或 0.194 秒。
对于一个小小的重构来说,这不是一个糟糕的改进,但是,我们能做得更好吗?如果可以避免,使用 while
循环等待异步操作完成是不可取的。
异步调用更多是为了运行像 I/O 操作这样的单线程处理而设计的,而线程池是为了处理称为后台线程的多个任务而设计的,它们为我们处理线程管理开销。Web 池将保存最大可设置数量的工作线程,直到达到预定义限制(默认值为 25),并使它们处于暂停状态,以便为下一个异步操作做好准备。目前,我们将使用默认值;有关调优的更多说明将在本文末尾提供。有关更多信息,请参阅 http://msdn2.microsoft.com/en-us/library/0ka9477y.aspx。
我们需要确保在线程池完成之前不将控制权返回给调用方法,我们有几种可用的选项。我的选择是结合使用 AutoResetEvent
和 WaitHandle
。有关该主题的更多信息可以在这里找到:http://msdn2.microsoft.com/en-us/library/system.threading.waithandle.aspx。理想情况下,我们将 while
循环替换为 AutoResetEvent
数组,并为每个线程添加一个 AutoResetEvent
,然后调用 WaitHandle.WaitAll
,传入 AutoResetEvent
数组。为了使示例与上文更一致,我将 while
循环替换为单个 AutoResetEvent
,并使用与原始 while
循环类似的语法来信号 WaitHandle
。
让我们用线程池等待回调委托替换异步回调委托,看看会发生什么。
public sealed class SolutionEntityMapper
{
Static object myLock = new object();
delegate FooDataItem ProcessEntityDelegate (FooEntityItem fooEntityItem);
我们将匿名方法转换为线程池等待回调方法,但这次我们从线程上下文中提取 FooDataItem
。创建一个自动重置事件数组,并将一个 AutoResetEvent
添加到数组中,将其初始化为 false
。
public static FooDataCollection MapObject(fooEntityCollection fooEntityCollection)
{
ProcessEntityDelegate processEntityDelegate =
new ProcessEntityDelegate (SolutionEntityMapper.processEntity);
FooDataCollection fooDataCollection = new FooDataCollection();
AutoResetEvent[] threadCompleted = new AutoResetEvent[] { new AutoResetEvent(false) };
#if DEBUG
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
//Start the timer
sw.Start();
#endif
WaitCallback del = delegate(Object threadContext)
{
FooDataItem fooDataItem = threadContext as FooDataItem;
//Add the item to the collection
fooDataCollection.Add(fooDataItem);
//Signal the end of processing
if (fooDataCollection.Count == fooEntityCollection.Count)
threadCompleted[0].Set();
}
现在,我们可以将线程池代码移动到循环中
//Start the loop
foreach (fooEntityItem fooEntityItem in fooEntityCollection)
{
//Start Asynchronus operation
ThreadPool.QueueUserWorkItem(new WaitCallback(del),
processEntityDelegate (fooEntityItem));
}
#if DEBUG
/Stop the timer
sw.Stop();
//Print out the elapsed time in miliseconds without boxing
System.Diagnostics.Debug.WriteLine(String.Format(
"Operation Executed in {0} miliseconds.", sw.ElapsedMilliseconds.ToString()));
#endif
//Wait for all events to complete
WaitHandle.WaitAny(threadCompleted);
//Return the collection
return fooDataCollection;
}
}
那么,同步处理 1000 条记录与异步处理 1000 条记录与线程池处理的输出时间是多少?
- 原始循环:6000 毫秒或 6 秒。
- 异步循环:194 毫秒或 0.194 秒。
- 线程池:101 毫秒或 0.101 秒。
关注点
调优:池中的线程数将控制您可以同时完成的任务数量。一个线程池的限制为 25,将允许每个处理器核心有 25 个工作线程。一个双处理器或双核机器,线程池设置为最大 25 个线程,将允许 50 个线程池线程。如果您的应用程序在双处理器系统上接收 100 个同时请求,线程池限制为 25,那么 50 个将立即处理,另外 50 个将排队等待。当最初的 50 个完成后,其他的将向上移动到队列中。
另外,请记住,系统中的用户数量或同时连接的数量与同时请求的数量不同。