65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 CUDA 8.0 运行时 API 实现并行可扩展分布计数算法 (DCA)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.97/5 (23投票s)

2016 年 12 月 10 日

CPOL

24分钟阅读

viewsIcon

14379

downloadIcon

409

在本文中,我们将演示一种方法,通过使用 NVIDIA CUDA 8.0 运行时 API,可以将实现传统分布计数算法 (DCA) 的代码性能提高(高达 600%)

“CUDA® 是 NVIDIA 发明的一种并行计算平台和编程模型。它通过利用图形处理单元 (GPU) 的强大功能,显著提高了计算性能。”加州大学戴维斯分校 John Owens 博士和 NVIDIA 公司 David Luebke 博士

引言

在本文中,我们将演示如何使用 NVIDIA CUDA® 多线程硬件平台和编程模型开发一种高效的并行代码,该代码实现了著名的分布计数算法,该算法专门设计用于解决复杂的统计问题,即查找给定数据集中每个项目的总体重复项数量,该数据集通常规模庞大,包含多达 10^6 - 10^9 个数据项。自最初提出以来,以下算法仍然是解决各种计算问题的唯一方法,例如收集调查统计数据、对无序数据集中一系列项目执行统计中位数计算(其值又经常用于降低单色栅格图像图像处理过程中的噪声水平),以及在聚类分析中增加 k-means 聚类算法产生的聚类之间的距离,这本身提供了更好的聚类质量等。

实际上,传统的分布计数算法主要基于执行线性搜索来查找给定数据集中特定项目的所有重复项。在这种情况下,对 N 个数据项中的每个数据项都执行线性搜索,这显著增加了所执行计算的复杂性。在正常条件下,分布计数算法的复杂性可以估计为 O(Nx(N+k)) >= O(N^2) 的琐碎比较,其中 N - 给定数据集中的数据项数量,k - 结果数据集中包含给定数据集中每个数据项出现频率的项目数量。通常,我们将 k 值视为此特定算法的开销值。显然,以下值表示结果频率表的潜在大小,而该大小又在很大程度上取决于给定数据集中最小和最大项的值。

通过分析所讨论的问题,我们可能会认为我们通常可以通过使用现有的排序算法之一来执行上述类似任务,从而避免实现分布计数算法。如果我们更深入地研究这个问题,我们会注意到,除了计数排序算法之外,没有一个现有的排序算法可以用于执行分布计数,而计数排序算法从一个方面来看被认为不切实际,但从另一方面来看,可以有效地用于解决所讨论的问题。

正如我们上面已经讨论的,传统分布计数算法最明显的缺点之一是当使用以下算法处理上述通常巨大的数据集时,查找数据项重复项本身的性能。从下面显示的表 1 中,我们可以看到对每个特定数据项执行的线性搜索,以及开销处理,如何影响在大小从 10^6 到 10^9 个数据项的数据集中顺序执行分布计数的代码的整体性能

表 1:顺序分布计数算法的计算复杂性

数据集

N - 项目

频率表

k-项目

计算复杂性 p = O(Nx(N+k))

顺序代码

执行时间

(最佳情况性能)

1000000

10^6

2 x 10^12

0h:12m:38s

10000000

10^7

2 x 10^14

2h:11m:12s

100000000

10^8

2 x 10^16

22h:05m:12s

1000000000

10^9

2 x 10^18

232h:34m:23s

指示分布计数算法常规性能的经验统计数据是通过在基于现代 CPU Intel Core i7-4790 4.0 GHZ 和 32GB RAM 的单个计算单元上运行特定顺序代码估算的。

通过处理通常巨大的数据集,以下算法能够以不少于 728 秒 ~ 12 分 38 秒的时间生成数据项出现频率的结果数据集,同时处理大小约为或大于 10^6 个数据项的数据集。此外,在给定数据集的总体大小增加的情况下,假设它变大 10 倍,那么顺序执行代码的性能会显著下降,并且与游戏、深度学习 AI 神经网络、处理现代硬盘中大量集群的 HDD 工具和实用程序以及其他复杂数据处理任务等其他复杂数据处理任务相比,其持续时间非常长。例如,如果您运行以下顺序代码来查找 10^8 - 10^9 个数据项的重复项,它能够以几乎空闲的时间完成以下处理任务。

这反过来又意味着,包含通常 10^8-10^9 个数据项的巨大数据集的处理问题,如果不使用特定的多线程库和框架,就无法解决,这些库和框架可以利用并行计算的强大功能,例如大多数现有现代多核 CPU 支持的硬件级并行性,以及 NVIDIA CUDA® 等独立的硬件平台和编程模型,它们允许使用大多数支持 CUDA 的 GPU 资源来解决各种复杂的计算问题,特别是提高用于处理通常巨大数据集的顺序代码性能的问题。

自 NVIDIA 公司首次发明并推出 CUDA® 技术以来,作为一种多线程并行化框架库和工具,它被实现为 C/C++、Fortran、Java、Python 等多种编程语言的并行计算 GPU 硬件平台和编程模型。与其他多线程框架不同,NVIDIA CUDA® 技术提供了执行复杂计算的能力,揭示了大多数支持 CUDA 的 GPU 的强大功能。它作为传统基于 CPU 计算的替代方案,将普通的平均 PC 变成一台功能强大的超级计算机,利用 NVIDIA 显卡 GPU 和内存几乎无限的资源来执行复杂的计算。使用 NVIDIA CUDA® 为那些在处理巨量数据时对 CPU 和系统内存产生巨大工作负载的应用程序提供了显著的性能加速增益。

在接下来的文章中,我们将讨论 NVIDIA CUDA® 可扩展编程模型架构作为执行并行多线程计算的高效平台,同时,详细解释如何将实现 NP 难、可扩展性差的传统分布计数算法的顺序执行代码转换为由 GPU 流式多处理器 (SM) 阵列执行的,由发散的硬件级线程组成的 warp 并行运行。

以下,通常可以通过将所讨论的计算过程的顺序循环转换为由大量线程并行执行的 CUDA 内核来实现。CUDA 内核通常从主机代码启动,而每个内核的代码片段都在设备上执行。在以下文章的开头,我们将讨论主机或设备代码的单独执行主题,这是 CUDA 编程模型最基本的方面之一。

在本文中,我们还将仔细研究如何正确地将查找数据项重复项的整个过程划分为多个独立的子过程,这些子过程能够并行执行以下任务。为此,我们将使用各种方法,例如 CUDA 的批量或动态并行概念,以利用 CUDA® 多核处理器硬件平台有效地并行化由分布计数算法公式化的数据处理。

特别是,我们将讨论动态并行性的不同场景,例如线性或递归并行线程执行,以及如何有效地在多个并行 GPU 硬件线程之间共享由一个或多个嵌套顺序循环执行的查找数据项重复项的整个过程,这反过来又可以显著降低旨在解决所讨论问题的算法的计算复杂性。

在讨论过程中,我们还将重点强调代码现代化的一些重要方面,例如访问存储在 GPU 设备共享内存中的发散线程块的屏障同步,以及允许克服许多严重的已知问题和副作用(例如数据流依赖和竞争条件)的特定归约机制,这些问题和副作用在许多情况下阻碍了代码现代化过程,使得许多现有算法无法并行化。

此外,在代码现代化过程中,我们将学习如何使用原子数据操作,例如原子加法、增量和比较-交换 (CAS),以克服并行化代码中竞争条件的发生。此外,我们将解释如何实现固定内存,使用固定内存可以有效地避免竞争条件的发生,在以下问题无法通过使用屏障同步或原子操作来克服的情况下。

既然我们已经找出了如何将实现分布计数算法的代码的顺序循环转换为由包含大量线程的块执行的内核,我们将学习如何通过使用 CUDA 流机制优化在主机上异步启动的这些内核的性能。特别是,我们将讨论如何在主机或设备代码中启动内核,每个内核都在自己的流中由特定的流式多处理器执行,从而使顺序内核调用同步。同时,我们将详细描述诸如隐式或显式流同步等重要方面,并解释传统默认流的隐式同步与通过使用特定 CUDA 运行时 API 调用创建的流的显式同步之间的区别。在讨论过程中,我们将讨论特定于事件的机制,这是同步流的唯一方法。

此外,除了上述并行化技术的讨论,本文还将概述使用 CUDA 编程模型的一些重要方面,例如主机或设备内存分配和管理,例如在并行代码执行期间使用 CUDA 运行时 API 例程在主机和设备内存之间进行特定数据传输。

除了 CUDA 流和内存管理主题,在本文中,我们将学习如何使用 CUDA 运行时 API 的事件管理函数来估算代码执行时间。

背景

在我们开始之前,在本节中,我们将解释用于查找给定数据集中每个数据项的重复项并将获得的每个项的统计数据存储到频率数组中的传统分布计数算法。在本节中,我们将彻底讨论以下算法执行的操作,并提供有关如何通过在实现分布计数算法的顺序代码中查找优化热点来优化性能的详细建议。

分布计数算法概述

分布计数算法 (DCA) – 解决查找给定数据集中每个数据项重复项的统计问题最简单的算法之一。正如我们已经讨论的,以下算法通常用于许多应用程序,例如计算统计中位数、收集调查统计数据,作为各种分区和 AI 算法的一部分,例如旨在解决分类问题的 k-means 聚类算法等。罗伯特·塞奇威克在他的《C 语言基本算法第 1 部分》一书中详细讨论了分布计数算法。

该算法可以表述如下:假设我们有一个无序数据集 data,包含 N 个特定数据类型的随机生成数据项。为了找到给定数据集中每个数据项的重复项总数,我们通常需要将循环计数器变量 index 从 0 递增到 N,并对每个数据项 data[index] 在后续数据项的子集中执行线性搜索,以找到与当前项 data[index] 具有相同值的项的数量。为此,我们需要声明变量 count = 1,我们将在此变量中累积每次项 data[index] 出现次数的值,作为执行下面讨论的线性搜索的结果。为了执行线性搜索,我们需要使用另一个循环计数器变量 nindex,将其值从 index + 1 递增到 N,并对每个项 data[nindex] 执行当前项的值与 data[index] 项的值的比较。如果两个值相等,则我们将变量 count 的值递增 1,否则继续与下一个项 data[nindex] 进行比较。最后,在线性搜索执行结束时,变量 count 将保存项 data[index] 在数组 data 中的实际出现次数。count 变量的值是当前项 data[index] 的总出现频率值。

例如,我们有一个值数组 data[N] = { 3, 1, 6, 2, 1, 3, 6 },其中包含 N 个整数类型的数据项,我们需要找到索引 = 0 的第一个数据项的出现频率,其值为 3。此时,变量 count 被赋值为 1,我们通常开始将每个索引 nindex 从 1 到 N 的项 data[nindex] 与以下项 data[index]=data[0]=3 的值进行比较。在比较过程中,我们确定索引 nindex = 5 的项等于索引 index = 0 的当前数据项。在这种情况下,我们将 count 变量的值递增,使其值变为 count = count + 1 = 2。我们迭代到数组末尾,发现没有更多项的值等于当前项的值 data[index] = 3。因此,变量 count = 2 的值是给定数据集中第一个数据项 data[0] = 3 的出现频率值。类似地,通过迭代项数组,我们继续处理数组 data 中的下一个项。

由于我们已经获得了数组中当前项 data[index] 的出现频率值,我们需要将以下值添加到结果频率集中。请注意,结果频率表中的每个条目都是两个值对,即数据项 data[index] 的值和总出现次数 count。数据项频率的结果集表示为下面代码片段中定义的 FREQ_T 结构对象数组。

在将当前条目追加到用于存储每个数据项频率值的关联数组之前,我们需要检查当前项 data[index] 的出现次数是否未曾计算。为此,我们需要声明一个布尔变量 exists 并将其初始化为 false。之后,我们应该将循环计数器变量 item 从 ft_item - 1 递减到 0,直到 exists 变量的值不等于 true,或者已经达到结果频率数组的末尾。通过迭代结果频率数组,对于 freq_t[item] 的每个节点,我们获取 freq_t[item].n_val 变量的值并将其与当前项 data[index] 的值进行比较。如果当前节点 item 的 freq_t[item].n_val 变量的值等于 data[index] 的值,则我们将 exists = true,这意味着当前项 data[index] 的出现次数已经计算,并且不需要将当前项 data[index] 的频率数据追加到结果集中。否则,如果所有节点都没有 freq_t[item].n_val 变量的值等于当前项的值 data[index],则我们将索引为 ft_item 的节点的 freq_t[item].n_val 和 freq_t[ft_item].n_freq 变量分别赋值为 data[index] 和 count 的值。最后,我们将变量 ft_item 的值递增 1,然后处理数组 data 中的下一个项。

下面的代码片段演示了分布计数算法最基本的实现

typedef struct tagFreqT

{
	__int64 n_val;  
        __int64 n_freq;
} FREQ_T;


void dist_count(const __int64* data, FREQ_T* freq_t, const size_t N)
{
    __int64 ft_item = 0;
    // Iterating through the array of data items data[]
    for (__int64 index = 0; index < N; index++) // { 1 }  <--- Performance optimization hotspot
    {
        __int64 count = 1;
        // Performing the linear search to find the actual amount of
        // duplicates for the current item data[i]
        // { 2 } <--- Performance optimization hotspot
        for (__int64 nindex = index + 1; nindex < N; nindex++) 
             if (data[nindex] == data[index]) count++;

        bool exists = false;
        // Performing the linear search to determine if the frequency value for the
        // current item data[i] has been already stored into the frequency table.
        for (__int64 item = ft_item - 1; item >= 0 && !exists; item--) // { 3 }
             if (freq_t[item].n_val == data[index]) exists = true;     

        // If not, append the frequency value of the current item data[i] to the frequency table
        if (exists == false)

        {
            freq_t[ft_item].n_val = data[index];
            freq_t[ft_item++].n_freq = count;
        }
    }
}

算法的多线程并行性能优化

正如您可能从上面的代码片段中注意到的,根据所讨论的算法,查找每个数据项重复项数量的整个过程主要依赖于执行两个嵌套循环。第一个最外层循环 { 1 } 通常用于迭代数据项数组。另一个嵌套循环基本上执行线性搜索,以查找后续数据项子集中每个当前项的所有重复项,并将获得的出现次数累积到变量 count 中。

现在,让我们仔细看看嵌套循环 { 2 } 的性能。以下循环在执行时,精确地执行 p = O(N) 次迭代,以通过遍历数据集执行线性搜索来查找每个当前项 data[index] 的重复项数量。此外,在执行线性搜索以获取数据项出现次数的末尾,我们通常执行另一个循环来迭代数组 freq_t,以确定当前数据没有条目,并且我们之前尚未对当前项 data[index] 执行分布计数。这反过来又对查找数据项重复项的过程造成了足够的开销,因为以下循环精确地执行 次迭代,以检查当前项是否尚未索引到频率表 freq_t 中。这实际上就是为什么实现线性搜索的此代码片段的总复杂性可以估计为 p = O(N+k) 次迭代。

根据分布计数算法,在最外层顺序循环的每次迭代中,对从给定数据集中检索到的 N 个项目中的每一个项目重复执行上述线性搜索。因此,对数据集中每个项目 data[index] 执行的线性搜索的复杂度值乘以 N(即最外层循环的迭代次数),可以估计为,这通常会超过大多数排序和其他执行复杂计算的算法的平方计算复杂度 O(N^2)。

显然,在这种情况下,执行线性搜索的嵌套循环成为了并行多线程优化的热点。根据最佳实践,如果我们将嵌套循环的迭代并行执行,由多个线程执行,以便每次迭代都在其自己的线程中执行,那么实现分布计数算法的代码的性能加速增益会高得多。这反过来将确保所讨论的代码比顺序执行的代码执行得快得多。嵌套循环的并行执行提供了整个算法高达 65% 的性能加速增益。

最外层循环是我们接下来要讨论的另一个性能优化热点。与其他算法不同,特别是那些执行排序的算法,由分布计数算法制定的查找给定数据集中每个项目的重复项的过程几乎没有数据流依赖问题。这实际上意味着在最外层循环的每次迭代中执行线性搜索的代码通常不会访问作为执行前一次和下一次迭代的结果而处理的数据。在这种特殊情况下,以下代码能够独立地执行线性搜索以查找给定数据集中每个项目的重复项数量。通过在最外层循环的每次迭代中执行线性搜索计算出的实际重复项计数不依赖于给定数据集中其他类似项目获得的计数。这反过来又有利于所讨论的整个算法的正确高效并行化。

然而,这段代码中还有另一个嵌套循环 { 3 },它执行线性搜索,以确定当前项 data[index] 的实际重复次数是否已索引到表示项频率表的数组中。不幸的是,由于在以下循环执行期间可能发生的数据流依赖性,以下嵌套循环无法完美并行化。在这种情况下,为了确保以下代码正确执行,我们需要实现特定的线程同步机制,例如临界区锁,以避免可能存在的数据流依赖性问题。

使用 Intel OpenMP 性能库优化的代码

这是另一段使用 Intel OpenMP 库实现并行可扩展分布计数算法的代码片段。在对上面列出的代码进行并行多线程优化时,我们精确地检查了以下部分前几段中讨论的优化提示

void omp_dist_count(const __int64* data, FREQ_T* freq_t, const size_t N, const size_t FT_SIZE)

{
        __int64 ft_item = 0; omp_lock_t lock;
        int max_threads = omp_get_max_threads();

        omp_init_lock(&lock);

        // Worksharing the iterations of the outermost loop accross multiple
        // sections executed in parallel
        #pragma omp parallel default(none) \ // { 1 }
                shared(data, freq_t, max_threads, ft_item, lock) num_threads(max_threads)
        {
                // Obtaing the thread-id value of the current thread being executed
                int thread_id = omp_get_thread_num();

                // Computing the start and end position of the current chunk of the array data
                __int64 start = thread_id * (N / max_threads);
                __int64 end = (thread_id + 1) * (N / max_threads);

                // Performing iteration through the data items within the current array chunk
                for (int index = start; index < end; index++)
                {
                        __int64 count = 0;

                        // Performing the tight loop parallelization of the nested loop that performs
                        // the linear search to find the duplicates of the current data data[index]
                        // Reducing of the incremented value of the current item's duplicates count to
                        // avoid the race condition issue occurrence.
                        #pragma omp parallel for default(none) \ // { 2 }
                                reduction(+:count) shared(data, index)

                        for (__int64 nindex = 0; nindex < N; nindex++)
                                if (data[nindex] == data[index]) count++;

                        // Explicitly synchronizing the fragment of code that performs the check to
                        // avoid duplicates of those items for which the actual counting has already
                        // been performed and the number of duplicates have been indexed into the
                        // frequency table

                        omp_set_lock(&lock);

                        bool exists = false;
                        for (__int64 item = FT_SIZE - 1; item >= 0 && !exists; item--)
                                if (freq_t[item].n_val == data[index]) exists = true;

                        if (exists == false)
                        {
                                // Appending the value of the actual count of duplicates
                                // for the current item data[i] to the frequency table

                                freq_t[ft_item].n_freq = count;
                                freq_t[ft_item++].n_val = data[index];
                        }

                        omp_unset_lock(&lock);
                }
        }

        omp_destroy_lock(&lock);
}

从上面列出的代码片段中我们可以看到,应用于算法顺序代码的第一个优化是,我们对用于迭代数据项数组的最外层循环 { 1 } 进行了工作共享,通过将迭代总数拆分由多个 CPU 硬件级线程执行,每个线程执行自己定义的并行区域的副本。这反过来又允许通过在两个或更多 CPU 核心上扩展所执行操作的整个范围来显著减轻 CPU 负载。

在通过使用并行工作共享构造转换了最外层循环之后,我们必须为嵌套循环 { 2 } 提供紧密的循环并行化。为此,在这种情况下,我们使用 #pragma omp parallel for 指令构造来并行执行嵌套循环的每次迭代,这在并行代码执行期间提供了实际的加速。

请注意,在这种情况下,我们正在并行区域内实现紧密循环并行化,该区域由多个线程执行。这通常被称为 OpenMP 的嵌套并行。使用嵌套并行策略的主要目标是为正在优化的代码提供更好的性能加速增益。

我们将讨论的另一个优化方面是用于避免嵌套循环 { 2 } 并行执行期间竞争条件问题的归约机制。正如我们从上面列出的代码片段中看到的,在由其自己的线程执行的嵌套循环 { 2 } 的每次迭代中,我们正在检查数据项 data[nindex] 的值是否等于我们正在估计重复次数的当前项 data[index] 的值。如果是,我们正在执行共享变量 count 的增量。在这种情况下,为了获得以下变量的正确值,我们将通过使用 reduction(+:count) 子句对增量运算符执行显式归约。

然而,上面列出的一些代码片段,包括嵌套循环 { 3 },仍然没有并行化,因为尝试并行运行以下片段通常会导致数据流依赖问题的发生。由于以下代码片段旨在由每个线程顺序运行,因此,我们正在使用显式同步机制,例如通过调用 OpenMP API 例程 omp_set_lock(&lock)、omp_set_unlock(&lock) 来锁定临界区,以确保以下代码片段由每个线程顺序执行。

上面列出的 OpenMP 并行代码的主要缺点是,在嵌套循环 { 3 } 执行期间出现数据流依赖问题通常会导致以下代码无法完全扩展到所有可用的 CPU 核心。为此,我们使用 Intel Amplifier XE 来衡量和评估使用 OpenMP 库并行化的代码的整体性能。图 1 说明了执行分布计数的并行代码如何在多个 CPU 核心上进行扩展。

图 1:使用 Intel Amplifier XE 进行并行代码性能评估

从图 1 中可以看出,几乎一半的线程利用率低下,原因是性能潜在增益和 CPU 负载不平衡的不断增长,这是由于上述实现的第二个嵌套循环代码片段并行化不足造成的。此外,使用临界区锁通常会在并行代码执行期间导致足够的开销。从上面的热门热点分析部分可以看出,与特定的 fork-join 调用和隐式屏障同步相比,omp_set_lock(&lock)、omp_set_unlock(&lock) 函数调用具有最长的持续时间。

总之,从顺序或并行代码片段中我们可以看出,使用 Intel OpenMP 性能库通常无法实现以下代码的理想并行化。此代码无法完美并行化的原因之一是,在使用 OpenMP 库的多线程原语在多个 CPU 核心上扩展执行此代码时,OpenMP 库本身缺乏隐式同步。

在本文的下一部分,我们将讨论如何使用 NVIDIA CUDA®——一个并行多线程平台,它能为实现分布计数算法的代码提供更高效的并行化。

使用代码

cpdCountChunkKernel 内核在由多个发散线程执行时,从数组 ppdInputVec 中检索每个数据项,并通过动态启动另一个 cdpCountIfKernel 内核执行线性搜索,以查找每个当前数据项 ppdInputVec[threadIdx.x] 的实际重复项数量,该内核执行下面讨论的简单计数-如果操作。

    template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkKernel(InpType* ppdInputVec, FreqT* ppdCountVec, CntType nSize)
{
	// Declare a locally shared array of stream events 
	// pStreamEvents that are used to synchronize each stream
	__shared__ cudaEvent_t pStreamEvents[CHUNK_SIZE_GRID_0];
	// Declare a locally shared array of events used to 
	// synchronize each stream in which we'll launch cdpCountIfKernel kernel
	__shared__ cudaStream_t pStreamHandles[CHUNK_SIZE_GRID_0];

	// Declare a locally shared pinned array ppdSHCountVec used to store the number 
	// of duplicates value for each data item retrieved during the cdpCountChunkKernel execution
	__shared__ FreqT ppdSHCountVec[CHUNK_SIZE_GRID_0];

	// Perform a check if the value of threadIdx.x variable 
	// is less than the value of the ppdInputVec array size
	if (threadIdx.x < nSize)
	{
		// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
		cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[threadIdx.x], cudaStreamNonBlocking));
		// Initializing the new event used to synchronize the new non-blocking stream execution
		cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[threadIdx.x], cudaEventDisableTiming));

		// Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());
		// If this kernel is executed by the first thread with thread-id equal to 0, 
		// initiallizing the locally shared array ppdSHCountVec used to store the actual 
		// number of duplicates for each data item from the array ppdInputVec
		if (threadIdx.x == 0)
		{
			for (CntType iIndex = 0; iIndex < CHUNK_SIZE_GRID_0; iIndex++)
				ppdSHCountVec[iIndex].value = ppdSHCountVec[iIndex].freq = 0;
		}

		CntType* ppdCountValue = NULL;
		// Retrieving the current data item with index threadIdx.x from the array 
		// ppdInputVec and assigning its value to the thread-private variable nValue
		InpType nValue = ppdInputVec[threadIdx.x];
		// Allocating GPU device memory to store the actual value of the number of the
		// current item's duplicates and assigning its address to the ppdCountValue pointer variable
		cudaErrorCheck(cudaMalloc((void**)&ppdCountValue, sizeof(CntType)));

		// Launching cdpCountIfKernel to obtain the actual number of duplicates 
		// ppdCountValue for the current data item that has been previously retrieved 
		// from the array ppdInputVec and assigned to nValue variable
		cdpCountIfKernel<inptype, cnttype=""> << < 1, \
			blockDim.x, 0, pStreamHandles[threadIdx.x] >> > (ppdInputVec, ppdCountValue, nValue, nSize);

		// Asynchronously record the stream event prior to synchronizing all threads across GPU device
		cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
		// Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());
		// Waiting until the cudaDeviceSynchronize API call will finish its execution
		cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[threadIdx.x], pStreamEvents[threadIdx.x], 0));

		__syncthreads(); // Perfoming synchronization of all threads in a block
		// Performing a check if the value of the number of duplicates is greater 
		// than zero and not exceeding the value of ppdInputVec array size
		if (*ppdCountValue > 0 && *ppdCountValue < nSize)
		{
			__syncthreads(); // Perfoming synchronization of all threads in a block
			// Assigning the value of the current data item retrieved to the ppdSHCountVec[threadIdx.x].value 
			// variable of the current node with index threadIdx.x in the array ppdSHCountVec
			ppdSHCountVec[threadIdx.x].value = nValue;
			// Assigning the number of duplicates to the ppdSHCountVec[threadIdx.x].freq
			// variable of the current node with index threadIdx.x in the array ppdSHCountVec
			ppdSHCountVec[threadIdx.x].freq = *ppdCountValue;
			// Synchronizing all threads of the entire GPU device
			cudaErrorCheck(cudaDeviceSynchronize());

			bool bExists = false;
			// Iterating through the shared array ppdSHCountVec used to store 
			// the values of the actual number of data items duplicates 
			for (int iIndex = 0; iIndex < threadIdx.x && !bExists; iIndex++)
				// For each node with index iIndex in the array we're performing a check
				// if the value of the variable ppdSHCountVec[iIndex].value is equal to
				// the value nValue variable. If so, the actual number of duplicates for
				// the current item was already obtained.
				if (ppdSHCountVec[iIndex].value == nValue) bExists = true;

			__syncthreads(); // Perfoming synchronization of all threads in a block
			// If the value of bExists variable is equal to false 
			// (the number of duplicates value for the current item has not been already obtained), 
			// copy the current node with index threadIdx.x to the array ppdCountVec declared in
			// global GPU context.
			if (bExists == false)
			{
				__syncthreads(); // Perfoming synchronization of all threads in a block
				// Copy the current node with index threadIdx.x to the array 
				// ppdCountVec declared in global GPU context.
				ppdCountVec[threadIdx.x] = ppdSHCountVec[threadIdx.x];
			}
		}

		// Deallocating the buffer which address is assigned to the ppdCountValue variable
		cudaErrorCheck(cudaFree(ppdCountValue));
		// Destorying a stream in which the cdpCountIfKernel is executed
		cudaErrorCheck(cudaStreamDestroy(pStreamHandles[threadIdx.x]))
	}
}
</inptype,></typename>

以下代码片段实现了 cdpCountIfKernel,该内核执行线性搜索以查找数组 ppdInpVec 中当前数据项的重复项数量,其值赋给 nValue 变量。它对本地共享变量 nSHLocalCount 执行原子增量操作,该变量用于累积所有线程(每个线程执行其自己的 cdpCountIfKernel 实例)的重复项数量。在每个线程执行结束时,nSHLocalCount 变量的值被复制到全局上下文内存,其地址赋给 ppdCountVec 全局变量。

   // cdpCountIfKernel is a kernel function performing parallel count-if linear search
// to find the actual number of duplicates in the array ppdInputVec of size nSize for 
// the value of nValue variable passed as an argument to the following kernel function.
// The value of the total amount of duplicates for the value of nValue is returned as the 
// value of the pointer ppdCountVec variable passed as an argument to the following kernel
template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountIfKernel(const InpType* ppdInputVec, \
	CntType* ppdCountVec, const InpType nValue, CntType nSize)
{
	// Declare a locally shared variable nSHLocalCount used to accumulate the total 
	// amount of occurrences for nValue in the array ppdInputVec The following variable 
	// is shared across all threads executing cdpCountIfKernel. Since this variable
	// value has been modified during the kernel execution, we'll use the atomicAdd(...)
	// routine to perform the atomic reduction while incrementing the value of nSHLocalCount by 1
	__shared__ CntType nSHLocalCount;

	// Perform a check if the value of threadIdx.x variable 
	// is less than the value of the ppdInputVec array size
	if (threadIdx.x < nSize)
	{
		__syncthreads(); // Synchronizing all threads across a block
		// If this kernel is executed by the first thread with thread-id 
		// equal to 0 assigning the initial value of variable nSHLocalCount = 0
		if (threadIdx.x == 0) nSHLocalCount = 0;
		// Performing a check if the current item ppdInputVec[threadIdx.x] in the
		// array ppdInputVec is equal to the value of nValue variable.
		if (ppdInputVec[threadIdx.x] == nValue)
		{
			__syncthreads(); // If so, perfoming synchronization of all threads in a block
			// Incrementing the value of nSHLocalCount variable value by 1 
			// using atomic reduction function atomicAdd(...)
			atomicAdd(&nSHLocalCount, 1);
			// Synchronizing all threads of the entire GPU device
			cudaErrorCheck(cudaDeviceSynchronize());
		}

		// If the value nSHLocalCount shared variable is not equal to 0,
		// duplicating its value by assigning it to the GPU device memory
		// accessed by the ppdCountVec pointer allocated in GPU device memory
		if (nSHLocalCount > 0)
		{
			__syncthreads(); // Perfoming synchronization of all threads in a block
		    // Synchronizing all threads of the entire GPU device
			cudaErrorCheck(cudaDeviceSynchronize());
			// Assigning the value of nSHLocalCount variable to the device memory
			// accessed by the global context pointer variable ppdCountVec
			*ppdCountVec = nSHLocalCount;
		}
	}
} 
</typename>

以下代码片段实现了 cdpCountChunkRecursiveKernel 内核,该内核在多个发散线程之间执行分布计数过程的并行工作共享,每个线程计算其自己的块内一组项目的重复项数量。每个线程动态启动 cdpCountChunkKernel,以获取当前块内每个特定项目的重复项数量,该块的边界在 cdpCountChunkRecursiveKernel 执行开始时计算。在每个线程计算出当前块内每个项目的重复项数量后,以下内核通过将每个当前块的重复项数量数组合并到全局数组 ppdCountVec 中来执行归约。

    template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkRecursiveKernel(InpType* ppdInputVec, FreqT* ppdCountVec, \
	CntType nChunkID, CntType nDepth, CntType nItemsCount, CntType nChunkSize, CntType nSize)
{
	// Declare a locally shared array of stream events 
	// pStreamEvents that are used to synchronize each stream
	__shared__ cudaEvent_t pStreamEvents[CUDA_MAX_DEPTH];
	// Declare a locally shared array of events used to 
	// synchronize each stream in which we'll launch cdpCountIfKernel kernel
	__shared__ cudaStream_t pStreamHandles[CUDA_MAX_DEPTH];

	// Performing a check if the actual recursion depth 
	// is not exceeding the specified threshold CUDA_MAX_DEPTH
	if (nDepth < CUDA_MAX_DEPTH)
	{
		// Computing the begining and ending position of the current chunk in the array ppdInputVec
		CntType nBegin = nChunkID * nChunkSize;
		CntType nEnd = (nChunkID + 1) * nChunkSize;

		// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
		cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[nDepth], cudaStreamNonBlocking));
		// Initializing the new event used to synchronize the new non-blocking stream execution
		cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[nDepth], cudaEventDisableTiming));

		// Performing a check if the ending position of the current
		// chunk doesn't exceed the actual size of the ppdInputVec array
		if (nEnd <= nSize)
		{
			// Declaring a pointer variable that is assigned to the 
			// address of buffer used to store the current chunk in GPU memory
			InpType* ppdChunkPtr = NULL;
			// Allocating buffer ppdChunkPtr used to store the current chunk of the array ppdInputVec into GPU memory
			cudaErrorCheck(cudaMalloc((void**)&ppdChunkPtr, sizeof(InpType) * nChunkSize));
			// Performing the tranfer of the current chunk from ppdInputVec array to the temporary buffer ppdChunkPtr.
			cudaErrorCheck(cudaMemcpyAsync(ppdChunkPtr, &ppdInputVec[nBegin], \
				sizeof(InpType) * nChunkSize, cudaMemcpyDeviceToDevice, pStreamHandles[nDepth]));

			// Declaring the pointer variable that is assigned to the address of a temporary
			// buffer used to store the array of values, each one is the number of 
			// duplicates for each data item within the current chunk
			FreqT* ppdLocalCountVec = NULL;
			// Allocating buffer ppdLocalCountVec used to store the array of values, 
			// each one is the number of duplicates for each data item within the current chunk
			cudaErrorCheck(cudaMalloc((void**)&ppdLocalCountVec, sizeof(FreqT) * CHUNK_SIZE_GRID_0));
			cudaErrorCheck(cudaMemsetAsync(ppdLocalCountVec, 0x00, sizeof(FreqT) * CHUNK_SIZE_GRID_0, pStreamHandles[nDepth]));

			// Dynamically launching the cpdCountChunkKernel to compute the 
			// number of duplicates for each data item in the current chunk transfered to the ppdChunkPtr buffer
			cdpCountChunkKernel <inptype, cnttype=""> << < 1, \
				CHUNK_SIZE_GRID_0, 0, pStreamHandles[nDepth] >> > (ppdChunkPtr, ppdLocalCountVec, CHUNK_SIZE_GRID_0);

			// Asynchronously record the stream event prior to synchronizing all threads across GPU device
			cudaErrorCheck(cudaEventRecord(pStreamEvents[nDepth], pStreamHandles[nDepth]));
			// Synchronizing all threads of the entire GPU device
			cudaErrorCheck(cudaDeviceSynchronize());
			// Waiting until the cudaDeviceSynchronize API call will finish its execution
			cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[nDepth], pStreamEvents[nDepth], 0));

			// Destorying a stream in which the cdpCountIfKernel is executed
			cudaErrorCheck(cudaStreamDestroy(pStreamHandles[nDepth]));
			// Deallocating the buffer which address is assigned to the ppdChunkPtr variable
			cudaErrorCheck(cudaFree(ppdChunkPtr));

			// Performing reduction by merging the array of the number of duplicates values
			// obtained by each divergent thread into the global array used to store the overall
			// number of duplicates for all items from the array ppdInputVec
			CntType nItem = cdpMergeReduction<cnttype>(ppdLocalCountVec, ppdCountVec, \
				GridType::LOCAL, 0, 0, nItemsCount, CHUNK_SIZE_GRID_0);

			// Deallocating the buffer which address is assigned to the ppdLocalCountVec variable
			cudaErrorCheck(cudaFree(ppdLocalCountVec));

			// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
			cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[nDepth + 1], cudaStreamNonBlocking));
			// Initializing the new event used to synchronize the new non-blocking stream execution
			cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[nDepth + 1], cudaEventDisableTiming));

			// Recursively launching the cdpCountChunkRecursiveKernel to obtain the number of duplicates
			// for the data items in the succeeding chunk of the pInputVec array
			cdpCountChunkRecursiveKernel <inptype, cnttype=""> << < 1, 1, 0, pStreamHandles[nDepth + 1] >> > \
				(ppdInputVec, ppdCountVec, nChunkID + 1, nDepth + 1, nItem, nChunkSize, nSize);
			cudaErrorCheck(cudaDeviceSynchronize());

			// Asynchronously record the stream event prior to synchronizing all threads across GPU device
			cudaErrorCheck(cudaEventRecord(pStreamEvents[nDepth + 1], pStreamHandles[nDepth + 1]));
			// Waiting until the cudaDeviceSynchronize API call will finish its execution
			cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[nDepth + 1], pStreamEvents[nDepth + 1], 0));
			// Destorying a stream in which the cdpCountChunkRecursiveKernel is executed
			cudaErrorCheck(cudaStreamDestroy(pStreamHandles[nDepth + 1]));
		}
	}
}
</inptype,></cnttype></inptype,></typename>

以下代码片段实现了 cdpCountChunkGrid1 和 cdpCountChunkGrid2 内核,它们与前面讨论的内核类似,执行相同的分布计数过程并行工作共享,唯一的区别是以下内核不是执行递归工作共享,而是通常执行批量并行化,以分别计算数组 ppdInputVec 的每个块内项目的重复项数量。cdpCountChunkGrid1 是相对于执行 cdpCountChunkGrid2 内核的线程的父线程网格。

    template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkGrid1(InpType* ppdInputVec, FreqT* ppdCountVec, CntType nParentThreadID, CntType nChunkSize, CntType nSize)
{
	// Declare a locally shared array of stream events 
	// pStreamEvents that are used to synchronize each stream
	__shared__ cudaEvent_t pStreamEvents[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];
	// Declare a locally shared array of events used to 
	// synchronize each stream in which we'll launch cdpCountIfKernel kernel
	__shared__ cudaStream_t pStreamHandles[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];

	// Computing the begining and ending position of the current chunk in the array ppdInputVec
	CntType nBegin = threadIdx.x * nChunkSize;
	CntType nEnd = (threadIdx.x + 1) * nChunkSize;

	// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
	cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[threadIdx.x], cudaStreamNonBlocking));
	// Initializing the new event used to synchronize the new non-blocking stream execution
	cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[threadIdx.x], cudaEventDisableTiming));

	// Performing a check if the ending position of the current
	// chunk doesn't exceed the actual size of the ppdInputVec array
	if (nEnd <= nSize)
	{
		// Declaring a pointer variable that is assigned to the 
		// address of buffer used to store the current chunk in GPU memory
		InpType* ppdChunkPtr = NULL;
		// Allocating buffer ppdChunkPtr used to store the current chunk of the array ppdInputVec into GPU memory
		cudaErrorCheck(cudaMalloc((void**)&ppdChunkPtr, sizeof(InpType) * nChunkSize));
		// Performing the tranfer of the current chunk from ppdInputVec array to the temporary buffer ppdChunkPtr.
		cudaErrorCheck(cudaMemcpyAsync(ppdChunkPtr, &ppdInputVec[nBegin], \
			sizeof(InpType) * nChunkSize, cudaMemcpyDeviceToDevice, pStreamHandles[threadIdx.x]));

		// Declaring the pointer variable that is assigned to the address of a temporary
		// buffer used to store the array of values, each one is the number of 
		// duplicates for each data item within the current chunk
		FreqT* ppdLocalCountVec = NULL;
		// Allocating buffer ppdLocalCountVec used to store the array of values, 
		// each one is the number of duplicates for each data item within the current chunk
		cudaErrorCheck(cudaMalloc((void**)&ppdLocalCountVec, sizeof(FreqT) * CHUNK_SIZE_GRID_1));
		cudaErrorCheck(cudaMemsetAsync(ppdLocalCountVec, 0x00, \
			sizeof(FreqT) * CHUNK_SIZE_GRID_1, pStreamHandles[threadIdx.x]));

		cudaErrorCheck(cudaMemsetAsync(ppdCountVecG1[nParentThreadID][threadIdx.x], 0x00, \
			sizeof(FreqT) * CHUNK_SIZE_GRID_1, pStreamHandles[threadIdx.x]));

		// Dynamically launching the cpdCountChunkRecursiveKernel to compute the 
		// number of duplicates for each data item in the current chunk transfered to the ppdChunkPtr buffer
		cdpCountChunkRecursiveKernel <inptype, cnttype=""> << < 1, 1, 0, \
			pStreamHandles[threadIdx.x] >> > (ppdChunkPtr, ppdLocalCountVec, 0, 0, 0, CHUNK_SIZE_GRID_0, nChunkSize);

		// Asynchronously record the stream event prior to synchronizing all threads across GPU device
		cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
		// Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());

		// Iterating through the array of stream handles and for each stream performing an asynchronous
		// wait operation to synchronize all streams in which the cdpCountChunkRecursiveKernel is executed
		for (CntType iIndex = 0; iIndex < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; iIndex++)
			cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[iIndex], pStreamEvents[iIndex], 0));

		// Performing reduction by merging the array of the number of duplicates values
		// obtained by each divergent thread into the locally shared array ppdLocalCountVec
		// used to store the overall number of duplicates for all items from the array ppdInputVec
		cdpMergeReduction<cnttype>(ppdLocalCountVec, NULL, GridType::GRID_1, \
			nParentThreadID, threadIdx.x, 0, CHUNK_SIZE_GRID_1);
		// Performing reduction by merging the array of the number of duplicates values
		// obtained by all divergent threads into the global array ppdCountVec used to store the overall
		// number of duplicates for all items from the array ppdInputVec
		cdpMergeAllReduction<cnttype>(ppdCountVec, GridType::GRID_1, nParentThreadID);

		// Deallocating the buffer which address is assigned to the ppdLocalCountVec variable
		cudaErrorCheck(cudaFree(ppdLocalCountVec));
		// Deallocating the buffer which address is assigned to the ppdChunkPtr variable
		cudaErrorCheck(cudaFree(ppdChunkPtr));
	}
}

template<typename cnttype="__uint64" inptype="__int8," typename="">
__global__ void cdpCountChunkGrid2(InpType* ppdInputVec, FreqT* ppdCountVec, CntType nParentThreadID, CntType nChunkSize, CntType nSize)
{
	// Declare a locally shared array of stream events 
	// pStreamEvents that are used to synchronize each stream
	__shared__ cudaStream_t pStreamHandles[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];
	// Declare a locally shared array of events used to 
	// synchronize each stream in which we'll launch cdpCountIfKernel kernel
	__shared__ cudaEvent_t pStreamEvents[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0];

	// Computing the begining and ending position of the current chunk in the array ppdInputVec
	CntType nBegin = threadIdx.x * nChunkSize;
	CntType nEnd = (threadIdx.x + 1) * nChunkSize;

	// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
	cudaErrorCheck(cudaStreamCreateWithFlags(&pStreamHandles[threadIdx.x], cudaStreamNonBlocking));
	// Initializing the new event used to synchronize the new non-blocking stream execution
	cudaErrorCheck(cudaEventCreateWithFlags(&pStreamEvents[threadIdx.x], cudaEventDisableTiming));

	// Performing a check if the ending position of the current
	// chunk doesn't exceed the actual size of the ppdInputVec array
	if (nEnd <= nSize)
	{
		// Declaring a pointer variable that is assigned to the 
		// address of buffer used to store the current chunk in GPU memory
		InpType* ppdChunkPtr = NULL;
		// Asynchronously record the stream event prior to synchronizing all threads across GPU device
		cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
		// Allocating buffer ppdLocalCountVec used to store the array of values, 
		// each one is the number of duplicates for each data item within the current chunk
		cudaErrorCheck(cudaMalloc((void**)&ppdChunkPtr, sizeof(InpType) * nChunkSize));
		cudaErrorCheck(cudaMemcpyAsync(ppdChunkPtr, &ppdInputVec[nBegin], \
			sizeof(InpType) * nChunkSize, cudaMemcpyDeviceToDevice, pStreamHandles[threadIdx.x]));

		FreqT* ppdLocalCountVec = NULL;
		// Allocating buffer ppdLocalCountVec used to store the array of values, 
		// each one is the number of duplicates for each data item within the current chunk	
		cudaErrorCheck(cudaMalloc((void**)&ppdLocalCountVec, \
			sizeof(FreqT) * CHUNK_SIZE_GRID_2));
		cudaErrorCheck(cudaMemsetAsync(ppdLocalCountVec, 0x00, \
			sizeof(FreqT) * CHUNK_SIZE_GRID_2, pStreamHandles[threadIdx.x]));

		__syncthreads(); // Perfoming synchronization of all threads in a block
		// Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());

		for (int iIndex = 0; iIndex < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; iIndex++)
			cudaErrorCheck(cudaMemsetAsync(ppdCountVecG2[threadIdx.x][iIndex], 0x00, \
				sizeof(FreqT) * CHUNK_SIZE_GRID_2, pStreamHandles[threadIdx.x]));

		// Iterating through the array of stream handles and for each stream performing an asynchronous
		// wait operation to synchronize all streams in which the cdpCountChunkGrid1 is executed
		for (int iIndex = 0; iIndex < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; iIndex++)
			cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[iIndex], pStreamEvents[iIndex], 0));

		// Dynamically launching the cpdCountChunkGrid1 kernel to compute the 
		// number of duplicates for each data item in the current chunk transfered to the ppdChunkPtr buffer
		cdpCountChunkGrid1 <inptype, cnttype=""> << < 1, CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0, \
			0, pStreamHandles[threadIdx.x] >> > (ppdChunkPtr, ppdLocalCountVec, \
				threadIdx.x, CHUNK_SIZE_GRID_1, CHUNK_SIZE_GRID_2);

		// Asynchronously record the stream event prior to synchronizing all threads across GPU device
		cudaErrorCheck(cudaEventRecord(pStreamEvents[threadIdx.x], pStreamHandles[threadIdx.x]));
		// Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());

		// Iterating through the array of stream handles and for each stream performing an asynchronous
		// wait operation to synchronize all streams in which the cdpCountChunkGrid2 is executed
		for (int i = 0; i < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0; i++)
			cudaErrorCheck(cudaStreamWaitEvent(pStreamHandles[i], pStreamEvents[i], 0));

		__syncthreads(); // Perfoming synchronization of all threads in a block
		 // Synchronizing all threads of the entire GPU device
		cudaErrorCheck(cudaDeviceSynchronize());

		// Performing reduction by merging the array of the number of duplicates values
		// obtained by each divergent thread into the locally shared array ppdLocalCountVec
		// used to store the overall number of duplicates for all items from the array ppdInputVec
		cdpMergeReduction<cnttype>(ppdLocalCountVec, NULL, GridType::GRID_2, nParentThreadID, threadIdx.x, 0, CHUNK_SIZE_GRID_2);
		// Performing reduction by merging the array of the number of duplicates values
		// obtained by all divergent threads into the global array ppdCountVec used to store the overall
		// number of duplicates for all items from the array ppdInputVec
		cdpMergeAllReduction<cnttype>(ppdCountVec, GridType::GRID_2, nParentThreadID);

		// Deallocating the buffer which address is assigned to the ppdLocalCountVec variable
		cudaErrorCheck(cudaFree(ppdLocalCountVec));
		// Deallocating the buffer which address is assigned to the ppdChunkPtr variable
		cudaErrorCheck(cudaFree(ppdChunkPtr));
	}
}
</cnttype></cnttype></inptype,></typename></cnttype></cnttype></inptype,></typename>

以下代码实现了主机-设备函数 cdpMergeReduction。该函数通过实际合并每个数组中的项目来执行归约,这些数组用于存储每个数据项的重复值,这些值由每个当前线程为数组 ppdInputVec 的每个特定块获得。通过执行合并归约,我们获得了包含 ppdInputVec 数组所有块中每个项目的重复项总数的全局数组。为了提供更好的线程同步并避免数据流依赖问题,在这种情况下,我们使用设备全局固定内存(例如 ppdCountVecG1 和 ppdCountVecG2 缓冲区)来存储每个块和每个线程的重复项数量值。

__device__ FreqT ppdCountVecG1[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0] \
[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0][CHUNK_SIZE_GRID_1] = { { { 0 } } };
__device__ FreqT ppdCountVecG2[CHUNK_SIZE_GRID_2 / CHUNK_SIZE_GRID_0] \
[CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0][CHUNK_SIZE_GRID_2] = { { { 0 } } };

typedef enum { GRID_1 = 1, GRID_2 = 2, LOCAL = 3 } GridType;

template<typename cnttype="__uint64">
__device__ __host__ CntType cdpMergeReduction(FreqT* ppdSourceVec, FreqT* ppdCountVec, GridType Grid, \
	CntType nParentThreadID, CntType nThreadID, CntType nItemsCount, CntType nSize)
{
	CntType nItem = nItemsCount;
	// Iterating through the array ppdSourceVec that used to store the 
	// values of the actual number of data items duplicates for the 
	// current chunk obtained during the current thread execution
	for (CntType iIndex = 0; iIndex < nSize; iIndex++)
		// For each node in the array ppdSourceVec we're performing a check if
		// the value of ppdSourceVec[iIndex].freq > 0 is greater than zero, which means
		// the value of the number of duplicates value for the current data item was already
		// indexes into the array ppdSourceVec
		if (ppdSourceVec[iIndex].freq > 0)
		{
			// Performing a linear search to determine if the current item ppdSourceVec[iIndex] exists in the target array ppdCountVec and
			// obtain the value its index. By performing a linear search we actually iterate through the target array and compare the 
			// value of the variable ppdCountVec[nParentThreadID][nThreadID][nIndex++].value of each node with the value of 
			// ppdSourceVec[iIndex].value variable of the current node in the array ppdSourceVec. 
			CntType nIndex = 0;	bool bExists = false;
			while (nIndex < nItem && bExists == false)
				// If the value of the ppdCountVecG1[nParentThreadID][nThreadID][nIndex++].value variable
				// is equal to the the value of ppdSourceVec[iIndex].value variable of the current node in the array ppdSourceVec,
				// which means that the value of the number of duplicates for the current item in the array ppdInputVec
				// has been already indexed into the array ppdCountVec, we assign the boolean variable bExists = true 
				// and finish the loop execution.
				if (Grid == GridType::GRID_1)
					bExists = (ppdCountVecG1[nParentThreadID][nThreadID][nIndex++].value == \
						ppdSourceVec[iIndex].value) ? 1 : 0;
				else if (Grid == GridType::GRID_2)
					bExists = (ppdCountVecG2[nParentThreadID][nThreadID][nIndex++].value == \
						ppdSourceVec[iIndex].value) ? 1 : 0;
				else if (Grid == GridType::LOCAL)
					bExists = (ppdCountVec[nIndex++].value == \
						ppdSourceVec[iIndex].value) ? 1 : 0;
			
			// If the value of the number of duplicates for the current data item exists in the target array 
			// ppdCountVec, then we're performing and addition of the value of  ppdSourceVec[iIndex].freq variable 
			// with the value of the ppdCountVec[nParentThreadID][nThreadID][nIndex - 1].freq variable
			if (bExists == true)
			{
				if (Grid == GridType::GRID_1)
					ppdCountVecG1[nParentThreadID][nThreadID][nIndex - 1].freq += \
					ppdSourceVec[iIndex].freq;
				else if (Grid == GridType::GRID_2)
					ppdCountVecG2[nParentThreadID][nThreadID][nIndex - 1].freq += \
					ppdSourceVec[iIndex].freq;
				else if (Grid == GridType::LOCAL)
					ppdCountVec[nIndex - 1].freq += \
					ppdSourceVec[iIndex].freq;
			}
			
			// Otherwise, store the number of duplicates value to the target 
			// array ppdCountVec by assigning the value of the ppdSourceVec[iIndex].value 
			// variable to the ppdCountVec[nParentThreadID][nThreadID][nItem].value variable.
			else
			{
				if (Grid == GridType::GRID_1)
				{
					ppdCountVecG1[nParentThreadID][nThreadID][nItem].value = \
						ppdSourceVec[iIndex].value;
					ppdCountVecG1[nParentThreadID][nThreadID][nItem].freq = \
						ppdSourceVec[iIndex].freq;
					nItem++;
				}

				else if (Grid == GridType::GRID_2)
				{
					ppdCountVecG2[nParentThreadID][nThreadID][nItem].value = \
						ppdSourceVec[iIndex].value;
					ppdCountVecG2[nParentThreadID][nThreadID][nItem].freq = \
						ppdSourceVec[iIndex].freq;
					nItem++;
				}

				else if (Grid == GridType::LOCAL)
				{
					ppdCountVec[nItem].value = \
						ppdSourceVec[iIndex].value;
					ppdCountVec[nItem].freq = \
						ppdSourceVec[iIndex].freq;
					nItem++;
				}
			}
		}

	return nItem;</typename>
}

与上面列出的 cdpMergeReduction 函数不同,cdpMergeAllReduction 设备函数执行特定数组的合并归约,这些数组保存了每个线程为每个块获得的重复项数量,从而获得父块中每个项目的重复项数量。

template<typename cnttype="__uint64">
__device__ CntType cdpMergeAllReduction(FreqT* ppdCountVec, GridType Grid, CntType nParentThreadID)
{
	CntType nThreadID = 0; CntType nItem = 0;
	while (nThreadID < CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0)
	{
		// Performing the merge reduction for the current thread with thread-id assigned to the
		// nThreadID variable by iterating through the array ppdSourceVec that used to store the 
		// values of the actual number of data items duplicates for the current chunk obtained 
		// during the current thread execution
		for (CntType iIndex = 0; iIndex < CHUNK_SIZE_GRID_1; iIndex++)
			// For each node in the array ppdSourceVec we're performing a check if
			// the value of ppdSourceVec[iIndex].freq > 0 is greater than zero, which means
			// the value of the number of duplicates value for the current data item was already
			// indexes into the array ppdSourceVec
			if (((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
				(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0) > 0)
			{
				// Performing a linear search to determine if the current item ppdSourceVec[iIndex] exists in the target array ppdCountVec and
				// obtain the value its index. By performing a linear search we actually iterate through the target array and compare the 
				// value of the variable ppdCountVec[nParentThreadID][nThreadID][nIndex++].value of each node with the value of 
				// ppdSourceVec[iIndex].value variable of the current node in the array ppdSourceVec. 
				CntType nIndex = 0;	bool bExists = false;
				while (nIndex < nItem && bExists == false)
					bExists = (ppdCountVec[nIndex++].value == \
					((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
						(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0)) ? 1 : 0;

				// If the value of the ppdCountVecG1[nParentThreadID][nThreadID][nIndex++].value variable
				// is equal to the the value of ppdSourceVec[iIndex].value variable of the current node in the array ppdSourceVec,
				// which means that the value of the number of duplicates for the current item in the array ppdInputVec
				// has been already indexed into the array ppdCountVec, we assign the boolean variable bExists = true 
				// and finish the loop execution.
				if (bExists == true)
				{
					ppdCountVec[nIndex - 1].freq += \
						((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
						(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0);
				}

				// Otherwise, store the number of duplicates value to the target 
				// array ppdCountVec by assigning the value of the ppdSourceVec[iIndex].value 
				// variable to the ppdCountVec[nParentThreadID][nThreadID][nItem].value variable.
				else
				{
					ppdCountVec[nItem].value = \
						((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].value : \
						(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].value : 0);
					ppdCountVec[nItem].freq = \
						((Grid == GridType::GRID_1) ? ppdCountVecG1[nParentThreadID][nThreadID][iIndex].freq : \
						(Grid == GridType::GRID_2) ? ppdCountVecG2[nParentThreadID][nThreadID][iIndex].freq : 0);

					nItem++;
				}
			}

		nThreadID++;
	}

	return nItem;
}    
</typename>

以下函数是主机上执行分布计数的主机函数。与上面讨论的内核函数类似,以下函数通过顺序执行 cdpCountChunkGrid2 内核并使用流来获取 pData 数组每个块中每个数据项的重复值数组,从而执行分布计数过程的并行工作共享。以下函数通过调用上面讨论的 cdpMergeReduction 函数来执行这些数组的归约。

    template<typename InpType = __int8, typename CntType = __uint64>
__host__ CntType cdpDistCountLaunch(InpType* pData, FreqT* phFreqT, CntType nChunks)
{
	// Allocating the array of streams handles
	cudaStream_t* phStreamHandles = (cudaStream_t*)malloc(sizeof(cudaStream_t) * nChunks);
	// Allocating the array of stream events handles
	cudaEvent_t* phStreamEvents = (cudaEvent_t*)malloc(sizeof(cudaEvent_t) * nChunks);

	// Iterating through the arrays of streams and initializing a handle for each 
	// new non-blocking stream in which we'll execute cdpCountIfKernel.
	// Also we're initializing the new event used to synchronize each 
	// new non-blocking stream execution
	for (__uint64 iIndex = 0; iIndex < nChunks; iIndex++)
	{
		// Initializing the new non-blocking stream in which we'll execute cdpCountIfKernel
		cudaErrorCheckHost(cudaStreamCreateWithFlags(&phStreamHandles[iIndex], cudaStreamNonBlocking));
		// Initializing the new event used to synchronize the new non-blocking stream execution
		cudaErrorCheckHost(cudaEventCreateWithFlags(&phStreamEvents[iIndex], cudaEventBlockingSync));
	}

	__uint64 nItem = 0, nParentChunkID = 0;
	for (__uint64 iIndex = 0; iIndex < nChunks; iIndex++)
	{
		// Computing the offset of the current chunk in the array pData
		__uint64 nChunkOffset = iIndex * CHUNK_SIZE_HOST;
		// Computing the parent chunk-id for each chunk being processed
		nParentChunkID = (iIndex % 100 == 0) ? 0 : nParentChunkID;

		fprintf(stdout, "ChunkOffset = %llu nParentChunkID = %llu Chunk = %llu of %llu\n", \
			nChunkOffset, nParentChunkID, iIndex, nChunks);

		__int8* ppdChunkBuf = NULL;
		// Allocating buffer ppdChunkPtr used to store the current chunk of the array ppdInputVec into GPU unified memory
		cudaErrorCheckHost(cudaMallocManaged((void**)&ppdChunkBuf, sizeof(__int8) * CHUNK_SIZE_HOST - 1));
		cudaErrorCheckHost(cudaMemcpyAsync(ppdChunkBuf, &pData[nChunkOffset], \
			sizeof(__int8) * CHUNK_SIZE_HOST - 1, cudaMemcpyHostToDevice, phStreamHandles[iIndex]));

		FreqT* ppdCountVec = NULL;
		// Allocating buffer ppdCountVec used to store the array of values, 
		// each one is the number of duplicates for each data item within the current chunk	
		cudaErrorCheckHost(cudaMallocManaged((void**)&ppdCountVec, sizeof(FreqT) * CHUNK_SIZE_HOST));
		cudaErrorCheckHost(cudaMemsetAsync((void*)ppdCountVec, 0x00, \
			sizeof(FreqT) * CHUNK_SIZE_HOST, phStreamHandles[iIndex]));

		// Asynchronously record the stream event prior to synchronizing all threads across GPU device
		cudaErrorCheckHost(cudaEventRecord(phStreamEvents[iIndex], phStreamHandles[iIndex]));

		// Launching the cpdCountChunkGrid1 kernel to compute the 
		// number of duplicates for each data item in the current chunk transfered to the ppdChunkPtr buffer
		cdpCountChunkGrid2 <__int8, __uint64> << < 1, CHUNK_SIZE_GRID_1 / CHUNK_SIZE_GRID_0, 0, \
			phStreamHandles[iIndex] >> > (ppdChunkBuf, ppdCountVec, \
				nParentChunkID, CHUNK_SIZE_GRID_2, CHUNK_SIZE_HOST);

		// Performing a synchronization of all active streams being executed
		cudaErrorCheckHost(cudaStreamSynchronize(phStreamHandles[iIndex]));

		// Performing a synchronization of all threads on the host
		cudaErrorCheckHost(cudaThreadSynchronize());
		// Synchronizing all threads of the entire GPU device
		cudaErrorCheckHost(cudaDeviceSynchronize());
		// Waiting until the cudaDeviceSynchronize API call will finish its execution
		cudaErrorCheckHost(cudaStreamWaitEvent(phStreamHandles[iIndex], phStreamEvents[iIndex], 0));

		// Allocating buffer phCountVec on host to store the number of duplicates 
		// for each item within the current chunk 
		FreqT* phCountVec = (FreqT*)malloc(sizeof(FreqT) * CHUNK_SIZE_HOST);
		memset((void*)phCountVec, 0x00, sizeof(FreqT) * CHUNK_SIZE_HOST);

		// Tranfering the buffer ppdCountVec from the device to host memory
		cudaErrorCheckHost(cudaMemcpyAsync(phCountVec, ppdCountVec, \
			sizeof(FreqT) * CHUNK_SIZE_HOST, cudaMemcpyDeviceToHost, phStreamHandles[iIndex]));

		// Performing reduction by merging the array of the number of duplicates values
		// obtained during the kernel execution into the array phCountVec used to store 
		// the overall number of duplicates for all items from the array ppdChunkPtr
		nItem = cdpMergeReduction<__uint64>(phCountVec, phFreqT, GridType::LOCAL, 0, 0, nItem, CHUNK_SIZE_HOST);

		// Incrementing the value of nParentChunkID by 1
		nParentChunkID++;
		
		// Deallocating the buffer which address is assigned to the ppdChunkBuf variable
		cudaErrorCheckHost(cudaFree(ppdChunkBuf));
		// Deallocating the buffer which address is assigned to the ppdCountVec variable
		cudaErrorCheckHost(cudaFree(ppdCountVec));

		// Deallocating the buffer which address is assigned to the phCountVec variable
		free(phCountVec);
	}

	// Iterating through the array of either streams of events handles, destroying
	// each stream and event in which the cdpCountChunkGrid2 kernel has been executed
	for (__uint64 iIndex = 0; iIndex < nChunks; iIndex++)
	{
		cudaErrorCheckHost(cudaEventDestroy(phStreamEvents[iIndex]));
		cudaErrorCheckHost(cudaStreamDestroy(phStreamHandles[iIndex]));
	}
	
	return nItem;
}

 

如何运行此代码

要运行本文中讨论的代码,强烈建议在您将运行 CUDA 应用程序可执行文件(可在本文顶部下载)的主机上安装 NVIDIA CUDA 8.0 工具包。此外,为了避免可能与您的 PC 显卡驱动程序发生干扰和冲突,建议将 WDDM TDR 延迟参数设置为 10 秒。以下文章中介绍的项目的另一个限制是,建议使用 compute_61, sm_61 兼容版本显卡(例如 NVIDIA GeForce GTX1070 GPU Pascal 或更高版本)来编译和运行以下项目。

历史

  • 2016 年 12 月 10 日 - 文章第一版发布。
© . All rights reserved.