65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Intel® Threading Building Blocks 和 OpenMP 库的并行可扩展 Burrows-Wheeler 变换 (BWT)

starIconstarIconstarIconstarIconstarIcon

5.00/5 (12投票s)

2017 年 4 月 30 日

CPOL

28分钟阅读

viewsIcon

18703

downloadIcon

498

本文是一篇实用的指南,介绍如何基于实现 Burrows-Wheeler 变换 (BWT) 算法的并行可扩展代码示例,使用 Intel® Threading Building Blocks (TBB) 和 OpenMP 库进行 C++ 开发。

引言

本文是一篇实用的指南,介绍如何基于实现 Burrows-Wheeler 变换 (BWT) 算法的并行可扩展代码示例,使用 Intel® Threading Building Blocks (TBB) 和 OpenMP 库进行 C++ 开发。

以下文章将介绍实现 BWT 编码/解码算法的并行代码的基本思想,该算法可以根据 Amdahl 定律完美扩展,从而通过利用现代多核 Intel® CPU(如 Intel® Core™ i7 和 Intel® Xeon™ E5 处理器系列以及最新的 Intel® Xeon™ Phi 协处理器)来提高性能。

特别是,我们将讨论并行编程的各个方面,例如使用 Intel TBB 的库函数模板进行紧密循环并行化,以及使用多任务处理来并发执行无法并行运行的顺序代码片段。此外,我们还将解释如何使用嵌套并行性概念来优化嵌套循环的性能,特别是如何将一个或多个嵌套循环的执行折叠成一个单一的递归并行过程。最后,我们将讨论使用 OpenMP 的并行指令构造,其使用允许在多个硬件线程之间共享整个编码/解码过程,从而优化通常大量数据的编码/解码过程。

在本文中…

在本文的开头,我们将仔细研究算法本身及其计算复杂度。然后,我们将讨论实现实际编码/解码的 C++ 顺序代码片段,并对这些代码片段进行详细分析,以确定基本的性能优化热点。在讨论过程中,我们将解释如何使用特定的 OpenMP 并行指令构造和 Intel® TBB 库函数模板将顺序代码转换为并行代码,从而极大地提高编码/解码过程的性能。

本文后续将讨论的主题包括

  • 使用 Intel® TBB 的模板函数进行单紧密循环并行化,以及如何正确“折叠”两个或多个嵌套并行循环以提供多维数据处理性能加速;
  • 使用 Intel® TBB 的任务组函数并行执行两个或多个独立的顺序任务,以加速无法完美并行化的顺序代码片段的性能;
  • 线程同步,以避免创建的并行代码可能引起的竞争条件和数据依赖问题。我们将讨论诸如使用线程隔离来提供折叠的嵌套循环并行执行同步等同步方面。在讨论过程中,我们将比较 Intel® TBB 或 OpenMP 库提供的线程同步;

除了使用 Intel® TBB 的模板函数来实现并行循环的方面之外,在本文中,我们还将了解如何使用 OpenMP 库的特定并行指令构造,以基于分区合并算法完美地在所有 CPU 线程之间共享大量数据集的整个编码/解码过程。以下算法主要用于将单个循环的迭代分布到多个并行运行的 CPU 线程之间,这与执行实际编码/解码的相同循环的顺序执行相比,提供了显著的性能加速增益。

最后,在本文的结尾,我们将使用 Intel® VTune™ Amplifier XE 工具提供对以下并行代码的性能分析和评估。我们将比较编码/解码不同大小数据集时并行代码的执行性能。

背景

Burrows-Wheeler 变换 (BWT)

Burrows-Wheeler 变换 (BWT) 是一种数据编码/解码算法,它是许多数据压缩算法(如bzip2)的重要组成部分。该算法最初由 Michael Burrows 和 David Wheeler 于 1994 年提出。该算法的主要思想是对给定字符字符串进行块排序,以产生易于压缩的输出,例如通过游程长度编码 (RLE)

编码

使用 BWT 算法进行字符串编码的过程主要基于对原始输入字符串中的字符进行字典序排列。通常,该过程非常简单,可以表述如下:

  1. 创建一个由 n 个字符串组成的输出集,其中字符串是原始输入字符串的所有可能旋转;
  2. 按字典序对生成的字符串集进行排序;
  3. 从生成的字符串集中每个字符串的最后一个字符创建输出字符串;

要使用 BWT 变换算法执行实际编码,我们首先必须创建一个生成的字符串集。生成的字符串集中的每个字符串都可以通过将原始输入字符串向右旋转 k 个字符(k < n)得到,其中 k 是给定生成字符串集中新排列字符串的位置。

字符串本身旋转是一个循环移位操作,在此操作中,我们实际上获取了给定输入字符串最后一个位置的字符并将其赋给临时变量:(t <- string[n])。然后,我们执行简单的线性移位,将每个字符移动到给定字符串的下一个位置

foreach (k <- 0 downto (n-1)) {string[k + 1] <- string[k];}

由于其余字符已成功向右移动一位,我们只需将临时变量中的最后一个字符赋给给定字符串的第一个位置:(string[0] <- t)。要将输入字符串向右旋转 k 个位置,我们必须重复上述过程 k 次,直到获得一个排列字符串,其中所有字符相对于其在输入字符串中的原始位置都旋转了 k 个位置。

然而,在字符串旋转方面存在一项重要的算法级优化,可以大大提高编码过程的性能。为了获得旋转后的字符串,我们通常不再需要执行上述对原始输入字符串的复杂旋转。另外,也没有必要重复执行该过程多次以找到原始输入字符串的特定旋转。

相反,要将输入字符串向右旋转 k 个位置,我们首先需要从原始输入字符串的起始位置(该位置完全对应于原始字符串应旋转的位置数)复制最后 k 个字符。然后,我们将这些复制的字符赋给一个新创建的字符串的开头,从第一个字符的位置开始。

foreach (i <- k to n) {new_string[i – k] <- string[i];}

之后,我们只需将原始输入字符串的前 k 个字符复制到新创建的字符串的起始位置 k 处,该位置完全对应于原始字符串需要旋转的位置数。

foreach (j <- 0 to k) {new_string[j + k] <- string[j];}

BWT 算法的第二个关键阶段是按字典序对生成的字符串集进行排序,这可以通过使用大多数现有的排序算法来完成,例如著名的quicksortmerge-sort。通过按字典序对生成的字符串集进行排序,我们实际上获得了原始输入字符串的排列字符序列。

最后,由于我们获得了有序的字符串集,我们通常需要提取给定生成字符串集中每个字符串的最后一个字符,并将其附加到最终包含编码字符串的输出中。

下面的示例说明了 BWT 编码过程,在此过程中,我们通过将原始输入字符串向右旋转正好对应于当前生成字符串在以下生成字符串集中位置的字符数来获得生成的排列字符串集(我们将使用空终止字符“\0”来指示要编码的字符串的最终位置)

输入字符串 ->    BROWSE\0     0

----------------------------------

                   \0BROWSE     1

                   E\0BROWS     2

                   SE\0BROW     3

                   WSE\0BRO     4

                   OWSE\0BR     5                        

                   ROWSE\0B     6

空终止字符将进一步被解码算法使用,以在生成集合中查找在其最终位置包含该字符的字符串。该字符串将精确匹配解码后的原始输入字符串。

BWT 算法的第二个关键阶段是按字典序对生成的字符串集进行排序,这可以通过使用大多数现有的排序算法来完成,例如著名的 quicksort 和 mergesort。通过按字典序对生成的字符串集进行排序,我们实际上获得了原始输入字符串的排列字符序列。

输入字符串 ->    BROWSE\0     0   ->   \0BROWSE       1

---------------------------------------------------------

          \0BROWSE     1        BROWSE\0      0

          E\0BROWS     2        E\0BROWS      2

          SE\0BROW     3        OWSE\0BR      5

          WSE\0BRO     4        ROWSE\0B      6

          OWSE\0BR     5        SE\0BROW      3                

          ROWSE\0B     6        WSE\0BRO      4

最后,由于我们获得了在上一个编码步骤中排序的字符串集,我们通常需要提取给定生成字符串集中每个字符串的最后一个字符,并将其附加到将包含编码字符串的输出中,如下图所示。

输入字符串 ->    \0BROWSE   0   ->   \0BROWSE

------------------------------------------------

          BROWSE\0     1        BROWSE\0

          E\0BROWS     2        E\0BROWS     

          OWSE\0BR     3        OWSE\0BR      

          ROWSE\0B     4        ROWSE\0B      

          SE\0BROW     5        SE\0BROW                      

          WSE\0BRO     6        WSE\0BRO      

 

---------------------------------------------

输出 -> E\0SRBWO

下面列出的 C++ 代码片段实现了 BWT 编码算法。

void bwt_encode(char* string, size_t length, char* output)
{
 // Allocate the memory buffer for the resulting set of strings
 BWT* bwt = (BWT*)malloc(sizeof(BWT) * length);
 // Allocate the memory buffer for the first string in the given resulting set
 bwt[0].string = (char*)malloc(length);
 // Copy the original string to the first position in the resulting set
 memset((void*)bwt[0].string, 0x00, length);
 memcpy((void*)bwt[0].string, (const void*)string, length);

 // Iterate through the resulting set staring at the position 
 // of the second string in the given resulting set.
 for (size_t row = 1; row < length; row++)
 {
  // Allocate memory for each newly created string in the resulting set
  bwt[row].string = (char*)malloc(length);
  memset((void*)bwt[row].string, 0x00, length);

  // Rotate the original string by row-positions right:
  // Copy the last row-characters from the original string and assign them starting 
  // at the position of the first character of a newly created string;
  for (size_t index = length - row; index < length; index++)
   bwt[row].string[index - (length - row)] = string[index];

  // Copy the first row-characters from the original string and assign them 
  // staring at the position row in a newly created string
  for (size_t index = 0; index < length - row; index++)
   bwt[row].string[index + row] = string[index];
 }

 // Sort the given resulting set lexicographically
 qsort((void*)bwt, length, sizeof(BWT), cmp_proc);

 // Iterate through the resulting set and for each string retrieve the character
 // located at the last position. Append each character obtained to the output buffer
 for (size_t row = 0; row < length; row++)
  output[row] = bwt[row].string[length - 1];

 output[length] = '\0';

 // Deallocate each string buffer in the resulting set
 for (size_t index = 0; index < length; index++)
   free(bwt[index].string);

 free(bwt);
}

整个编码算法的计算复杂度估计为 p = O(n^2 x log2(n)),并且与其成正比。在这种情况下,BWT 编码的整体复杂度估计不包括实际的排序计算复杂度。所使用的排序算法的复杂度是一种开销,其值可以简化为编码或解码过程。

解码

为了解码编码后的原始输入字符串,我们将使用反向 BWT 算法,该算法具有以下步骤,并且可以表述如下:

  1. 对于要解码的字符串(包含 n 个字符),创建一个包含 n 个空字符串的生成集;
  2. 提取该字符串的每个字符,并将其插入到给定生成集的每个字符串的第一位之前;
  3. 按字典序对生成的字符串集进行排序;
  4. 重复步骤(2,3)n 次,其中 n 是要解码的字符串中的字符数;
  5. 执行线性搜索,以在当前生成集中查找包含空终止字符在其最终位置的字符串;
  6. 将上一步获得的字符串的字符复制到输出字符串;

正如我们已经注意到的,BWT 解码算法有所不同。首先,我们需要创建一个包含 n 个空字符串的生成集。

foreach (k <- 0 to n) { strings[k] <- {0};}

之后,我们需要提取要解码的字符串的字符,并将每个字符插入到当前生成集中每个字符串的第一位之前。

foreach (k <- 0 to n) { strings[k][0] <- output[k];}

换句话说,我们将整个要解码的字符串放在给定生成集的第一列之前。然后,我们显然必须对生成集进行字典序排序。我们通常会为要解码的字符串中的每个字符位置重复此过程 n 次。最后,我们必须执行简单的线性搜索来查找包含空终止字符在其最终位置的字符串。显然,该字符串是解码后的原始输入字符串,其字符必须复制到输出字符串。

下面的示例说明了使用反向 BWT 算法创建字符串集的过程。

下面 C++ 代码片段说明了 BWT 解码算法的实现。

void bwt_decode(char* string, size_t length, char* output)
{
 // Allocate the memory buffer for the resulting set of strings
 BWT* bwt = (BWT*)malloc(sizeof(BWT) * length);
 // Iterate through the resulting set of strings
 for (size_t row = 0; row < length; row++)
 {
  // Allocate memory buffer for each string
  bwt[row].string = (char*)malloc(length);
  memset((void*)bwt[row].string, 0x00, length);
 }

        // Do length – iterations to perform the inverse BWT transformation
 for (size_t index = 0; index < length; index++)
 {
  // Insert a column before the first column in the resulting set
  // Iterate through the resulting set and for each string move 
  // all characters to the next position right
  for (size_t row = 0; row < length; row++)
   // Iterate through the current string and copy each character
   // to the next position starting at the final position of the current string
   for (size_t col = 0; col < length - 1; col++)
   {
         // Compute the current and previous position of the current character
        size_t rev_first = length - col - 2;
        size_t rev_second = length - col - 1;
        // Copy the current character from the previous position to the new position 
        bwt[row].string[rev_second] = bwt[row].string[rev_first];
   }

  // Iterate through the characters of the original 
  // input string and insert each character at the first 
  // position of each string in the current resulting set
  for (size_t row = 0; row < length; row++)
   bwt[row].string[0] = string[row];

  // Sort the given resulting set lexicographically 
  qsort((void*)bwt, length, sizeof(BWT), cmp_proc);
 }

 // Perform the linear search to find a string containing 
 // the null-terminating character at its final position
 for (size_t index = 0; index < length; index++)
  if (bwt[index].string[length - 1] == '\0')
   memcpy((void*)output, (const void*)bwt[index].string, length);

 output[length] = '\0';

        // Deallocate each string buffer in the resulting set
 for (size_t index = 0; index < length; index++)
   free(bwt[index].string);

 free(bwt);
}

与上面讨论的编码算法类似,BWT 解码具有许多算法级优化,例如使用后缀数组或指向生成集中每个排列字符串的指针数组。编码器创建的后缀数组与编码字符串一起传递给解码器。这反过来又简化了解码过程,因为解码器无需执行复杂的反向变换。然而,使用后缀数组会降低数据在 BWT 变换作为数据压缩算法一部分时的可压缩性。

BWT 解码算法的计算复杂度高于编码过程的复杂度,最初估计为 p = O(N^3 + N^2 + 2N)。显然,与编码过程相比,BWT 解码算法的计算复杂度非常高。

性能优化

在我们开始讨论如何使用上述多线程库优化 BWT 变换算法的性能之前,让我们首先仔细查看上面列出的顺序代码片段,以确定实现编码和解码过程的代码片段中的基本性能优化热点。

紧密循环并行化

正如我们已经注意到的,实现编码算法的代码的第一个优化热点是在主循环的每次迭代中执行实际字符串旋转的两个顺序循环。根据字符串旋转算法的性质,这些循环的并行执行通常不会引起数据流依赖或竞争条件问题,这使得通过执行紧密循环并行化来完美地并行执行这些循环成为可能。此外,作为两个独立任务,可以并行执行实现这些后续循环的代码片段。

为了并行执行这些循环的迭代,我们将主要使用 Intel® TBB 库的 parallel_for 模板函数。该函数提供了实现紧密循环并行化的基本机制,在此过程中,顺序循环的特定迭代由一个线程团队并发执行。

// Executing the first sequential loop in 
// parallel using Intel®TBB's parallel_for template function
tbb::parallel_for(tbb::blocked_range<size_t>(length - row, length),
  [&](const tbb::blocked_range<size_t> &r) {
  // Iterating through the original input string
  // Each iteration of the following loop is executed in parallel concurrently 
  for (size_t index = r.begin(); index != r.end(); index++)
   // Assigning the current character from the input string
   // at the specific position of the current string in the resulting set
   bwt[row].string[index - (length - row)] = string[index];
});

// Executing the second sequential loop in 
// parallel using Intel®TBB's parallel_for template function
tbb::parallel_for(tbb::blocked_range<size_t>(0, length - row),
 [&](const tbb::blocked_range<size_t> &r) {
 // Iterating through the original input string
 // Each iteration of the following loop is executed in parallel concurrently 
 for (size_t index = r.begin(); index != r.end(); index++)
  // Assigning the current character from the input string
  // at the specific position of the current string in the resulting set
  bwt[row].string[index + row] = string[index];
});

parallel_for 模板函数将 blocked_range 模板类的一个对象作为第一个参数。该类的对象主要用于定义在并行循环执行期间可以递归拆分的范围。换句话说,通过使用此模板类,我们主要定义了在每次循环迭代中分配给循环计数器变量的值的范围。这些范围通常通过将这些值赋给 blocked_range 构造函数的第一个和第二个参数来声明。第二个参数是 lambda 表达式,它定义了并行执行的代码。此 lambda 表达式的参数是对同一个 blocked_range 类对象的引用,该对象用于访问由以下 blocked range 定义的循环计数器变量索引的下界和上界值。接下来,我们将使用此对象访问 blocked range 类的 begin() 和 end() 方法,以在 lambda 表达式的代码中检索这些下界和上界值。

显然,lambda 表达式中定义的代码实现了循环,其迭代为索引变量的每个值并发执行,由一个独立线程团队执行。在每次迭代中,我们执行的代码是将原始输入字符串的特定字符复制到新创建字符串的特定位置。

根据使用 Intel®TBB 库进行并行编程的最佳实践,当我们 G需要并发并行执行特定循环的迭代以执行紧密循环并行化时,我们通常使用 parallel_for 模板函数。

并行任务执行

为了优化上述后续循环的性能,我们将使用 Intel®TBB 的 task_group 类,该类的对象用于将这些后续循环的执行生成为两个独立的并行任务。为此,我们通常需要声明一个 task_group 类对象。接下来,我们将调用 run 方法,该方法接受一个参数——对 task_handle<Func> 模板类对象的引用。根据 Intel®TBB® 库函数内部机制,我们将使用 C++ lambda 表达式,无论何时需要定义代码片段,其并行执行都是使用这些库模板函数完成的。在这种情况下,lambda 表达式将包含调用 parallel_for 模板函数的代码片段,以并行执行实际执行原始字符串旋转的每个特定循环。

为了生成每个并行任务,我们执行 run 方法的异步调用。在这种特定情况下,对于调用特定并行循环执行的代码片段,该方法被调用两次。调用此方法后,我们需要对正在执行的并行任务进行同步。为此,我们需要调用 wait 方法,该方法通常会暂停代码的运行,直到每个并行任务完成其执行。

tbb::task_group g; // Declaring an object of the task_group class

// Optimization hotspot: 1
// Running the first parallel independent task
g.run([&] { 
 // Executing the first sequential loop in 
// parallel using Intel®TBB's parallel_for template function
 tbb::parallel_for(tbb::blocked_range<size_t>(length - row, length),
  [&](const tbb::blocked_range<size_t> &r) {
  // Iterating through the original input string
  // Each iteration of the following loop is executed in parallel concurrently 
  for (size_t index = r.begin(); index != r.end(); index++)
   // Assigning the current character from the input string
   // at the specific position of the current string in the resulting set
   bwt[row].string[index - (length - row)] = string[index];
 });
});

// Optimization hotspot: 1
// Running the second parallel independent task
g.run([&] {
// Executing the second sequential loop in 
// parallel using Intel®TBB's parallel_for template function
tbb::parallel_for(tbb::blocked_range<size_t>(0, length - row),
  [&](const tbb::blocked_range<size_t> &r) {
  // Iterating through the original input string
// Each iteration of the following loop is executed in parallel concurrently 
  for (size_t index = r.begin(); index != r.end(); index++)
       // Assigning the current character from the input string
       // at the specific position of the current string in the resulting set
        bwt[row].string[index + row] = string[index];
});
});

// Waiting for both parallel tasks to complete their execution
g.wait();

嵌套并行性

在对上述代码进行并行优化时,我们将主要处理多个嵌套循环的并行化。在这种特定情况下,执行实际字符串旋转的并行循环在主循环的每次迭代中执行。显然,主循环是正在讨论的代码的另一个重要优化热点。在这种情况下,我们将使用嵌套并行性的概念,因为我们需要并行化多个嵌套循环。

根据 BWT 编码算法的性质,该过程通常不会引起任何数据流依赖或竞争条件问题,因为原始字符串的每次旋转都不依赖于前一次循环迭代期间执行的字符串旋转结果。为了并行执行主循环,我们将使用与并行化执行实际字符串旋转的嵌套循环相同的 Intel®TBB 的 parallel_for 模板函数。

// Optimization hotspot 2:
// Executing the main sequential loop in 
// parallel using Intel®TBB's parallel_for template function
tbb::parallel_for(tbb::blocked_range<size_t>(1, length),
 [&](const tbb::blocked_range<size_t> &r) {
 // Iterating through the resulting set of string
 // Each iteration of the following loop is executed in parallel concurrently 
 for (size_t row = r.begin(); row != r.end(); row++)
 {
     // Alocate memory for the current string in the resulting set
     bwt[row].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
   length, MEM_COMMIT, PAGE_READWRITE);
  ::ZeroMemory((LPVOID)bwt[row].string, length);

  tbb::task_group g; // Declaring an object of the task_group class

      // Optimization hotspot: 1
  // Running first parallel loop that performs string as the independent task
  g.run([&] { 
   // Executing the first sequential loop in 
   // parallel using Intel®TBB's parallel_for template function
   tbb::parallel_for(tbb::blocked_range<size_t>(length - row, length),
    [&](const tbb::blocked_range<size_t> &r) {
    // Iterating through the original input string
    // Each iteration of the following loop is executed in parallel concurrently 
     for (size_t index = r.begin(); index != r.end(); index++)
      // Assigning the current character from the input string
      // at the specific position of the current string in the resulting set
      bwt[row].string[index - (length - row)] = string[index];
    });
   });

   // Optimization hotspot: 1
   // Running second parallel loop that performs string as the independent task
   g.run([&] {
    // Executing the second sequential loop in 
    // parallel using Intel®TBB's parallel_for template function
    tbb::parallel_for(tbb::blocked_range<size_t>(0, length - row),
     [&](const tbb::blocked_range<size_t> &r) {
     // Iterating through the original input string
     // Each iteration of the following loop is executed in parallel concurrently 
     for (size_t index = r.begin(); index != r.end(); index++)
      // Assigning the current character from the input string
      // at the specific position of the current string in the resulting set
      bwt[row].string[index + row] = string[index];
    });
   });
   
   // Waiting for both parallel tasks to complete their execution
   g.wait();
  }
 });

正如我们从上面的并行代码中看到的,在这种情况下,为了并行化这些嵌套循环,我们基本上使用了嵌套并行性的一种特殊情况,即这些循环不是完美嵌套的,这使得无法将这些嵌套循环的执行折叠成一个单一的递归并行过程。在主循环的每次并行执行的迭代中,我们正在执行对同一 parallel_for 函数的后续调用,该函数生成另一个线程团队,该团队执行主循环内部嵌套的循环的迭代。

折叠嵌套循环

此时,让我们看一下实现 BWT 解码算法的代码。我们将讨论的优化热点是两个完美嵌套循环的并行化。在这种情况下,我们通常执行第一个循环来迭代生成的字符串集。在以下循环的每次迭代中,我们迭代当前字符串,将每个字符向右移动一个位置。通过执行插入,我们实际上处理二维数据,因为生成的字符串集实际上是字符矩阵。

为了优化这两个嵌套循环的性能,我们实际上需要将这些循环的并行执行折叠成一个单一的并行循环,其迭代是并发执行的。为此,我们将使用相同的 parallel_for 模板函数,并将 blocked_range2d 类的对象作为参数传递。使用此类的对象允许将这些嵌套循环的迭代递归地拆分为由二维线程网格执行的单一并行过程。blocked_range2d 类的构造函数有六个参数。第一个和第二个参数是分配给第一个循环每次迭代的循环计数器变量的值的下界和上界。第三个参数通常指定粒度。粒度是网格中单个线程执行的迭代次数。粒度值的用法是相当实验性的。在这种特定情况下,我们将粒度参数设置为第一个循环执行的总迭代次数的一半。这意味着总迭代次数将在两个并发线程之间共享。类似地,tbb::blocked_range2d 构造函数的接下来的三个参数是范围,这些范围的值将分配给第二个循环的循环计数器变量,以及粒度值。在这种情况下,粒度参数设置为第二个嵌套循环的迭代次数,这意味着网格中的每个特定线程将只执行第二个循环的一次迭代:

// Optimization hotspot: 1
  // Collapse two parallel nested loops that perform the insertion of a column
  // to the resulting set of string at the position before the first existing column
  tbb::task_group_context root(tbb::task_group_context::isolated);
  tbb::parallel_for(tbb::blocked_range2d<size_t>(0, length, length / 2, 0, length - 1, length),
   [&](const tbb::blocked_range2d<size_t> &r) {
   // Iterate through the resulting set and for each string move 
   // all characters to the next position right
   for (size_t row = r.rows().begin(); row != r.rows().end(); row++)
    // Iterate through the current string and copy each character
    // to the next position starting at the final position of the current string
    for (size_t col = r.cols().begin(); col != r.cols().end(); col++)
    {
     // Compute the current and previous position of the current character
     size_t rev_first = length - col - 2;
     size_t rev_second = length - col - 1;
     bwt[row].string[rev_second] = bwt[row].string[rev_first];
    }
  }, root);

与执行 BWT 编码的代码类似,执行实际解码的代码实现了主循环,在其每次迭代中,我们执行反向 BWT 变换以获得解码字符串。在这种特定情况下,执行实际反向变换的主循环无法完美并行化,因为该循环每次迭代的并行执行会引起数据流依赖问题。在以下循环的每次迭代中,我们实际上是在将由编码字符串字符组成的列插入到生成集的第一列之前。根据反向 BWT 算法的性质,必须按特定顺序插入新列才能在代码执行结束时获得解码字符串。这就是为什么我们不能并行化该循环执行的原因。

 // Do length - iterations sequentially to perform the inverse BWT transformation
 for (size_t index = 0; index < length; index++)
 {
  // Optimization hotspot: 1
  // Collapse two parallel nested loops that perform the insertion of a column
  // to the resulting set of string at the position before the first existing column
  tbb::task_group_context root(tbb::task_group_context::isolated);
  tbb::parallel_for(tbb::blocked_range2d<size_t>(0, length, length / 2, 0, length - 1, length),
   [&](const tbb::blocked_range2d<size_t> &r) {
   // Iterate through the resulting set and for each string move 
   // all characters to the next position right
   for (size_t row = r.rows().begin(); row != r.rows().end(); row++)
    // Iterate through the current string and copy each character
    // to the next position starting at the final position of the current string
    for (size_t col = r.cols().begin(); col != r.cols().end(); col++)
    {
     // Compute the current and previous position of the current character
     size_t rev_first = length - col - 2;
     size_t rev_second = length - col - 1;
     bwt[row].string[rev_second] = bwt[row].string[rev_first];
    }
  }, root);

  // Perform the parallel iteration through the characters of the original 
  // input string inserting each character at the first position of each string 
  // in the current resulting set
  tbb::parallel_for(tbb::blocked_range<size_t>(0, length),
   [&](const tbb::blocked_range<size_t> &r) {
   for (size_t row = r.begin(); row != r.end(); row++)
    bwt[row].string[0] = string[row];
  });

  // Performing parallel sort of the resulting set of strings 
  // lexicographically by using Intel®TBB's parallel_sort template function
  tbb::parallel_sort(bwt, bwt + length, [](const BWT& bwt1, const BWT& bwt2)
   { return (strcmp(bwt1.string, bwt2.string) < 0) ? 1 : 0; });
 }

线程同步

与 OpenMP 等其他并行库和包不同,将嵌套循环折叠成单一并行过程并不总是能确保并发执行单个折叠循环的每个迭代的线程团队之间存在适当的同步。在这种情况下,实际上需要线程同步机制来提供执行第一个循环迭代的线程与专门执行该第一个循环嵌套的第二个循环迭代的线程之间的同步。这种复杂的同步机制称为*“并行任务隔离”*。通过使用这些并行任务隔离,我们实际上在二维线程网格的每个行之间提供了同步。在执行执行多维数据处理的两个或多个折叠嵌套循环时,Intel TBB 同步机制实际上创建了一个并发并行任务树,它们之间的实际同步是一个非常复杂的过程,在并行代码执行期间会产生足够的开销。

为了放松线程同步过程,在这种特定情况下,Intel TBB 同步机制执行所谓的“并行任务取消”,这意味着“父”并行任务的取消实际上会取消树中所有“子”任务的执行。如果“父”任务当前被取消,则会导致当前被取消的父任务内部生成的子任务意外终止。这反过来通常会导致并行运行的任务未正确执行的情况。根据最新的 Intel TBB 指南,创建隔离的 task_group_context 根保护了第二个嵌套循环免受第一个循环在执行过程中向下传播的取消。为了提供每个并行任务隔离,我们只需要声明 task_group_context 类对象,并将 task_group_context::isolated 枚举值传递给其构造函数。然后,我们必须将此对象作为 parallel_for 函数的第三个参数传递。这通常会提供每个正在执行第二个嵌套循环的特定迭代的并发线程的隔离。

// Declare the task group context object to provide the threads isolation
tbb::task_group_context root(tbb::task_group_context::isolated);

// Use the task group context object as the last parameter of the parallel_for function
tbb::parallel_for(tbb::blocked_range2d<size_t>(0, length, length / 2, 0, length - 1, length),

       [&](const tbb::blocked_range2d<size_t> &r) {

for (size_t row = r.rows().begin(); row != r.rows().end(); row++)

            for (size_t col = r.cols().begin(); col != r.cols().end(); col++)

            {

               size_t rev_first = length - col - 2;

               size_t rev_second = length - col - 1;

               bwt[row].string[rev_second] = bwt[row].string[rev_first];

            }

       }, root);

并行字典序排序

在创建原始字符串的所有可能旋转的当前生成集后,我们通常会执行编码字符的排列,这通常通过按字典序对生成的字符串集进行排序来完成。正如您可能已经从实现编码算法的顺序代码中注意到的,我们特别使用了 C 标准库函数 qsort 来对生成的字符串集进行排序。通常,有许多函数可以执行实际排序,包括 qsort 和 std::sort STL 库函数以及 Intel®TBB 的 parallel_sort 模板函数。与 qsort 和 std::sort 不同,该函数通常执行并行归并排序,这反过来又可以极大地提高排序过程的性能。因此,为了提供更好的性能并同时精确遵循 Intel®TBB 库的使用,我们将使用该函数按字典序对当前生成的字符串集进行排序。

// Performing parallel sort of the resulting set of strings 
// lexicographically by using Intel®TBB's parallel_sort template function
tbb::parallel_sort(bwt, bwt + length, [](const BWT& bwt1, const BWT& bwt2)
 { return (strcmp(bwt1.string, bwt2.string) < 0) ? 1 : 0; });

使用 OpenMP 共享编码/解码过程

在本文的前几节中,我们讨论了使用各种 Intel®TBB 库函数来提高实现 BWT 算法的代码的性能。通常大量数据集的编码过程会导致使用 Intel®TBB 库实现的并行代码的可扩展性较差。因此,建议将要编码的整个缓冲区分成小块,并在并行执行的多个并发线程之间共享处理这些小块的代码的执行。为此,我们将使用 OpenMP 库的 #pragma omp parallel 指令构造。

根据使用此并行指令构造的概念,由 #pragma omp parallel 包围的并行代码区域由团队中的多个线程并发执行。为了共享整个编码/解码过程,我们将使用分区算法将整个字符串缓冲区划分为一系列块。作为使用分区算法的结果,在代码执行结束时,我们将获得一个编码块的数组。在执行实际编码之前,我们首先必须分配对象数组来存储编码字符串及其大小,如下所示:

BWT* bwt = (BWT*)::VirtualAllocEx(hProcHandle, \
 NULL, sizeof(BWT) * 8, MEM_COMMIT, PAGE_READWRITE);

之后,我们需要将纯文本文件的内容读取到先前分配的缓冲区中。

load_from_file(argv[3], input);

根据分区算法,对于每个线程,我们必须获取当前线程 ID 和将执行执行实际编码/解码的并行代码片段的线程数。然后,我们将必须计算当前块的大小以及起始和结束位置。此时,我们还需要计算由团队中最后一个线程编码/解码的块的对齐。

// Obtain the thread-id of the current thread
int thread_id = omp_get_thread_num();
// Obtain the total number threads in a team
int n_threads = omp_get_num_threads();

// Compute the actual size of the current chunk
size_t size = (size_t)ceil(length / (float)n_threads);

// Compute the starting and final position of a chunk in the string buffer
size_t start = thread_id * size;
size_t end = (thread_id + 1) * size;

// Align the size of the chunk encoded/decoded by the last thread in a team
end = ((length % n_threads) > 0 && thread_id == n_threads - 1) ? length : end;
size = ((length % n_threads) > 0 && thread_id == n_threads - 1) ? (end - start) : size;

然后,我们将必须分配一个字符串缓冲区,当前块将从整个字符串缓冲区复制到该缓冲区,以及用于编码块的缓冲区。

// Allocate string buffer for the current chunk
char* chunk = (char*)::VirtualAllocEx(hProcHandle, NULL, \
 (size + 1), MEM_COMMIT, PAGE_READWRITE);
::ZeroMemory((LPVOID)chunk, (size + 1));
// Copy the current chunk to the previously allocated buffer
::CopyMemory((LPVOID)chunk, (CONST VOID*)&input[start], size);

// Allocate the string buffer for the encoded data
char* encoded = (char*)::VirtualAllocEx(hProcHandle, NULL, \
 (size + 1), MEM_COMMIT, PAGE_READWRITE);
::ZeroMemory((LPVOID)encoded, (size + 1));

为了执行当前字符串数据块的实际编码,我们调用 parallel::tbb_bwt_encode_chunk(chunk, (size + 1), encoded) 函数。

在获得编码块后,我们需要将其内容复制到当前对象的字符串缓冲区中。

// Allocating buffer for the currently encoded chunk
bwt[thread_id].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
 (size + 1), MEM_COMMIT, PAGE_READWRITE);

// Assigning the length variable of the current object the value of the current chunks’ actual size
bwt[thread_id].length = size + 1;
// Copy encoded string into the string buffer of the current object
::CopyMemory((LPVOID)bwt[thread_id].string, (CONST VOID*)encoded, (size + 1));

正如我们已经讨论过的,在代码执行结束时,我们将获得包含每个编码块的对象数组。然后,我们将此数组写入二进制输出文件。

save_to_file(argv[4], output, filelen);

在解码过程中,我们需要为块对象数组分配缓冲区,并从二进制输出文件中读取编码块,然后对每个特定块执行实际解码。

BWT* bwt = (BWT*)::VirtualAllocEx(hProcHandle, \
 NULL, sizeof(BWT) * 8, MEM_COMMIT, PAGE_READWRITE);

bwt_load_from_file(argv[3], bwt);

通常,解码过程与编码输入文件的过程略有不同。此时,我们通常从二进制输出文件中读取的对象数组中检索每个块。

在解码每个块之前,我们分配缓冲区来存储编码字符串。

char* chunk = (char*)::VirtualAllocEx(hProcHandle, NULL, \
 bwt[thread_id].length, MEM_COMMIT, PAGE_READWRITE);
::ZeroMemory((LPVOID)chunk, bwt[thread_id].length);

然后,我们将编码字符串从特定块对象的字符串缓冲区复制到先前分配的临时缓冲区中。

::CopyMemory((LPVOID)chunk, (CONST VOID*)bwt[thread_id].string, bwt[thread_id].length);

之后,我们调用 parallel::tbb_bwt_decode_chunk(chunk, bwt[thread_id].length, bwt[thread_id].string) 函数来执行实际解码。通过使用此函数执行实际解码的结果是,我们获得解码字符串并将其复制到当前对象的字符串缓冲区中。在解码过程结束时,对象数组将包含字符串,每个字符串都是在解码过程中获得的。我们迭代地将数组中的每个字符串连接起来以获得整个解码字符串,然后将其以纯文本格式写入文件。

save_to_file(argv[4], output, filelen);

实现编码/解码过程共享的完整代码片段如下所示。

编码

  #pragma omp parallel
  {
   int thread_id = omp_get_thread_num();
   int n_threads = omp_get_num_threads();

   size_t size = (size_t)ceil(length / (float)n_threads);

   size_t start = thread_id * size;
   size_t end = (thread_id + 1) * size;

   end = ((length % n_threads) > 0 && thread_id == n_threads - 1) ? length : end;
   size = ((length % n_threads) > 0 && thread_id == n_threads - 1) ? (end - start) : size;

   HANDLE hProcHandle = ::GetCurrentProcess();
   char* chunk = (char*)::VirtualAllocEx(hProcHandle, NULL, \
    (size + 1), MEM_COMMIT, PAGE_READWRITE);
   ::ZeroMemory((LPVOID)chunk, (size + 1));
   ::CopyMemory((LPVOID)chunk, (CONST VOID*)&input[start], size);

   char* encoded = (char*)::VirtualAllocEx(hProcHandle, NULL, \
    (size + 1), MEM_COMMIT, PAGE_READWRITE);
   ::ZeroMemory((LPVOID)encoded, (size + 1));

   parallel::tbb_bwt_encode_chunk(chunk, (size + 1), encoded);

   bwt[thread_id].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
    (size + 1), MEM_COMMIT, PAGE_READWRITE);

   bwt[thread_id].length = size + 1;
   ::CopyMemory((LPVOID)bwt[thread_id].string, (CONST VOID*)encoded, (size + 1));

   if (chunk != NULL) ::VirtualFreeEx(hProcHandle, chunk, \
    (size + 1), MEM_RELEASE);
   if (encoded != NULL) ::VirtualFreeEx(hProcHandle, encoded, \
    (size + 1), MEM_RELEASE);
  }

解码

#pragma omp parallel
  {
   int thread_id = omp_get_thread_num();
   int n_threads = omp_get_num_threads();

   HANDLE hProcHandle = ::GetCurrentProcess();
   char* chunk = (char*)::VirtualAllocEx(hProcHandle, NULL, \
     bwt[thread_id].length, MEM_COMMIT, PAGE_READWRITE);
   ::ZeroMemory((LPVOID)chunk, bwt[thread_id].length);
   ::CopyMemory((LPVOID)chunk, (CONST VOID*)bwt[thread_id].string, bwt[thread_id].length);

   parallel::tbb_bwt_decode_chunk(chunk, bwt[thread_id].length, bwt[thread_id].string);

   if (chunk != NULL) ::VirtualFreeEx(hProcHandle, chunk, \
    bwt[thread_id].length, MEM_RELEASE);
  }

  for (size_t index = 0; index < 8; index++)
   strncat(output, bwt[index].string, bwt[index].length);

  if (output != NULL && strlen(output) > 0)
   length = strlen(output);

如何编译和运行此代码

要编译此代码,请使用以下命令:

Debug x64
icl /GS /W3 /Zc:wchar_t /ZI /Od /Fd".\vc140.pdb" /Qopenmp /Qstd=c++11 /Zc:forScope /RTC1 /MDd /Fa /EHs /Fo /Qprof-dir .\ /Fp"bwt.pch" bwt.cpp /link /OUT:"bwt.exe" /MANIFEST /NXCOMPAT /PDB:"bwt.pdb" /DYNAMICBASE "kernel32.lib" "user32.lib" "gdi32.lib" "winspool.lib" "comdlg32.lib" "advapi32.lib" "shell32.lib" "ole32.lib" "oleaut32.lib" "uuid.lib" "odbc32.lib" "odbccp32.lib" "tbb.lib" "tbb_debug.lib" /DEBUG /MACHINE:X64 /INCREMENTAL /SUBSYSTEM:CONSOLE
Release x64
icl /GS /W3 /Gy /Zc:wchar_t /Zi /O2 /Zc:forScope /Oi /MD /Fa /EHs /Fo /Qopenmp /Qprof-dir .\ /Fp"bwt.pch"  bwt.cpp /link  /OUT:"bwt.exe" /MANIFEST /NXCOMPAT /PDB:"bwt.pdb" /DYNAMICBASE "kernel32.lib" "user32.lib" "gdi32.lib" "winspool.lib" "comdlg32.lib" "advapi32.lib" "shell32.lib" "ole32.lib" "oleaut32.lib" "uuid.lib" "odbc32.lib" "odbccp32.lib" "tbb.lib" "tbb_debug.lib" /MACHINE:X64 /OPT:REF /INCREMENTAL:NO /SUBSYSTEM:CONSOLE

要使用此程序对文本文件进行编码/解码,请使用以下命令:

编码

bwt.exe -e -s sample.txt sample.dat  -  for serial code execution
bwt.exe -e -p sample.txt sample.dat -  for parallel code execution

解码

bwt.exe -d -s sample.dat output.txt  -  for serial code execution
bwt.exe -d -p sample.dat output.txt -  for parallel code execution

性能评估

在本节中,我们将讨论执行 Burrows-Wheeler 变换 (BWT) 的并行代码的性能评估。此时,我们将使用 Intel VTune Amplifier XE 来衡量使用 Intel TBB 和 OpenMP 库优化的以下并行代码的实际性能加速。特别是,我们将分析性能测量工具提供的结果,例如理想执行 CPU 时间、用于并行任务调度的自旋开销时间。此外,我们还将分析并行执行时间与 OpenMP 区域执行的潜在增益。

我们将强调的第一个方面是 CPU 理想执行时间,它占总 CPU 执行时间的 86.3%。通常,使用这些多线程并行库和包,尤其是 OpenMP 库,可以使以下并行代码的执行速度比执行复杂 Burrows-Wheeler 变换 (BWT) 的旧顺序代码快 3 倍。提供使用 Intel® TBB 或 OpenMP 库来提高性能的混合并行代码通常提供最低的开销时间,这主要用于线程调度和其他目的,例如线程同步。然而,在以下并行代码执行期间,CPU 忙于执行其他应用程序线程的自旋时间可能会略高于正常水平。有时,允许一些自旋时间以确保所有线程的执行正确同步是比较可取的。

与串行执行时间相比,此代码的并行区域时间通常非常高,约为 99.9%,这意味着该代码是完美并行的。然而,一些并行区域具有一定的潜在增益,特别是那些使用 OpenMP 库并行化的区域。该代码的总潜在增益约为 20.6%。在并行运行此代码时,我们通常会超额分配实际的硬件线程数量,因为 OpenMP 并行区域是在使用 Intel TBB 库并行化的代码之上执行的。

从上面的统计图可以看出,TBB 调度器内部函数是导致代码执行期间显著 CPU 负载的热点。然而,由以下并行代码产生的总 CPU 负载是正常或理想的,而不是差的。

此时,让我们仔细看看 CPU 的逻辑内核在以下代码执行期间是如何同时被利用的。

从上图可以看出,只有 8 个逻辑核心中的 4 个被有效利用。这实际上意味着该代码没有产生足够的 CPU 工作负载,因为 OpenMP 下方的多个 Intel TBB 并行区域的执行通常不会完全将并行任务分配给多个 CPU 核心。通常,Intel TBB 线程调度机制不支持静态调度,而是动态处理并行任务调度。在这种情况下,平均 CPU 使用率和目标 CPU 利用率相等,并且可以估计为自并行代码执行开始以来的 30 秒。

最后,在具有 8 个逻辑核心的 CPU 上,以下并行代码单次实例执行的总持续时间达到 3.355 秒,与用于完成相同任务的顺序代码执行相比,这是一个相当不错的分数。

使用代码

实现并行可扩展 Burrows-Wheeler 变换算法的代码如下所示。

#include <tbb/tbb.h>
#include <tbb/parallel_for.h>
#include <tbb/parallel_sort.h>
#include <tbb/blocked_range2d.h>

#include <time.h>
#include <omp.h>

typedef struct tagBWT
{
	char* string;
	size_t length;
}BWT;

int cmp_proc(const void * a, const void * b)
{
	return strcmp((*(BWT*)a).string, (*(BWT*)b).string);
}

namespace serial
{
	void c_bwt_encode_chunk(char* string, size_t length, char* output)
	{
		HANDLE hProcHandle = ::GetCurrentProcess();
		BWT* bwt = (BWT*)::VirtualAllocEx(hProcHandle, NULL, \
			sizeof(BWT) * length, MEM_COMMIT, PAGE_READWRITE);
		bwt[0].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
			length, MEM_COMMIT, PAGE_READWRITE);
		::ZeroMemory((LPVOID)bwt[0].string, length);
		::CopyMemory((LPVOID)bwt[0].string, string, length);

		for (size_t row = 1; row < length; row++)
		{
			bwt[row].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
				length, MEM_COMMIT, PAGE_READWRITE);
			::ZeroMemory((LPVOID)bwt[row].string, length);

			for (size_t index = length - row; index < length; index++)
				bwt[row].string[index - (length - row)] = string[index];

			for (size_t index = 0; index < length - row; index++)
				bwt[row].string[index + row] = string[index];
		}

		qsort((void*)bwt, length, sizeof(BWT), cmp_proc);

		for (size_t row = 0; row < length; row++)
			output[row] = bwt[row].string[length - 1];

		output[length] = '\0';

		for (size_t index = 0; index < length; index++)
			if (bwt[index].string != NULL)
				::VirtualFreeEx(hProcHandle, bwt[index].string, length, MEM_RELEASE);

		if (bwt != NULL)
			::VirtualFreeEx(hProcHandle, bwt, sizeof(BWT) * length, MEM_RELEASE);
	}

	void c_bwt_decode_chunk(char* string, size_t length, char* output)
	{
		HANDLE hProcHandle = ::GetCurrentProcess();
		BWT* bwt = (BWT*)::VirtualAllocEx(hProcHandle, NULL, \
			sizeof(BWT) * length, MEM_COMMIT, PAGE_READWRITE);
		for (size_t row = 0; row < length; row++)
		{
			bwt[row].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
				length, MEM_COMMIT, PAGE_READWRITE);
			::ZeroMemory((LPVOID)bwt[row].string, length);
		}

		for (size_t index = 0; index < length; index++)
		{
			for (size_t row = 0; row < length; row++)
				for (size_t col = 0; col < length - 1; col++)
				{
					size_t rev_first = length - col - 2;
					size_t rev_second = length - col - 1;
					bwt[row].string[rev_second] = bwt[row].string[rev_first];
				}

			for (size_t row = 0; row < length; row++)
				bwt[row].string[0] = string[row];

			qsort((void*)bwt, length, sizeof(BWT), cmp_proc);
		}

		for (size_t index = 0; index < length; index++)
			if (bwt[index].string[length - 1] == '\0')
				::CopyMemory((LPVOID)output, (CONST VOID*)bwt[index].string, length);

		output[length] = '\0';

		for (size_t index = 0; index < length; index++)
			if (bwt[index].string != NULL)
				::VirtualFreeEx(hProcHandle, bwt[index].string, length, MEM_RELEASE);

		if (bwt != NULL)
			::VirtualFreeEx(hProcHandle, bwt, sizeof(BWT) * length, MEM_RELEASE);
	}

	void c_bwt_encode(char* input, size_t length, BWT*& bwt)
	{
		int n_threads = omp_get_num_threads();
		for (int chunk_id = 0; chunk_id < n_threads; chunk_id++)
		{
			size_t size = (size_t)ceil(length / (float)n_threads);

			size_t start = chunk_id * size;
			size_t end = (chunk_id + 1) * size;

			end = ((length % n_threads) > 0 && chunk_id == n_threads - 1) ? length : end;
			size = ((length % n_threads) > 0 && chunk_id == n_threads - 1) ? (end - start) : size;

			HANDLE hProcHandle = ::GetCurrentProcess();
			char* chunk = (char*)::VirtualAllocEx(hProcHandle, NULL, \
				(size + 1), MEM_COMMIT, PAGE_READWRITE);
			::ZeroMemory((LPVOID)chunk, (size + 1));
			::CopyMemory((LPVOID)chunk, (CONST VOID*)&input[start], size);

			char* encoded = (char*)::VirtualAllocEx(hProcHandle, NULL, \
				(size + 1), MEM_COMMIT, PAGE_READWRITE);
			::ZeroMemory((LPVOID)encoded, (size + 1));

			serial::c_bwt_encode_chunk(chunk, (size + 1), encoded);

			bwt[chunk_id].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
				(size + 1), MEM_COMMIT, PAGE_READWRITE);

			bwt[chunk_id].length = size + 1;
			::CopyMemory((LPVOID)bwt[chunk_id].string, (CONST VOID*)encoded, (size + 1));

			if (chunk != NULL) ::VirtualFreeEx(hProcHandle, chunk, \
				(size + 1), MEM_RELEASE);
			if (encoded != NULL) ::VirtualFreeEx(hProcHandle, encoded, \
				(size + 1), MEM_RELEASE);
		}
	}

	void c_bwt_decode(const BWT* bwt, char* output, size_t& length)
	{
		int n_threads = omp_get_num_threads();
		for (int chunk_id = 0; chunk_id < n_threads; chunk_id++)
		{
			HANDLE hProcHandle = ::GetCurrentProcess();
			char* chunk = (char*)::VirtualAllocEx(hProcHandle, NULL, \
				bwt[chunk_id].length, MEM_COMMIT, PAGE_READWRITE);
			::ZeroMemory((LPVOID)chunk, bwt[chunk_id].length);
			::CopyMemory((LPVOID)chunk, (CONST VOID*)bwt[chunk_id].string, bwt[chunk_id].length);

			serial::c_bwt_decode_chunk(chunk, bwt[chunk_id].length, bwt[chunk_id].string);

			if (chunk != NULL) ::VirtualFreeEx(hProcHandle, chunk, \
				bwt[chunk_id].length, MEM_RELEASE);
		}

		for (size_t index = 0; index < n_threads; index++)
			strncat(output, bwt[index].string, bwt[index].length);

		if (output != NULL && strlen(output) > 0)
			length = strlen(output);
	}
}

namespace parallel
{
	void tbb_bwt_encode_chunk(char* string, size_t length, char* output)
	{
		HANDLE hProcHandle = ::GetCurrentProcess();

		BWT* bwt = (BWT*)::VirtualAllocEx(hProcHandle, NULL, \
			sizeof(BWT) * length, MEM_COMMIT, PAGE_READWRITE);
		bwt[0].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
			length, MEM_COMMIT, PAGE_READWRITE);
		::CopyMemory((LPVOID)bwt[0].string, (CONST VOID*)string, length);

		tbb::parallel_for(tbb::blocked_range<size_t>(1, length),
			[&](const tbb::blocked_range<size_t> &r) {
			for (size_t row = r.begin(); row != r.end(); row++)
			{
				bwt[row].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
					length, MEM_COMMIT, PAGE_READWRITE);
				::ZeroMemory((LPVOID)bwt[row].string, length);

				tbb::task_group g;

				g.run([&] {
					tbb::parallel_for(tbb::blocked_range<size_t>(length - row, length),
						[&](const tbb::blocked_range<size_t> &r) {
						for (size_t index = r.begin(); index != r.end(); index++)
							bwt[row].string[index - (length - row)] = string[index];
					});
				});

				g.run([&] {
					tbb::parallel_for(tbb::blocked_range<size_t>(0, length - row),
						[&](const tbb::blocked_range<size_t> &r) {
						for (size_t index = r.begin(); index != r.end(); index++)
							bwt[row].string[index + row] = string[index];
					});
				});

				g.wait();
			}
		});

		tbb::parallel_sort(bwt, bwt + length, [](const BWT& bwt1, const BWT& bwt2)
			{ return (strcmp(bwt1.string, bwt2.string) < 0) ? 1 : 0; });

		tbb::parallel_for(tbb::blocked_range<size_t>(0, length),
			[&](const tbb::blocked_range<size_t> &r) {
			for (size_t row = r.begin(); row != r.end(); row++)
				output[row] = bwt[row].string[length - 1];
		});

		tbb::parallel_for(static_cast<size_t>(0), length, [&](size_t row) {
			if (bwt[row].string != NULL) {
				::VirtualFreeEx(hProcHandle, bwt[row].string, \
					length, MEM_RELEASE);
				bwt[row].string = NULL;
			}});

		if (bwt != NULL) ::VirtualFreeEx(hProcHandle, bwt, \
			sizeof(BWT) * length, MEM_RELEASE);
	}

	void tbb_bwt_decode_chunk(char* string, size_t length, char* output)
	{
		HANDLE hProcHandle = ::GetCurrentProcess();

		BWT* bwt = (BWT*)::VirtualAllocEx(hProcHandle, NULL, \
			sizeof(BWT) * length, MEM_COMMIT, PAGE_READWRITE);
		tbb::parallel_for(static_cast<size_t>(0), length, [&](size_t row) {
			bwt[row].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
				length, MEM_COMMIT, PAGE_READWRITE);
			::ZeroMemory((LPVOID)bwt[row].string, length);
		});

		for (size_t index = 0; index < length; index++)
		{

                        tbb::task_group_context root(tbb::task_group_context::isolated);
			tbb::parallel_for(tbb::blocked_range2d<size_t>(0, length, length / 2, 0, length - 1, length),
				[&](const tbb::blocked_range2d<size_t> &r) {
				for (size_t row = r.rows().begin(); row != r.rows().end(); row++)
					for (size_t col = r.cols().begin(); col != r.cols().end(); col++)
					{
						size_t rev_first = length - col - 2;
						size_t rev_second = length - col - 1;
						bwt[row].string[rev_second] = bwt[row].string[rev_first];
					}
			}, root);

			tbb::parallel_for(tbb::blocked_range<size_t>(0, length),
				[&](const tbb::blocked_range<size_t> &r) {
				for (size_t row = r.begin(); row != r.end(); row++)
					bwt[row].string[0] = string[row];
			});

			tbb::parallel_sort(bwt, bwt + length, [](const BWT& bwt1, const BWT& bwt2)
				{ return (strcmp(bwt1.string, bwt2.string) < 0) ? 1 : 0; });
		}

		tbb::parallel_for(tbb::blocked_range<size_t>(0, length),
			[&](const tbb::blocked_range<size_t> &r) {
			for (size_t index = r.begin(); index != r.end(); index++)
				if (bwt[index].string[length - 1] == '\0')
					::CopyMemory((LPVOID)output, (CONST VOID*)bwt[index].string, length);
		});

		output[length] = '\0';

		tbb::parallel_for(static_cast<size_t>(0), length, [&](size_t row) {
			if (bwt[row].string != NULL) {
				::VirtualFreeEx(hProcHandle, bwt[row].string, \
					length, MEM_RELEASE);
				bwt[row].string = NULL;
			}
		});

		if (bwt != NULL) ::VirtualFreeEx(hProcHandle, bwt, \
			sizeof(BWT) * length, MEM_RELEASE);
	}

	void omp_bwt_encode(char* input, size_t length, BWT*& bwt)
	{
		#pragma omp parallel
		{
			int thread_id = omp_get_thread_num();
			int n_threads = omp_get_num_threads();

			size_t size = (size_t)ceil(length / (float)n_threads);

			size_t start = thread_id * size;
			size_t end = (thread_id + 1) * size;

			end = ((length % n_threads) > 0 && thread_id == n_threads - 1) ? length : end;
			size = ((length % n_threads) > 0 && thread_id == n_threads - 1) ? (end - start) : size;

			HANDLE hProcHandle = ::GetCurrentProcess();
			char* chunk = (char*)::VirtualAllocEx(hProcHandle, NULL, \
				(size + 1), MEM_COMMIT, PAGE_READWRITE);
			::ZeroMemory((LPVOID)chunk, (size + 1));
			::CopyMemory((LPVOID)chunk, (CONST VOID*)&input[start], size);

			char* encoded = (char*)::VirtualAllocEx(hProcHandle, NULL, \
				(size + 1), MEM_COMMIT, PAGE_READWRITE);
			::ZeroMemory((LPVOID)encoded, (size + 1));

			parallel::tbb_bwt_encode_chunk(chunk, (size + 1), encoded);

			bwt[thread_id].string = (char*)::VirtualAllocEx(hProcHandle, NULL, \
				(size + 1), MEM_COMMIT, PAGE_READWRITE);

			bwt[thread_id].length = size + 1;
			::CopyMemory((LPVOID)bwt[thread_id].string, (CONST VOID*)encoded, (size + 1));

			if (chunk != NULL) ::VirtualFreeEx(hProcHandle, chunk, \
				(size + 1), MEM_RELEASE);
			if (encoded != NULL) ::VirtualFreeEx(hProcHandle, encoded, \
				(size + 1), MEM_RELEASE);
		}
	}

	void omp_bwt_decode(const BWT* bwt, char* output, size_t& length)
	{
		#pragma omp parallel
		{
			int thread_id = omp_get_thread_num();
			int n_threads = omp_get_num_threads();

			HANDLE hProcHandle = ::GetCurrentProcess();
			char* chunk = (char*)::VirtualAllocEx(hProcHandle, NULL, \
					bwt[thread_id].length, MEM_COMMIT, PAGE_READWRITE);
			::ZeroMemory((LPVOID)chunk, bwt[thread_id].length);
			::CopyMemory((LPVOID)chunk, (CONST VOID*)bwt[thread_id].string, bwt[thread_id].length);

			parallel::tbb_bwt_decode_chunk(chunk, bwt[thread_id].length, bwt[thread_id].string);

			if (chunk != NULL) ::VirtualFreeEx(hProcHandle, chunk, \
				bwt[thread_id].length, MEM_RELEASE);
		}

		for (size_t index = 0; index < 8; index++)
			strncat(output, bwt[index].string, bwt[index].length);

		if (output != NULL && strlen(output) > 0)
			length = strlen(output);
	}
}

DWORD g_BytesTransferred = 0;

VOID CALLBACK FileIOCompletionRoutine(
	__in  DWORD dwErrorCode,
	__in  DWORD dwNumberOfBytesTransfered,
	__in  LPOVERLAPPED lpOverlapped
);

VOID CALLBACK FileIOCompletionRoutine(
	__in  DWORD dwErrorCode,
	__in  DWORD dwNumberOfBytesTransfered,
	__in  LPOVERLAPPED lpOverlapped)
{
	g_BytesTransferred = dwNumberOfBytesTransfered;
}

size_t load_from_file(const char* filename, void*& buffer)
{
	LARGE_INTEGER lpFileSize = { 0 };
	HANDLE hProcHandle = ::GetCurrentProcess();
	HANDLE hFile = ::CreateFile(filename, GENERIC_READ, FILE_SHARE_READ, NULL, \
		OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);

	if (hFile == INVALID_HANDLE_VALUE)
	{
		fprintf(stdout, "Unable to open file %s...\n", filename);
		return 0;
	}

	::GetFileSizeEx(hFile, &lpFileSize);

	buffer = ::VirtualAllocEx(hProcHandle, \
		NULL, lpFileSize.LowPart, MEM_COMMIT, PAGE_READWRITE);

	OVERLAPPED overlapped = { 0 };
	if (!::ReadFileEx(hFile, buffer, \
			lpFileSize.LowPart, &overlapped, FileIOCompletionRoutine))
	{
		fprintf(stdout, "Unable to read file %s...\n", filename);
		return 0;
	}

	::CloseHandle(hFile);

	return lpFileSize.LowPart;
}

size_t save_to_file(const char* filename, LPVOID buffer, size_t length)
{
	DWORD dwBytesWritten = 0;
	HANDLE hProcHandle = ::GetCurrentProcess();
	HANDLE hFile = ::CreateFile(filename, GENERIC_WRITE, NULL, NULL, \
		CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);

	if (hFile == INVALID_HANDLE_VALUE)
	{
		fprintf(stdout, "Unable to create file %s...\n", filename);
		return 0;
	}

	OVERLAPPED overlapped = { 0 };
	if (!::WriteFileEx(hFile, buffer, \
		length, &overlapped, FileIOCompletionRoutine))
	{
		fprintf(stdout, "Unable to write to file %s...\n", filename);
		return 0;
	}

	if (g_BytesTransferred > 0 && g_BytesTransferred <= length - 1)
		length = g_BytesTransferred;

	if (buffer != NULL) 
		::VirtualFreeEx(hProcHandle, buffer, length, MEM_RELEASE);

	::CloseHandle(hFile);

	return length;
}

size_t bwt_load_from_file(const char* filename, BWT*& bwt)
{
	size_t length = 0L;
	LARGE_INTEGER lpFileSize = { 0 };
	HANDLE hProcHandle = ::GetCurrentProcess();
	HANDLE hFile = ::CreateFile(filename, GENERIC_READ, FILE_SHARE_READ, NULL, \
		OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);

	if (hFile == INVALID_HANDLE_VALUE)
	{
		fprintf(stdout, "Unable to open file %s...\n", filename);
		return 0;
	}

	for (size_t index = 0; index < 8; index++)
	{
		DWORD dwBytesRead = 0L;
		OVERLAPPED overlapped = { 0 };
		if (!::ReadFile(hFile, (void*)&bwt[index].length, \
			sizeof(size_t), &dwBytesRead, NULL))
		{
			fprintf(stdout, "Unable to read file %s...\n", filename);
			return 0;
		}

		length += bwt[index].length;

		bwt[index].string = (char*)::VirtualAllocEx(hProcHandle, \
			NULL, bwt[index].length, MEM_COMMIT, PAGE_READWRITE);

		if (!::ReadFile(hFile, (void*)bwt[index].string, \
			bwt[index].length, &dwBytesRead, NULL))
		{
			fprintf(stdout, "Unable to read file %s...\n", filename);
			return 0;
		}
	}

	::CloseHandle(hFile);

	return length;
}

size_t bwt_save_to_file(const char* filename, const BWT* bwt, size_t length)
{
	HANDLE hProcHandle = ::GetCurrentProcess();
	HANDLE hFile = ::CreateFile(filename, GENERIC_WRITE, NULL, NULL, \
		CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);

	if (hFile == INVALID_HANDLE_VALUE)
	{
		fprintf(stdout, "Unable to create file %s...\n", filename);
		return 0;
	}

	for (size_t index = 0; index < length; index++)
	{
		DWORD dwBytesWritten = 0L;
		OVERLAPPED overlapped = { 0 };
		if (!::WriteFile(hFile, (void*)&bwt[index].length, \
			sizeof(size_t), &dwBytesWritten, NULL))
		{
			fprintf(stdout, "Unable to write to file %s...\n", filename);
			return 0;
		}

		if (!::WriteFile(hFile, (void*)bwt[index].string, \
			bwt[index].length, &dwBytesWritten, NULL))
		{
			fprintf(stdout, "Unable to write to file %s...\n", filename);
			return 0;
		}
	}

	::CloseHandle(hFile);

	return length;
}

int main(int argc, char* argv[])
{
	clock_t time_start, time_end;

	char logo[1024] = "Burrows-Wheeler Parallel Tranformation using Intel(R) TBB for C++ "\
		"(BWTP v.1.00.16384) by Arthur V. Ratz\n""\n Usage: bwt [options] [mode] file1 file2\n"\
		"\nOptions:\n\n  -e\tencode *.txt file\n  -d\tdecode *.txt file\n"
		"\nModes:\n\n -s\tserial execution\n -p\tparallel execution";

	if (argc < 5)
	{
		fprintf(stdout, "%s\n", logo);
		return 0;
	}

	size_t filelen = 0;
	void* input = NULL;
	HANDLE hProcHandle = ::GetCurrentProcess();
	if (!strcmp(argv[1], "-e"))
	{
		if ((filelen = load_from_file(argv[3], input)) > 0)
		{
			fprintf(stdout, "Encoding data...\n");

			char* string = reinterpret_cast<char*>(input);
			BWT* bwt = (BWT*)::VirtualAllocEx(hProcHandle, \
				NULL, sizeof(BWT) * 8, MEM_COMMIT, PAGE_READWRITE);
			
			time_start = clock();

			if (!strcmp(argv[2], "-p"))
				parallel::omp_bwt_encode(string, filelen, bwt);
			else if (!strcmp(argv[2], "-s")) 
				serial::c_bwt_encode(string, filelen, bwt);

			time_end = clock();

			if (filelen > 0)
				bwt_save_to_file(argv[4], bwt, 8);

			for (int index = 0; index < 8; index++)
				if (bwt[index].string != NULL) {
					::VirtualFreeEx(hProcHandle, bwt[index].string, \
						bwt[index].length, MEM_RELEASE);
					bwt[index].string = NULL;
				}

			if (bwt != NULL) ::VirtualFreeEx(hProcHandle, bwt, \
				sizeof(BWT) * 8, MEM_RELEASE);
			if (input != NULL) ::VirtualFreeEx(hProcHandle, input, \
				filelen, MEM_RELEASE);
		}

		fprintf(stdout, "Execution Time: %6.4f secs\n", (time_end - time_start) / (float)CLOCKS_PER_SEC);
	}
	
	else if (!strcmp(argv[1], "-d"))
	{
		BWT* bwt = (BWT*)::VirtualAllocEx(hProcHandle, \
			NULL, sizeof(BWT) * 8, MEM_COMMIT, PAGE_READWRITE);

		if ((filelen = bwt_load_from_file(argv[3], bwt)) > 0)
		{
			fprintf(stdout, "Decoding data...\n");

			char* output = (char*)::VirtualAllocEx(hProcHandle, \
				NULL, filelen, MEM_COMMIT, PAGE_READWRITE);

			time_start = clock();

			if (!strcmp(argv[2], "-p"))
				parallel::omp_bwt_decode(bwt, output, filelen);
			else if (!strcmp(argv[2], "-s"))
				serial::c_bwt_decode(bwt, output, filelen);

			time_end = clock();

			if (filelen > 0)
				save_to_file(argv[4], output, filelen);

			if (output != NULL) ::VirtualFreeEx(hProcHandle, output, \
				filelen, MEM_RELEASE);
		}

		fprintf(stdout, "Execution Time: %6.4f secs\n", (time_end - time_start) / (float)CLOCKS_PER_SEC);

		for (int index = 0; index < 8; index++)
			if (bwt[index].string != NULL) {
				::VirtualFreeEx(hProcHandle, bwt[index].string, \
					bwt[index].length, MEM_RELEASE);
				bwt[index].string = NULL;
			}

		if (bwt != NULL) ::VirtualFreeEx(hProcHandle, bwt, \
			sizeof(BWT) * 8, MEM_RELEASE);
	}

	return 0;
}

关注点

在本文中,我们讨论了如何提高执行纯文本(*.txt)格式文件的顺序编码/解码的代码的性能。我们仔细研究了算法本身及其计算复杂度。

特别是,我们讨论了实现实际编码/解码的 C++ 顺序代码片段,并对以下代码进行了详细分析,以确定基本性能优化热点。在讨论过程中,我们解释了如何使用特定的 OpenMP 并行指令构造和 Intel® TBB 库函数模板将顺序代码转换为并行代码,从而极大地提高编码/解码过程的性能。

除了使用 Intel® TBB 的模板函数来实现并行循环的方面之外,在本文中,我们还了解了如何使用 OpenMP 库的特定并行指令构造,以基于分区算法在所有 CPU 线程之间完美地共享大量数据集的整个编码/解码过程。

此外,仅为实验目的,为了提供本文介绍的并行代码更好的性能和可扩展性,如果有人会可以将以下代码的并行区域的执行卸载到 Intel® Xeon™ Phi 协处理器上,其“许多集成核心”架构将为通常缓慢的顺序代码执行复杂的复杂数据转换提供出色的性能。或者,可以使用相同的并行编程概念来现代化以下并行代码,以使用 Nvidia CUDA GPU 执行。

历史

  • 2017 年 5 月 1 日 - 本文的第一个修订版已发布。
  • 2017 年 5 月 2 日 - 本文的第二个修订版已发布。
© . All rights reserved.