使用 AsyncVar 实现简单的异步操作






4.86/5 (7投票s)
一个简单的类,用于异步检索值并将其保留直到需要。
引言
AsyncVar
类是一个简单的类,它异步执行一个工作函数并存储其返回值。如果工作函数在需要值之前完成,则该值将保留在 AsyncVar
实例中。如果请求的值早于工作函数完成,主线程将加入工作函数的线程,阻塞直到值可用。
背景
开发人员一直在寻找让他们应用程序稍快一点的方法。即使花费几个小时的编程和调试来实现,将耗时操作缩短一两秒总是件好事。我最近在做一个项目,该项目需要四个耗时的数据操作;每个操作都返回一个值,并且我需要所有四个值才能继续处理。
起初,我将这四个操作设置为串行运行。程序花费了不到一分钟的时间来完成所有这四个操作。作为一个程序员,我认为这是不可接受的,于是我决定研究如何通过多线程来使我的程序运行得更快。在尝试了 ThreadPool
和 BackgroundWorker
组件之后,我希望能有一种更简单的方法来执行返回了进一步处理所需值的后台工作。因此,AsyncVar
应运而生。
构建 AsyncVar 类
为了开始构建 AsyncVar
类,我们需要先创建这个类。根据问题要求,我们可以看到 AsyncVar
类可能需要用于任何类型的数据;因此,我们需要使用泛型来创建它。
namespace AsyncVarArticle
{
using System;
using System.Threading;
public class AsyncVar<TResult>
{
Imports System
Imports System.Threading
Namespace AsyncVarArticle
Public Class AsyncVar(Of TResult)
TResult
泛型参数将确定用于提供值的函数的返回类型,以及存储在 AsyncVar
中的值的类型。
字段
private Func<TResult> workerFunction;
private Thread workerThread;
private TResult value;
private Exception error;
Private _workerFunction As Func(Of TResult)
Private _workerThread As Thread
Private _value As TResult
Private _error As Exception
第一个字段是一个委托引用;该委托将指向异步运行并为我们提供值的那个工作函数。
下一个字段是一个 Thread
对象。我们将使用此对象来运行异步操作,并利用其属性来确定线程的状态(这样我们就可以知道在请求值时是否需要加入线程)。
value
字段,类型为泛型 TResult
,将在工作函数完成后包含其返回值。
error
字段将包含在工作函数中发生的任何未处理异常。
构造函数和析构函数
public AsyncVar(Func<TResult> worker)
{
this.workerFunction = worker;
this.workerThread = new Thread(this.ThreadStart);
this.workerThread.Start();
}
~AsyncVar()
{
if (this.workerThread.IsAlive)
{
this.workerThread.Abort();
}
}
Public Sub New(ByVal worker As Func(Of TResult))
Me._workerFunction = worker
Me._workerThread = New Thread(AddressOf Me.ThreadStart)
Me._workerThread.Start()
End Sub
Protected Overrides Sub Finalize()
If Me._workerThread.IsAlive Then
Me._workerThread.Abort()
End If
End Sub
构造函数存储传入的工作函数,并实例化并启动工作线程。工作线程执行类的 ThreadStart
函数,这是我们接下来要添加到类中的成员。析构函数只是在线程仍在运行时终止它。通常,当 AsyncVar
实例被析构时,线程不应该仍在运行,但这是可能的。如果实例超出范围或被垃圾回收,线程就没有继续运行的理由了。
ThreadStart
我们接下来需要添加到类中的成员是 ThreadStart
方法。Thread
成员将使用此方法进行初始化;它将调用工作函数并处理发生的任何错误。
private void ThreadStart()
{
TResult returnValue = default(TResult);
try
{
returnValue = this.workerFunction();
}
catch (Exception ex)
{
this.error = ex;
}
if (this.error == null)
{
this.value = returnValue;
}
}
Private Sub ThreadStart()
Dim returnValue As TResult
Try
returnValue = Me._workerFunction()
Catch ex As Exception
Me._error = ex
End Try
If Me._error Is Nothing Then
Me._value = returnValue
End If
End Sub
ThreadStart
方法很简单;在一个 try
-catch
块中,它执行工作函数。如果捕获到异常,则将其赋值给 error
字段。否则,工作函数的返回值将被赋值给 value
字段。
属性访问器
AsyncVar
类的最后两个成员是 value
和 error
字段的属性访问器。
public TResult Value
{
get
{
if (this.workerThread.IsAlive)
{
this.workerThread.Join();
}
if (this.error != null)
{
throw new InvalidOperationException("Thread encountered " +
"an exception during execution.", this.error);
}
else
{
return this.value;
}
}
}
Public ReadOnly Property Value() As TResult
Get
If Me._workerThread.IsAlive Then
Me._workerThread.Join()
End If
If Me._error IsNot Nothing Then
Throw New InvalidOperationException("Thread encountered " & _
"an exception during execution.", Me._error)
Else
Return Me._value
End If
End Get
End Property
访问器首先检查线程是否仍在执行;如果是,则当前线程将简单地阻塞,直到工作线程完成。一旦工作线程完成(或者如果它在访问器被调用之前就已经完成),它会检查工作线程中是否捕获了异常。如果有,那么当有人尝试访问值时,将抛出 InvalidOperationException
(类似于 BackgroundWorker
类)。如果没有错误,则直接返回该值。
public Exception Error
{
get
{
if (this.workerThread.IsAlive)
{
this.workerThread.Join();
}
return this.error;
}
}
}
}
Public ReadOnly Property [Error]() As Exception
Get
If Me._workerThread.IsAlive Then
Me._workerThread.Join()
End If
Return Me._error
End Get
End Property
End Class
End Namespace
Error
访问器也做同样的事情;如果线程仍在运行,它会等待其完成。然后,如果遇到了错误,它会返回异常。
完整的类
这是类的完整列表,包括 XML 文档。您可以将其直接复制并粘贴到代码文件中(如果需要,请更改命名空间),然后开始使用该类,而无需进行任何进一步的工作。
namespace AsyncVarArticle
{
using System;
using System.Threading;
/// <summary>
/// Represents a variable whose value
/// is determined by an asynchronous worker function.
/// </summary>
/// <typeparam name="TResult">The type
/// of value represented by the variable.</typeparam>
public class AsyncVar<TResult>
{
private Func<TResult> workerFunction;
private Thread workerThread;
private TResult value;
private Exception error;
/// <summary>
/// Initializes a new instance of the <see
/// cref="AsyncVar<TResult>"/> class.
/// </summary>
/// <param name="worker">The worker function
/// that returns the value to be held.</param>
public AsyncVar(Func<TResult> worker)
{
this.workerFunction = worker;
this.workerThread = new Thread(this.ThreadStart);
this.workerThread.Start();
}
/// <summary>
/// Finalizes an instance of the <see cref="AsyncVar<TResult>"/> class.
/// </summary>
~AsyncVar()
{
if (this.workerThread.IsAlive)
{
this.workerThread.Abort();
}
}
/// <summary>
/// The ThreadStart method used for the worker thread.
/// </summary>
private void ThreadStart()
{
TResult returnValue = default(TResult);
try
{
returnValue = this.workerFunction();
}
catch (Exception ex)
{
this.error = ex;
}
if (this.error == null)
{
this.value = returnValue;
}
}
/// <summary>
/// Gets the value returned by the worker function.
/// If the worker thread is still running, blocks until the thread is complete.
/// </summary>
/// <value>The value returned by the worker function.</value>
public TResult Value
{
get
{
if (this.workerThread.IsAlive)
{
this.workerThread.Join();
}
if (this.error != null)
{
throw new InvalidOperationException("Thread encountered " +
"an exception during execution.", this.error);
}
else
{
return this.value;
}
}
}
/// <summary>
/// Gets an exception thrown by the worker function. If the worker
/// thread is still running, blocks until the thread is complete.
/// If no unhandled exceptions occurred in the worker thread,
/// returns <see langword="null"/>.
/// </summary>
/// <value>An exception thrown by the worker function.</value>
public Exception Error
{
get
{
if (this.workerThread.IsAlive)
{
this.workerThread.Join();
}
return this.error;
}
}
}
}
Imports System
Imports System.Threading
Namespace AsyncVarArticle
''' <summary>
''' Represents a variable whose value
''' is determined by an asynchronous worker function.
''' </summary>
''' <typeparam name="TResult">The type
''' of value represented by the variable.</typeparam>
Public Class AsyncVar(Of TResult)
Private _workerFunction As Func(Of TResult)
Private _workerThread As Thread
Private _value As TResult
Private _error As Exception
''' <summary>
''' Initializes a new instance of the <see cref="AsyncVar(Of TResult)"/> class.
''' </summary>
''' <param name="worker">The worker function
''' that returns the value to be held.</param>
Public Sub New(ByVal worker As Func(Of TResult))
Me._workerFunction = worker
Me._workerThread = New Thread(AddressOf Me.ThreadStart)
Me._workerThread.Start()
End Sub
''' <summary>
''' Finalizes an instance of the <see cref="AsyncVar(Of TResult);"> class.
''' </summary>
Protected Overrides Sub Finalize()
If Me._workerThread.IsAlive Then
Me._workerThread.Abort()
End If
End Sub
''' <summary>
''' The ThreadStart method used for the worker thread.
''' </summary>
Private Sub ThreadStart()
Dim returnValue As TResult
Try
returnValue = Me._workerFunction()
Catch ex As Exception
Me._error = ex
End Try
If Me._error Is Nothing Then
Me._value = returnValue
End If
End Sub
''' <summary>
''' Gets the value returned by the worker function.
''' If the worker thread is still running, blocks until the thread is complete.
''' </summary>
''' <value>The value returned by the worker function.</value>
Public ReadOnly Property Value() As TResult
Get
If Me._workerThread.IsAlive Then
Me._workerThread.Join()
End If
If Me._error IsNot Nothing Then
Throw New InvalidOperationException("Thread encountered" & _
" an exception during execution.", Me._error)
Else
Return Me._value
End If
End Get
End Property
''' <summary>
''' Gets an exception thrown by the worker function.
''' If the worker thread is still running, blocks until
''' the thread is complete. If no unhandled exceptions
''' occurred in the worker thread, returns <see langword="Nothing"/>.
''' </summary>
''' <value>An exception thrown by the worker function.</value>
Public ReadOnly Property [Error]() As Exception
Get
If Me._workerThread.IsAlive Then
Me._workerThread.Join()
End If
Return Me._error
End Get
End Property
End Class
End Namespace
使用 AsyncVar 类
AsyncVar
类被设计成非常易于使用。要异步运行一个操作,只需使用工作函数作为唯一的构造函数参数来构造一个新的 AsyncVar
实例。一旦你构造了 AsyncVar
,操作就会在其自己的线程上开始。
private int SomeOperation()
{
System.Threading.Thread.Sleep(1000);
return 1;
}
private int SomeOtherOperation()
{
System.Threading.Thread.Sleep(3000);
return 3;
}
public void DoLotsOfWork()
{
// run an asynchronous operation using a predefined function
AsyncVar<int> asyncVar1 = new AsyncVar<int>(SomeOperation);
// run an asynchronous operation using an anonymous method
AsyncVar<int> asyncVar2 = new AsyncVar<int>(delegate
{
System.Threading.Thread.Sleep(2000);
return 2;
});
// run an asynchronous operation using a lambda function
AsyncVar<int> asyncVar3 = new AsyncVar<int>(() => SomeOtherOperation());
Console.WriteLine("Return values: {0}, {1}, {2}",
asyncVar1.Value, asyncVar2.Value, asyncVar3.Value);
}
Private Function SomeOperation() As Integer
System.Threading.Thread.Sleep(1000)
Return 1
End Function
Private Function SomeOtherOperation() As Integer
System.Threading.Thread.Sleep(3000)
Return 3
End Function
Public Sub DoLotsOfWork()
' run an asynchronous operation using a predefined function
Dim asyncVar1 As New AsyncVar(Of Integer)(AddressOf SomeOperation)
' run an asynchronous operation using an anonymous method
' (VB.NET does not have syntax for this, use one of the other methods)
' run an asynchronous operation using a lambda function
Dim asyncVar3 As New AsyncVar(Of Integer)(Function() SomeOtherOperation())
Console.WriteLine("Return values: {0}, {1}, {2}", _
asyncVar1.Value, asyncVar2.Value, asyncVar3.Value)
End Sub
如果使用常规的同步调用运行此代码,则需要六秒钟才能完成(第一个函数耗时 1000ms,第二个 2000ms,第三个 3000ms),但使用 AsyncVar
,则只需三秒钟即可完成(最长操作的时间)。当然,Thread.Sleep
不会占用 CPU 时间或资源,因此您的异步操作可能比同步运行时稍慢一些,因为它们需要与其他线程共享时间和资源。
另外,请记住,每个 AsyncVar
实例都有自己的线程;请谨慎使用 AsyncVar
;同时运行数千个 AsyncVar
可能不是一个好主意。