65.9K
CodeProject 正在变化。 阅读更多。
Home

利用线程加速批量处理

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.81/5 (11投票s)

2007年12月13日

CPOL

9分钟阅读

viewsIcon

65236

downloadIcon

936

通过将批量处理任务分解为更小的任务并在并发线程中执行来加快处理速度

引言

我经常需要处理大量(例如,数十万甚至数百万)重复的数据,比如数据库表中的记录或对象集合,应用一些业务转换,然后保存它们(例如,写回数据库、生成报告等)。

在一个简单的数据库示例中,你可以只写一个 SQL 语句,根据特定的选择标准(由 WHERE 子句指定)来 UPDATE 受影响的记录。然而,这是一种非常有限的做法,如果你遇到复杂情况,可能会使用游标,那又是另一个棘手的问题。所以,如果你开始在代码中执行更新,你就必须获取一个 DataReader,逐条读取记录,应用你的业务逻辑,然后重写结果。出于各种原因,这非常耗时。

一个与数据库无关的例子可能是一个包含数百万个需要以某种方式操作的对象的集合。你无疑会使用一个 foreach() 循环,逐个处理对象,但这同样是对 CPU 资源的低效利用。你可以做得更好。

本文描述了一种非常高效且可扩展的处理大量数据的方法。它使用多个并发线程来处理小批量的工作,并通过创建更多线程(取决于核心数量)来利用多核 CPU。

附带的演示创建了 5,000,000 个客户对象,这些对象包含 ID 和姓名等基本信息,为每个对象更新折扣百分比,并将客户姓名反转 50 次(只是为了让我们有事可做)。它在一台四核机器上进行了基准测试,并产生了以下处理时间:

  • 单线程执行(即,始终只运行一个 CPU):5 分钟 20 秒。
  • 多线程执行(即,每个 CPU 运行两个线程,总共八个并发线程):2 分钟 40 秒。

您来选择赢家。

传统方式批处理

如引言所述,你可以通过使用 foreach() 循环一次处理一个对象的批次(或记录),并在循环中逐个处理对象。这种方法的优点之一是代码更容易理解,而且你无需处理多线程。话虽如此,如果代码得到良好的文档(应该是!),那么代码是可以完全理解的,而且如果你正在处理大量数据,你应该已经熟悉多线程并应该已经在使用它了。如果你还没有,那就开始吧。

一个简单的单批处理示例

class CustomerMaintenance
{
    // a collection of Customer objects that we need to process
    List<Customer> mCustomerList;
    // Constructor. User passes in the collection of Customer
    // objects to process.
    public CustomerMaintenance (List<Customer> customerList)
    {
        mCustomerList = customerList;
    }
    // Update method applies all the business transformations
    // to the Customer objects, one at a time.
    public void Update()
    {
        foreach (Customer customerToUpdate in mCustomerList)
        {
            customerToUpdate.Discount *= 1.10;
        }
    }
}

在此示例中,一个 Customer 对象列表被传递给一个 CustomerMaintenance 类。

然后执行 Update() 方法,它只是对列表中的每个对象应用一些更改。这是一个非常简单的例子,因为 foreach() 循环可能包含更复杂的代码。正是这些更新代码会在你做复杂事情时减慢你的速度。

使用并发线程进行批处理

上述示例最大的缺点是它没有利用多核 CPU 的优势,而且肯定不可扩展。如果你比较单核和多核机器上运行此代码的执行时间,你可能会在后者上发现更快的速度,但差异不会很大。此外,这种差异很可能是由于 CPU 速度更快、RAM 更大或其他类似因素造成的。你最大的武器——额外的 CPU——将保持未使用和未被利用。

所以,方法如下。将数据分散到并发线程中处理涉及几个步骤:

第一步 - 为每个对象分配键值

我们首先需要能够通过键值来引用每个对象。当我们启动线程工作者时,我们不会将包含所有要处理对象的批次传递给它们。相反,所有这些对象将保留在一个所有线程工作者都可以访问的集合中,并且每个工作者将处理此列表中的不同对象,通过键来访问它们。

与数据库记录相比,生成这些键对于处理对象来说略有不同,但并不难。如果你处理的是对象列表,你可以选择一个在所有对象中都是唯一的属性(例如客户 ID),或者如果你没有这样的属性,你只需使用一个递增计数器(不一定需要是对象的属性)。

要处理对象,只需声明一个 SortedList<key, object>,其中 key 是那个唯一属性,object 是你将要处理的对象类型。用所有键和值填充 SortedList<key, object>,你就准备好了。这是所有工作线程将要更新的共享数据集合。接下来,创建一个 List<key>,其中包含该共享集合的所有键,这就成为所有线程接收其工作批次的池。

例如

// Shared collection, containing all the objects to update
SortedList<long, Customer> mCustomerList mCustomerList = new SortedList<long, Customer>();
// code to populate the dictionary with the Customer objects goes here
// Create a list of keys containing all key values to the shared
// collection. This becomes a sort of index.
List<long> allCustomerIDs = new List<long>(mCustomerList.Keys);

但是,如果你处理的是数据库记录,你需要在 SELECT 语句中使用一个键字段。假设你要对客户表中的所有客户记录执行 UPDATE,你应该首先发出一个 SELECT 语句,检索所有将受影响的客户 ID,然后将这些 ID 存储在一个 List<key> 中。就像上面的对象示例一样,这个列表然后成为所有线程接收其工作批次的池。不过,与之前的例子不同的是,单个线程将直接在数据库中更新记录,这相当于上面使用的共享数据集合(即 mCustomers)。

第二步 - 准备一个信号量

信号量(你需要为此引用 System.Threading)是一个非常简单但至关重要的元素。它将控制我们当前有多少个线程工作者正在运行,当一个线程完成其工作并退出时,信号量会通知我们,我们可以启动另一个工作者。信号量具有很高的可配置性,你可以轻松指定它可以处理多少个请求。

例如

// This will create a semaphore that helps control as many thread launches as we need to.
Semaphore mSemaphore = new Semaphore(numberOfThreadsToUse, numberOfThreadsToUse)

第三步 - 遍历键列表并分批调度工作

现在是激动人心的部分。我们循环,直到第一步创建的键列表包含数据,然后执行以下操作:

  • 等待信号量有可用资源。
  • 在信号量中预留一个资源。
  • 从键列表中复制预定数量的项到工作列表中,该列表将传递给线程工作者。
  • 从键列表中移除这些相同的键,以免它们被重复调度处理。
  • 启动线程工作者,并将工作列表传递给它。线程将应用业务规则并修改共享主列表中由传递给它的键值索引的对象。
  • 当线程工作者完成时,它会释放信号量资源(从而启用另一个线程工作者的启动)并退出。

例如

private void UpdateAllCustomersInConcurrentBatches()
{
    // retrieve the number of CPUs on this machine, and calculate the total number
    // of threads we should run.
    ManagementObjectSearcher managementObjectSearcher = 
        new ManagementObjectSearcher("select * from Win32_Processor");
    ManagementObjectCollection managementObjectCollection = 
        managementObjectSearcher.Get();
    int numberOfCpus = managementObjectCollection.Count;
    int numberOfThreadsToUse = numberOfCpus * mMaxNumberOfThreadsPerCpu;
    int batchSize = 5000;
    
     // get a list of all the key values to process
    List<long> allCustomerIDs = new List<long>(mCustomerList.Keys);
    while (allCustomerIDs.Count > 0)
    { 
        // make of list of customer IDs to process in the next batch
        List<long> customerIDsToProcess = allCustomerIDs.GetRange(0, 
        System.Math.Min(batchSize, allCustomerIDs.Count));
        // remove those customer IDs from the master list so they are not processed again
        allCustomerIDs.RemoveRange(0, System.Math.Min(batchSize, allCustomerIDs.Count));
    
        // wait for the semaphore to let us launch another thread
        mSemaphore.WaitOne();
    
        // launch a thread worker and give it the list of customer IDs to process
        ThreadPool.QueueUserWorkItem(new WaitCallback(UpdateAllCustomersInSubBatch), 
            customerIDsToProcess);
    }
    
    // ensure all threads have exited by waiting until we can get all 
    // the semaphore requests
    for (int ctr = 0; ctr < numberOfThreadsToUse; ctr++)
    {
       mSemaphore.WaitOne();
    }
    mSemaphore.Release(numberOfThreadsToUse);
}

第四步 - 在线程工作者中处理一批记录

处理记录的方法将在第三步中启动,并将接收要处理的键列表。然后,它将使用一个 foreach() 循环遍历这些键,并使用循环中的每个键,访问共享集合中的一个 Customer 对象,并对其应用适当的业务规则和更改。

同样,如果你处理的是数据库记录,你将使用这个键值发出一个 SELECT 语句来获取一条记录,获取它,更新它,然后写回(或者可能只是发出一个 UPDATE 语句)。

例如

private void UpdateAllCustomersInSubBatch(object state)
{
    try
    {
        List<long> customerIDsToProcess = state as List<long>;
        foreach (long customerID in customerIDsToProcess)
        {
            Customer tempCustomer = mCustomerList[customerID];
            // a foreach item cannot be passed down by reference, so pass 
            // a copy.
            ApplyBusinessRulesToCustomerObject(ref tempCustomer);
            Lock (mLock)
            {
                mCustomerList[customerID].Discount = tempCustomer.Discount;
                mCustomerList[customerID].Name = tempCustomer.Name;
            }
        }
    }
    catch (Exception ex)
    {
        lock (mLock)
        {
            // An exception was raised. This thread has no access to the UI, 
            // so store the exception in
            // mExceptions and get out.
            mExceptions.Add(ex);
        }
    }
    finally
    {
        // The work in this thread is complete. Release the semaphore request 
        // so that it can be reused to 
        // launch another thread worker for the next batch.
        mSemaphore.Release();
    }
}

关于异常的说明

关于线程需要注意的一个重要事项是,你必须小心处理异常。在单线程应用程序中,异常会向上追溯到执行路径中的所有对象,因此你可以在任何点捕获和处理它们。

然而,在多线程应用程序中,异常只会向上追溯到该线程中执行的第一个方法。如果你没有 try/catch 块在那个点之前(或在其上)处理异常,你就会收到一个“未处理的异常”错误,并且当异常抛出时,应用程序会停止。

为了解决这个问题,演示使用了一个 List<Exception> 集合来存储线程执行期间抛出的所有异常。当我们完成遍历对象键的主列表后,我们可以检查这个集合的内容,如果存在任何异常,它们就可以被发送回窗体、记录下来等。此外,我们可以修改主调度 foreach() 循环(在第三步中描述),如果异常集合的计数大于零,则停止调度新的工作批次。

关于线程安全和锁定的说明

同样重要的是要注意,由于多个线程将访问和修改相同的对象(例如,共享对象列表和异常集合),我们需要防止数据损坏,并确保一次只有一个线程可以访问共享资源。为此,我们使用 lock() 命令,该命令基本上会阻止所有其他尝试访问 enclosed 代码的请求,直到锁退出。这确保了没有两个线程会同时执行同一段代码。

结论

将大量的重复工作分解成小批量并在并行处理它们,对于处理大量数据来说是必不可少的。我在引言中展示的基准测试表明,你可以轻松节省 50% 的执行时间,尽管节省的幅度在很大程度上取决于你需要对数据进行转换的具体类型以及代码的实现方式。另外,请记住,如果你在具有更多 CPU 的机器上运行你的应用程序,节省的比例将继续增加。

这是一个不可忽视的时间节省器,我希望它能帮助到一些人。如果它对你有帮助,请投赞成票。

历史

  • 2007年12月13日 - 初始帖子
© . All rights reserved.