使用 Intel C++ 编译器和 OpenMP 4.5 库实现高效并行三路快速排序






4.88/5 (80投票s)
在本文中,我将介绍 C++11 中的现代代码,实现并行三路快速排序,该算法在渐近上比著名的堆排序和归并排序算法更快、更高效。
注意:请阅读 Intel® Developer Zone 上的原文:使用 Intel C++ 编译器和 OpenMP 4.5 库实现高效并行三路快速排序。
引言
"排序在商业数据处理和现代科学计算中扮演着重要角色。实际上,一种排序算法被评为 20 世纪科学与工程领域的十大算法之一"。
Robert Sedgewick, Kevin Wayne, "Algorithms 4th Edition".
在本文中,我将介绍 C++11 中的现代代码,实现并行三路快速排序,该算法在渐近上比著名的堆排序和归并排序算法更快、更高效。此外,本文讨论的并行代码与现有快速排序实现(如 `qsort(...)` (ANSI C) 和 `std::sort(...)` (ISO/IEC 14882(E)))相比,提供了更高的性能提升。具体来说,我将深入探讨三路快速排序算法及其复杂性。此外,我将详细讨论如何使用 Intel C++ 编译器和 OpenMP 4.5 库提供现代代码,以并行运行排序。我将解释如何使用 OpenMP 的并发任务来并行化递归子排序,从而大幅提高整体性能。最后,我将介绍一个示例程序,演示并行排序在配备对称多核 Intel CPU(如 Intel® Core™ i9 和 Intel® Xeon™ E5)的机器上运行,并同时评估其与作为 C++ 标准库一部分实现的 `std::sort(...)` 执行的排序的性能比较。
三路快速排序算法
执行三路快速排序的主要思想是将整个数组划分为三个后续分区,分别由小于、等于或大于枢轴元素值的元素组成。然后,通过执行相同的三路分区递归排序左分区和右分区。使用三路快速排序可以将排序的最佳情况计算复杂度从线性对数 O(n log n) 降低到线性 O(n),同时排序包含大量元素的数组。以下算法比著名的堆排序和归并排序算法快 O((n log n) / n) = O(log n) 倍。此外,三路快速排序是缓存一致的,极大地影响了 CPU 的缓存流水线,从而影响了排序的整体性能。
三路快速排序算法主要基于对数组中每个元素从左到右进行单次遍历。数组中的每个元素都使用下面讨论的三路比较与枢轴值进行比较。三路比较处理三种主要情况:当元素小于、大于或等于枢轴值时。如果元素小于枢轴值,则将其交换到左分区,否则,如果元素大于枢轴值,则将其交换到右分区。此外,等于枢轴值的元素从不交换。在此过程中,它分别维护左、右和 i 三个索引。索引 i 用于访问数组中的每个元素,而左和右索引用于将元素交换到特定分区。
三路快速排序算法可以表述如下:
- 设 arr[0..N] 为元素数组,low、high 为第一个和最后一个元素的索引;
- 选择第一个元素的值作为枢轴 (pivot = arr[low]);
- 将左变量初始化为第一个元素的索引 (left = low);
- 将右变量初始化为最后一个元素的索引 (right = high);
- 将变量 i 初始化为第二个元素的索引 (i = low + 1);
- 对于数组中每个第 i 个元素 arr[i] (i <= high),执行以下操作:
比较第 i 个元素 arr[i] 与枢轴值
- 如果第 i 个元素 arr[i] 小于枢轴值,则将其与左索引处的元素交换,并将 left 和 i 索引增加 1;
- 如果第 i 个元素 arr[i] 大于枢轴值,则将其与右索引处的元素交换,并将 right 索引减小 1;
- 否则,不执行任何交换,并将索引 i 增加 1;
- 递归排序数组的左分区 arr[low..left - 1];
- 递归排序数组的右分区 arr[right + 1..high];
执行三路分区后,以下算法独立地递归排序左分区和右分区。此操作可以通过生成两个并发任务来执行,这两个任务将并行执行递归排序。
高效并行排序
下面列出的 C++11 现代代码实现了并行三路快速排序算法
namespace internal
{
std::size_t g_depth = 0L;
const std::size_t cutoff = 1000000L;
template<class RanIt, class _Pred>
void qsort3w(RanIt _First, RanIt _Last, _Pred compare)
{
if (_First >= _Last) return;
std::size_t _Size = 0L; g_depth++;
if ((_Size = std::distance(_First, _Last)) > 0)
{
RanIt _LeftIt = _First, _RightIt = _Last;
bool is_swapped_left = false, is_swapped_right = false;
typename std::iterator_traits<RanIt>::value_type _Pivot = *_First;
RanIt _FwdIt = _First + 1;
while (_FwdIt <= _RightIt)
{
if (compare(*_FwdIt, _Pivot))
{
is_swapped_left = true;
std::iter_swap(_LeftIt, _FwdIt);
_LeftIt++; _FwdIt++;
}
else if (compare(_Pivot, *_FwdIt)) {
is_swapped_right = true;
std::iter_swap(_RightIt, _FwdIt);
_RightIt--;
}
else _FwdIt++;
}
if (_Size >= internal::cutoff)
{
#pragma omp taskgroup
{
#pragma omp task untied mergeable
if ((std::distance(_First, _LeftIt) > 0) && (is_swapped_left))
qsort3w(_First, _LeftIt - 1, compare);
#pragma omp task untied mergeable
if ((std::distance(_RightIt, _Last) > 0) && (is_swapped_right))
qsort3w(_RightIt + 1, _Last, compare);
}
}
else
{
#pragma omp task untied mergeable
{
if ((std::distance(_First, _LeftIt) > 0) && is_swapped_left)
qsort3w(_First, _LeftIt - 1, compare);
if ((std::distance(_RightIt, _Last) > 0) && is_swapped_right)
qsort3w(_RightIt + 1, _Last, compare);
}
}
}
}
template<class BidirIt, class _Pred >
void parallel_sort(BidirIt _First, BidirIt _Last, _Pred compare)
{
std::size_t pos = 0L; g_depth = 0L;
if (!misc::sorted(_First, _Last, pos, compare))
{
#pragma omp parallel num_threads(12)
#pragma omp master
internal::qsort3w(_First, _Last - 1, compare);
}
}
}
在此代码中,我们顺序执行三路分区,因为它通常会产生数据流依赖性,因此无法并行运行。此外,在这种情况下,不需要并行优化,因为三路分区的复杂度始终为 O(n),提供了足够的性能提升。
反过来,执行递归排序的代码片段可以轻松并行运行,从而大大提高排序的整体性能。为了执行并行递归排序,我实现了在执行时使用 `#pragma omp taskgroup {}` 指令创建一组两个并发 OpenMP 任务的代码。这两个任务都通过使用 OpenMP 的 `#pragma omp task untied mergeable {}` 指令进行调度和启动,在各自独立的线程中执行递归排序。我使用了 `untied` 子句来确保后续任务由多个线程执行。我还指定了 `mergeable` 子句,以便任务将使用与生成此任务的代码相同的数据上下文。
if (_Size >= internal::cutoff)
{
#pragma omp taskgroup
{
#pragma omp task untied mergeable
if ((std::distance(_First, _LeftIt) > 0) && (is_swapped_left))
qsort3w(_First, _LeftIt - 1, compare);
#pragma omp task untied mergeable
if ((std::distance(_RightIt, _Last) > 0) && (is_swapped_right))
qsort3w(_RightIt + 1, _Last, compare);
}
}
else
{
#pragma omp task untied mergeable
{
if ((std::distance(_First, _LeftIt) > 0) && is_swapped_left)
qsort3w(_First, _LeftIt - 1, compare);
if ((std::distance(_RightIt, _Last) > 0) && is_swapped_right)
qsort3w(_RightIt + 1, _Last, compare);
}
}
第一个被调度的任务执行左分区的排序,而第二个任务则执行右分区的排序。
上面列出的代码实际上执行的是条件任务。在生成任务之前,它会检查是否需要对分区进行排序,并且我们没有对空分区进行排序。如果满足条件,它会生成一个任务,该任务递归调用 `qsort3w(...)` 函数来执行实际的排序。
这里还有一个并行三路快速排序的有效优化。在某些情况下,当排序包含大量重复元素的数组时,排序的递归深度可能会增加,代码将生成数量庞大的并行任务,从而产生大量的同步和任务调度开销时间。为了避免出现此问题,我实现了一个代码片段,该片段首先检查正在排序的数组大小是否超过了特定的截止边界。如果超过,它通常会像上面已经讨论的那样创建一组两个并发任务。否则,它会将两个递归调用合并到由单个线程执行的 `qsort3w(...)` 函数中。这反过来又减少了要调度的并行递归任务的数量。
整个排序过程通过调用在单独线程中启动的 `qsort3w(...)` 函数来启动,以调用并行性
template<class BidirIt, class _Pred >
void parallel_sort(BidirIt _First, BidirIt _Last, _Pred compare)
{
std::size_t pos = 0L; g_depth = 0L;
if (!misc::sorted(_First, _Last, pos, compare))
{
#pragma omp parallel num_threads(12)
#pragma omp master
internal::qsort3w(_First, _Last - 1, compare);
}
}
建议在并行区域的主线程中调用 `qsort3w(...)` 函数,而不是为此目的使用 OpenMP 的任务指令。
C++11 示例演示程序
为了演示所交付并行代码的性能和效率,我实现了一个示例演示程序,通过使用常规 `std::sort` 和本文讨论的并行排序来评估排序所花费的时间。
namespace parallel_sort_impl
{
#if defined( _WIN32 )
static HANDLE hStdout = ::GetStdHandle(STD_OUTPUT_HANDLE);
const WORD wdRed = FOREGROUND_RED | FOREGROUND_INTENSITY;
const WORD wdWhite = FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE;
#endif
void stress_test(void)
{
while (true)
{
std::size_t count = 0L;
std::vector<std::int64_t> array, array_copy;
misc::init(array, std::make_pair(std::pow(10, misc::minval_radix), \
std::pow(10, misc::maxval_radix)), count);
array_copy.resize(array.size());
std::copy(array.begin(), array.end(), array_copy.begin());
std::chrono::system_clock::time_point \
time_s = std::chrono::system_clock::now();
std::cout << "sorting an array...\n";
std::sort(array.begin(), array.end(),
[](std::int64_t first, std::int64_t end) { return first < end; });
std::chrono::system_clock::time_point \
time_f = std::chrono::system_clock::now();
std::chrono::system_clock::duration \
std_sort_time_elapsed = time_f - time_s;
std::cout << std::setiosflags(std::ios::fixed) << std::setprecision(4)
<< "array size = " << count << " execution time (std::sort): "
<< std_sort_time_elapsed.count() << " ms ";
time_s = std::chrono::system_clock::now();
internal::parallel_sort(array_copy.begin(), array_copy.end(),
[](std::int64_t first, std::int64_t end) { return first < end; });
time_f = std::chrono::system_clock::now();
std::size_t position = 0L;
std::chrono::system_clock::duration \
qsort_time_elapsed = time_f - time_s;
bool is_sorted = misc::sorted(array_copy.begin(), array_copy.end(), position,
[](std::int64_t first, std::int64_t end) { return first < end; });
std::double_t time_diff = \
std_sort_time_elapsed.count() - qsort_time_elapsed.count();
#if defined( _WIN32 )
::SetConsoleTextAttribute(hStdout, \
(is_sorted == true) ? wdWhite : wdRed);
#else
if (is_sorted == false)
std::cout << "\033[1;31m";
#endif
std::cout << "<--> (internal::parallel_sort): "
<< qsort_time_elapsed.count() << " ms " << "\n";
std::cout << "verification: ";
if (is_sorted == false) {
std::cout << "failed at pos: " << position << "\n";
std::cin.get();
misc::print_out(array_copy.begin() + position,
array_copy.end() + position + 10);
}
else {
std::double_t ratio = qsort_time_elapsed.count() / \
(std::double_t)std_sort_time_elapsed.count();
std::cout << std::setiosflags(std::ios::fixed) << std::setprecision(2)
<< "passed... [ time_diff: " << std::fabs(time_diff)
<< " ms (" << "ratio: " << (ratio - 1.0) * 100
<< "% (" << (1.0f / ratio) << "x faster)) depth = "
<< internal::g_depth << " ]" << "\n";
}
std::cout << "\n";
#if !defined ( _WIN32 )
if (is_sorted == false)
std::cout << "\033[0m";
#endif
}
}
void parallel_sort_demo(void)
{
std::size_t count = 0L;
std::cout << "Enter the number of data items N = "; std::cin >> count;
std::vector<std::int64_t> array;
misc::init(array, std::make_pair(std::pow(10, misc::minval_radix), \
std::pow(10, misc::maxval_radix)), count);
std::chrono::system_clock::time_point \
time_s = std::chrono::system_clock::now();
internal::parallel_sort(array.begin(), array.end(),
[](std::int64_t first, std::int64_t end) { return first < end; });
std::chrono::system_clock::time_point \
time_f = std::chrono::system_clock::now();
std::size_t position = 0L;
std::chrono::system_clock::duration \
qsort_time_elapsed = time_f - time_s;
std::cout << "Execution Time: " << qsort_time_elapsed.count()
<< " ms " << "depth = " << internal::g_depth << " ";
bool is_sorted = misc::sorted(array.begin(), array.end(), position,
[](std::int64_t first, std::int64_t end) { return first < end; });
std::cout << "(verification: ";
if (is_sorted == false) {
std::cout << "failed at pos: " << position << "\n";
std::cin.get();
misc::print_out(array.begin() + position, array.end() + position + 10);
}
else {
std::cout << "passed...)" << "\n";
}
char option = '\0';
std::cout << "Do you want to output the array [Y/N]?"; std::cin >> option;
if (option == 'y' || option == 'Y')
misc::print_out(array.begin(), array.end());
}
}
int main()
{
std::string logo = "Parallel Sort v.1.00 by Arthur V. Ratz";
std::cout << logo << "\n\n";
char option = '\0';
std::cout << "Do you want to run stress test first [Y/N]?"; std::cin >> option;
std::cout << "\n";
if (option == 'y' || option == 'Y')
parallel_sort_impl::stress_test();
if (option == 'n' || option == 'N')
parallel_sort_impl::parallel_sort_demo();
return 0;
}
#endif // PARALLEL_STABLE_SORT_STL_CPP
总而言之,正如您从使用示例演示程序中可以看到的,本文讨论的并行排序比常规的 `qsort` 或 `std::sort` 函数快得多(高达 2-6 倍)。
历史
- 2015年12月5日 - 第一次修订
- 2015年12月8日 - 第二次修订
- 2015年12月10日 - 第三次修订
- 2015年12月12日 - 最终修订(添加了性能分析)
- 2017年11月16日 - 最终修订(大量材料更新!)
- 2020年5月7日 - 最终修订(全部材料更新!)