65.9K
CodeProject 正在变化。 阅读更多。
Home

在 CPU 上模拟 CUDA/OpenCL,通过自动向量化和与串行标量代码相比的速度提升

starIconstarIconstarIconstarIconstarIcon

5.00/5 (3投票s)

2022年4月24日

GPL3

35分钟阅读

viewsIcon

13821

一个小型工具,用于编写各种算法,如同它们是 CUDA/OpenCL 内核一样

引言

VectorizedKernel.h 头文件工具旨在通过仅使用 CPU 来帮助快速原型化基本的 GPGPU 问题,而无需 C++ 编译器和正确的编译器标志集以外的任何其他东西。许多平台的 GPGPU 学习曲线通常受到 API 的质量和可用性(例如,CUDA 比 OpenCL 的“hello-world”短得多,而 OpenCL 则有更多样化的设备选择)的影响,本项目旨在通过依赖于

  • C++14 编译器(GCC v10 用于本项目的所有基准测试)
  • 编译器的自动向量化能力
  • 至少需要 MMX CPU,但 AVX512、AVX1024(未来?)、AVX2048 (未来?)也可以工作

与 CUDA 的驱动程序 API 或纯 OpenCL 代码相比,启动相对简单,并且性能高于未明确向量化的普通单线程代码,代价是冗长

由于设备不支持(例如 GT1030 对于 CUDA 或 RX550 对于 OpenCL)以及有问题的驱动程序增加了启动开销,GPGPU 的入口可能并非总是立即可用。这可以通过雇用云计算机并以每小时几美元的成本管理操作系统映像来轻松解决。通常,免费快速测试演示版更可接受。

背景

当 Nvidia 开始开发自己的通用计算框架时,许多开发人员使用 OpenGL/DirectX/Glide API 来利用图形管道进行“通用”工作。图形芯片的照明、变换和其他管道被用于矩阵运算,图形缓冲区被用作输入/输出缓冲区。像这样的 GPU 使用相比当时 CPU 有着诱人的速度提升比,并且有大量工作涉及将更多通用工作推送到 GPU。

在 CUDA(Nvidia)取得巨大成功后,OpenCL(Apple)紧随其后,技术延迟适度,并被从 Intel 到 Xilinx 到 AMD 的众多硬件供应商以及 Nvidia 等其他公司采用。

虽然 GPGPU 框架和更高级别的库不断发展,对不同类型的任务(包括训练神经网络)越来越有用,但它们也增加了学习、安装和测试的启动开销。GPGPU 的“hello-world”在 CUDA 中保持最简单,但需要 Nvidia 卡。另一方面,OpenCL 的“hello-world”则包含更多的代码行。一个“hello-world”OpenCL 程序有多个部分需要初始化才能将一个字母发送到 GPU

  • 查询平台
  • 查询设备
  • 查询支持的 API 特性或版本
  • 查询图形卡的内存大小或其他特性
  • 创建上下文
  • 创建内核
  • 创建 GPU 缓冲区
  • 创建内核参数
  • 计算“hello world”
  • 将结果返回到 RAM(如果尚未在 CPU 上)
  • 以分配的相反顺序释放所有资源

CUDA 和 OpenCL 都高度依赖代码重用或初始硬件。尽管准备时间很长,但 GPU 运行时非常好,并且大多数时候都可以容忍。制作模拟器的目的是让原型开发需要更少的代码或没有硬件依赖(除了几乎所有台式机 CPU 市场中都存在的 SSE/AVX),并且特别是更快地开始获得结果。

Using the Code

VectorizedKernel.h 头文件定义了一个简单的内核构建函数,用于从 lambda 函数创建 SIMD 内核,并将其应用于内核的所有工作项。工作项只是向量处理任务中看起来像标量的“通道”。如果任务是给数组的所有元素加 1,那么一个工作项就是计算 a[i] = a[i] + 1

内核

createKernel 工厂函数创建内核对象,并接受 simd 宽度和内核 lambda

template<int SimdWidth, typename F, class...Args>
auto createKernel(F&& kernelPrm, KernelArgs<Args...> const& _prm_)
{
        return Kernel<SimdWidth, F, Args...>(std::forward<F>(kernelPrm));
}

Args... 部分允许用户定义自己的内核参数,而 SimdWidth 被明确指定为模板整数参数,该参数是 2 的整数次幂。F 参数接受 lambda 函数,并将其转换为两个版本:一个用于计算主体,使用 simd = 用户指定值;另一个用于尾部,使用 simd = 1。这使得工作调度变得容易且足够优化,而不会破坏用于任何使用当前 simd 宽度值作为内核函数内部值的整数次幂规则。

如何创建内核

auto kernelHelloWorld = Vectorization::createKernel<simd>(
    [&](auto & factory, auto & idThread, int * prm1, float * prm2, int prm3){
        const int currentSimdWidth = factory.width;
		/* scalar code here */
        
	}, 
    Vectorization::KernelArgs<int*,float*, int>{}
);

Hello world 版本

#include "VectorizedKernel.h"
#include <iostream>
int main()
{
	constexpr int simd = 8; // >= SIMD width of CPU (bigger  = more pipelining)

	auto kernelHelloWorld = Vectorization::createKernel<simd>(
        [&](auto & factory, auto & idThread, float * bufferIn, float * bufferOut){
            // this value changes only when total work is not multiple of simd
            // i.e. for the last 3 elements of 23-element array, it is 1 (not 8)
		    const int currentSimdWidth = factory.width;

            // allocation should be done before reaching hot-spot of a loop (if any)
		    auto tmp = factory.template generate<float>();

            // this reads a single chunk that is made of neighboring elements
            // starting address = bufferIn + idThread[0]
		    tmp.readFromContiguous(bufferIn,idThread); 

            // adds 1 to all 8 elements at once
            // writes result back to same variable (last parameter of method)
		    tmp.add(1.0f,tmp);
 
            // same as read version, with opposite direction
		    tmp.writeToContiguous(bufferOut,idThread); 

	}, /* defining kernel parameter types */ Vectorization::KernelArgs<float*,float*>{});

	// size does not have to be multiple of simd
	const int n = 23;

	// better performance with aligned buffers if workload is memory-bandwidth-bound
	float vecIn[n], vecOut[n];
	for(int i=0;i<n;i++)
	{
		std::cin>>vecIn[i];
	}

    // cuda-like kernel launching, with n so-called threads
    // actually it is single-thread but on SIMD units
	kernelHelloWorld.run(n,vecIn,vecOut);

	for(int i=0;i<n;i++)
	{
		std::cout<<vecOut[i]<<" ";
	}
	std::cout<<std::endl;
	return 0;
}

当使用 -march=native-mavx 标志进行编译时,生成的 CPU 指令如下所示

.L2:
mov rsi, rbx
mov edi, OFFSET FLAT:_ZSt3cin
add rbx, 4
cmp rbx, rbp
jne .L2
vmovaps xmm0, XMMWORD PTR .LC0[rip]
lea rbx, [rsp+96]
lea rbp, [rsp+188]
vaddps xmm1, xmm0, XMMWORD PTR [rsp]   <-- 4x add
vmovaps XMMWORD PTR [rsp+96], xmm1
vaddps xmm1, xmm0, XMMWORD PTR [rsp+16] <-- 4x add
vmovaps XMMWORD PTR [rsp+112], xmm1
vaddps xmm1, xmm0, XMMWORD PTR [rsp+32] <-- 4x add
vmovaps XMMWORD PTR [rsp+128], xmm1
vaddps xmm1, xmm0, XMMWORD PTR [rsp+48] <-- 4x add
vaddps xmm0, xmm0, XMMWORD PTR [rsp+64] <-- 4x add
vmovaps XMMWORD PTR [rsp+144], xmm1
vmovq xmm1, QWORD PTR .LC1[rip]
vmovaps XMMWORD PTR [rsp+160], xmm0
vmovq xmm0, QWORD PTR [rsp+80]
vaddps xmm0, xmm0, xmm1
vmovlps QWORD PTR [rsp+176], xmm0
vmovss xmm0, DWORD PTR .LC2[rip]
vaddss xmm0, xmm0, DWORD PTR [rsp+88]
vmovss DWORD PTR [rsp+184], xmm0

vaddps 指令是一次计算 4 个值(32 位浮点数加法)。前五个此类指令计算 23 个工作项的主要部分(20 个)。最后一个 vaddpsvaddss 计算尾部部分(最后 3 个元素)。

当内核中添加额外的乘法运算时

// This value changes only when total work is not multiple of simd
// i.e., for the last 3 elements of 23-element array, it is 1 (not 8)  
const int currentSimdWidth = factory.width;

// allocation should be done before reaching hot-spot of a loop (if any)  
auto tmp = factory.template generate<float>();

// This reads a single chunk that is made of neighboring elements
// starting address = bufferIn + idThread[0] 
tmp.readFromContiguous(bufferIn,idThread);

// adds 1 to all 8 elements at once
// writes result back to same variable (last parameter of method) 
tmp.add(1.0f,tmp);

tmp.mul(3.0f, tmp); <------ an extra multiplication!

// same as read version, with opposite direction 
tmp.writeToContiguous(bufferOut,idThread); 

指令输出如下所示

vaddps  xmm2, xmm0, XMMWORD PTR [rsp]
vmulps  xmm2, xmm2, xmm1
vmovaps XMMWORD PTR [rsp+96], xmm2
vaddps  xmm2, xmm0, XMMWORD PTR [rsp+16]
vmulps  xmm2, xmm2, xmm1
vmovaps XMMWORD PTR [rsp+112], xmm2
vaddps  xmm2, xmm0, XMMWORD PTR [rsp+32]  <-- 4x add
vmulps  xmm2, xmm2, xmm1                  <-- 4x mul
vmovaps XMMWORD PTR [rsp+128], xmm2
vaddps  xmm2, xmm0, XMMWORD PTR [rsp+48] <-- 4x add
vaddps  xmm0, xmm0, XMMWORD PTR [rsp+64] <-- 4x add
vmulps  xmm2, xmm2, xmm1                 <-- 4x mul
vmulps  xmm0, xmm0, xmm1                 <-- 4x mul
vmovq   xmm1, QWORD PTR .LC2[rip]
vmovaps XMMWORD PTR [rsp+144], xmm2
vmovaps XMMWORD PTR [rsp+160], xmm0
vmovq   xmm0, QWORD PTR [rsp+80]
vaddps  xmm0, xmm0, xmm1
vmovq   xmm1, QWORD PTR .LC3[rip]
vmulps  xmm0, xmm0, xmm1
vmovlps QWORD PTR [rsp+176], xmm0
vmovss  xmm0, DWORD PTR .LC4[rip]
vaddss  xmm0, xmm0, DWORD PTR [rsp+88]
vmulss  xmm0, xmm0, DWORD PTR .LC5[rip]
vmovss  DWORD PTR [rsp+184], xmm0

编译器在每个 vaddps 之后添加必要的 vmulps 指令,而无需额外的缓冲区复制(直接通过重用相同的 CPU 向量寄存器)。这部分相当于显式使用 C++ x86 内置函数,但区别在于它是自动完成的。

在 Cascadelake 处理器微体系结构上(目前是 godbolt.org 的云服务器),使用 simd=16 设置和正确的编译器标志(-std=c++2a (至少需要 c++14!!) -O3 -march=cascadelake -mavx512f -mprefer-vector-width=512 -ftree-vectorize -fno-math-errno -lpthread),计算主要部分(AVX512 的 16 个工作项 + SSE 的 4 个工作项)的输出较短

vbroadcastss    zmm0, DWORD PTR .LC6[rip]
vmovq   xmm1, QWORD PTR .LC4[rip]
vaddps  zmm0, zmm0, ZMMWORD PTR [rbp-304] <--- 512 bit add operation (16 elements)
lea     rbx, [rbp-176]
lea     r12, [rbp-84]
vmulps  zmm0, zmm0, DWORD PTR .LC7[rip]{1to16} <--- 512 bit mul (16 elements)
vmovaps ZMMWORD PTR [rbp-176], zmm0
vbroadcastss    xmm0, DWORD PTR .LC6[rip]
vaddps  xmm0, xmm0, XMMWORD PTR [rbp-240]      <--- 128 bit add (4 elements)
vmulps  xmm0, xmm0, DWORD PTR .LC7[rip]{1to4}  <--- 128 bit mul (4 elements)
vmovaps XMMWORD PTR [rbp-112], xmm0
vmovq   xmm0, QWORD PTR [rbp-224]
vaddps  xmm0, xmm0, xmm1                      <--- tail i=20, i=21
vmovq   xmm1, QWORD PTR .LC5[rip]
vmulps  xmm0, xmm0, xmm1
vmovlps QWORD PTR [rbp-96], xmm0
vmovss  xmm0, DWORD PTR .LC6[rip]
vaddss  xmm0, xmm0, DWORD PTR [rbp-216]       <--- tail i=22
vmulss  xmm0, xmm0, DWORD PTR .LC7[rip]
vmovss  DWORD PTR [rbp-88], xmm0
vzeroupper

当工作项数量增加时(例如,曼德尔布罗集有 1000x1000 像素),计算的主要部分远大于未对齐的尾部,整体性能大大提高。

内核变量

内核变量通过 lambda 的 factory 参数创建。factory 是一个模板化对象,要创建任何类型的对象,都需要显式选择变量的模板

// serial-looking, parallel variable
auto aFloat = factory.template generate<float>();  
  
// array with vertical layout in memory (strided element access --> parallelism)
auto arr = factory.template generateArray<int,5>(); 

// serial-looking, parallel integer initialized from current work-items id value
auto anInt = factory.template generate<int>(idThread);

这是一项成本高昂的操作,因为每个看起来像标量的变量内部都有一个普通数组。这会导致堆栈分配,而堆栈分配并非总是免费的。因此,变量分配应留在此类进行最密集数学计算的最内层循环之外

auto j = idThread.modulus(matrixWidth);     // slow!
while(anyTrue)
{
    a.fusedMultiplyAdd(b,c,result);         // fast!
}

while(anyTrue)
{
    auto j = idThread.modulus(matrixWidth); // slow!
    a.fusedMultiplyAdd(b,c,result);         // stalled!
}

一旦变量创建完毕,最好多次重用它以受益于 SIMD 性能。否则,编译器会添加许多 mov/vmov 指令来准备分配,而算法的实际部分会被停滞,如果计算与数据比率低,甚至会比标量版本慢。

使用 struct 内的普通数组的另一个缺点是无法避免在所有 copy/move - 构造函数中进行深度数据复制。这会给编译器和 CPU 带来额外的工作。为了不使用普通数组,项目将完全由宏定义组成,并具有更高的性能,但这 C++ 的方式。宏定义应该只在没有其他方法可以实现时使用(例如 AVX 内置函数)。

工作项索引

在内核启动期间,每个工作项都会获得一个整数 ID 值,以指示其在整个任务中的位置。对于简单的数组处理任务,它可用于索引数组。对于 2D 矩阵乘法,它可以用于找到矩阵中元素的 X-Y 坐标(工作项自己的数据),通过对 N 取模和除以 N。

内核对象 run 方法中的工作调度非常简单。第一部分计算所有工作项,直到剩下最后一个未对齐的块。第二部分通过一次启动一个工作项来计算剩余的尾部

void run(int n, Args... args)
{
            // main body of task (length aligned to SimdWidth)
            const int nLoop = (n/SimdWidth);
            const KernelDataFactory<SimdWidth> factory;
            auto id = factory.template generate<int>();
            for(int i=0;i<nLoop;i++)
            {
                id.idCompute(i*SimdWidth,[](const int prm){ return prm;});
                kernel(factory, id, args...);
            }

            // tail part of task (m<SimdWidth): launch 1 work-item at a time
            if((n/SimdWidth)*SimdWidth != n)
            {
                const KernelDataFactory<1> factoryLast;

                const int m = n%SimdWidth;
                auto id = factoryLast.template generate<int>();
                for(int i=0;i<m;i++)
                {
                    id.idCompute(nLoop*SimdWidth+i,[](const int prm){ return prm;});
                    kernel(factoryLast, id, args...);
                }
            }
}

id 值在每个工作项之间连续计算,从零开始。启动 1000 个工作项时,最后一个 id 值是 999id 值(作为另一个看起来像标量的块数据)可以在内核中的任何基于整数的运算中使用

auto kernel = Vectorization::createKernel<simd>(

                               work-item index
                                ^            kernel parameters
                                |             ^
                                |             |
    [&](auto & factory, auto & idThread, ... prms ...){

        // compute column position in matrix
        auto j = factory.template generate<int>();
        idThread.modulus(width, j); // j is result of operation

        // compute row position in matrix
        auto i = factory.template generate<int>();
        idThread.div(width, i); // i is result of operation

        

        // fill matrix with values i * j
        // 0 0 0 ... 0 0 0
        // 0 1 2 ... N-3 N-2 N-1
        // 0 2 4 ... (N-3)*2 (N-2)*2 (N-1)*2
        // ...
        // 0 N-3
        // 0 N-2
        // 0 N-1 ... (N-1) * (N-1)
        auto element = factory.template generate<int>();

        // lock-step operation, all SIMD lanes do same
        i.mul(j,element);

        // lock-step operation, all SIMD lanes do same
        element.writeTo(matrix,idThread);
       ...

    }, ... prm types ...
);

I/O 模式

有三组方法可以从缓冲区读取/写入数据:连续 read/write 方法、索引 gather/scatter 方法和索引连续 read/write 方法。

连续读/写

void writeTo(Type * const __restrict__ ptr);
void readFrom(const Type * const __restrict__ ptr);

这些方法从给定的 ptr 地址开始 read/write 操作,同一工作组(在 run 方法中启动的单个 SIMD 组)中的每个后续工作项使用 ptr 地址上均匀递增的索引。组的第一个工作项访问 ptr 的第一个项,组中的最后一个工作项访问 ptr 上的 currentSimdWidth-1 索引。当前 SIMD 宽度是从内核 lambda 的 factory 参数(第一个参数)查询的

int currentSimdWidth = factory.width;

这表示连续 read/write 方法上的 read/write 操作的宽度。对于任务的主体块,它等于传递给内核创建函数的 simd 模板参数。对于尾部块,它等于 1

Readwrite 操作与乘法和加法方法一样进行向量化。由于存在 RAM 延迟,计算与数据比率低的算法在 I/O 上往往会损失更多性能。

auto returnValue = factory.template generate<float>();
a.add(b,returnValue);

// work-item 0 writes to img,
// work-item 1 writes to img+1,
// ..., last work-item in simd writes to img+N-1
returnValue.writeTo(img); 

索引连续读/写

void readFromContiguous(Type * const __restrict__ ptr, const KernelData<int,Simd> & id);
void writeToContiguous(Type * const __restrict__ ptr, const KernelData<int,Simd> & id);

作为第一组的索引版本,它们只选择 I/O 缓冲区上的不同目标区域。每个 simd 组中的第一个工作项决定在哪里 read/write。如果 simd 大小设置为 32,则前 32 个工作项从 ptr + id[0] 开始 read/write

// writes to img + idThread.data[0], ..., img + idThread.data[0] + simd - 1
returnValue.writeToContiguous(img, idThread);

Gather/Scatter

readFrom(Type * const __restrict__ ptr, const KernelData<int,Simd> & id);
writeTo(Type * const __restrict__ ptr, const KernelData<int,Simd> & id);

Gathering 和 scattering 意味着每个工作项选择自己的目标元素来读取/写入缓冲区。以下方法可以进行每个工作项的随机访问

// writes to img + idThread.data[0], ..., img + idThread.data[simd-1]
returnValue.writeTo(img, idThread);

这是为了方便编写标量模式代码而实现的,并且不如连续版本快。即使在支持 gather/scatter 指令的 AVX512 CPU 上,由于随机访问内存的性质,它们的延迟也很高,并且失去了缓存行的优势。AVX1 和 SSE 没有 gather/scatter 指令,并且在这些 CPU 上运行的算法应始终通过连续访问进行优化。

当算法高度依赖 RAM 延迟时(由于数据集大于缓存),应选择更大的 simd 大小来隐藏加载/存储延迟。RAM 可以通过流水线并行处理多个请求。当有多个请求在进行中时,其中一些延迟会被隐藏。

对于计算与数据比率高的算法,simd 大小应选择接近(1-2 倍)SIMD 硬件宽度,以受益于 CPU 核心的物理寄存器。如果 simd 宽度选择过高,则物理寄存器不足,而是使用 L1 缓存。L1 缓存的延迟比寄存器高,带宽比寄存器低。

循环结构

CPU 核心的 SIMD 硬件没有独立的通道。每个通道与其他 SIMD 单元中的所有通道锁定,并且所有通道对不同数据执行相同的计算(指令)。由于这种工作流程,它们具有有限的循环能力。

固定迭代次数

同一 SIMD 组中的所有工作项需要同时迭代相同次数时,可以使用简单循环

// all work-items do same
for(int i=0;i<100;i++)
{
    // compute a
    a.add(1.0f,a);

    // compute b
    b.mul(1.1f,b);

    // compute result
    a.fusedMultiplyAdd(b,result,result); // a*b + result --> result
}
// all work-items in same SIMD group reach here at the same time

这仅适用于每个工作组。不同组可以迭代不同的次数。第一个 SIMD 工作项组可以迭代 100 次,而下一组可以迭代 5 次而没有问题,因为所有工作组都是独立的(它们一个接一个地启动)。

可变迭代次数

当每个工作项都需要遵循不同数量的迭代时,同一 SIMD 组中的所有工作项必须遵循相同数量的迭代,并带有掩码操作。掩码允许已中断循环的工作项继续循环,而不会改变其最后状态,例如算法的结果

bool anyWorkRemaining = true;
auto k = factory.template generate<int>(0);
auto mask = factory.template generate<int>();
auto somethingToCompute = factory.template generate<float>();

// max iterations
const int N = 100;
while(anyWorkRemaining)
{
    // ... compute loop exit condition for all work-items ...
    k.lessThan(N,mask);                  // vectorized computation
    anyWorkRemaining = mask.isAnyTrue(); // vectorized computation

    // compute algorithm, but masked
    // .. compute some new value
    // .. but don't use it if mask is off
    // (mask?newVal:something) --> something
    mask.ternary(newValueComputed,somethingToCompute,somethingToCompute);


    // increment the iteration
    k.add(variableIterationAmount,k);
}

bool anyWorkRemaining = true;

并行阶乘示例

编译器通常很聪明,甚至可以(水平)并行化像这样的阶乘函数

template<typename Type>
Type factorial(Type input)
{
    Type result = input--;
    if(input+1 == 0)
        return 1;
    while(input>0)
    {
        result *= input--;
    }

    return result>0?result:1;
}

生成的指令的入口部分是

        vpbroadcastq    zmm0, rax
        mov     rdi, r8
        vpaddq  zmm0, zmm0, ZMMWORD PTR .LC9[rip]
        vpbroadcastq    zmm1, QWORD PTR .LC2[rip]
        shr     rdi, 3
        xor     edx, edx
.L26:
        vmovdqa64       zmm2, zmm0
        inc     rdx
        vpmullq zmm1, zmm1, zmm2
        vpaddq  zmm0, zmm0, ZMMWORD PTR .LC10[rip]
        cmp     rdx, rdi
        jne     .L26
        vextracti64x4   ymm0, zmm1, 0x1
        vpmullq ymm1, ymm0, ymm1
        vmovdqa xmm0, xmm1
        vextracti64x2   xmm1, ymm1, 0x1
        vpmullq xmm0, xmm0, xmm1
        vpsrldq xmm1, xmm0, 8
        vpmullq xmm0, xmm0, xmm1
        vmovq   rdx, xmm0
        imul    rax, rdx
        mov     rdx, r8
        and     edx, 7
        and     r8d, 7
        je      .L27

经过这个优化部分后,它继续以标量版本进行计算

.L25:
        mov     rdi, rdx
        imul    rax, rdx
        dec     rdi
        je      .L27
        imul    rax, rdi
        mov     rdi, rdx
        sub     rdi, 2
        je      .L27
repeats this many times

应用于元素数组(类型为 uint64_t)值为 10+k%3 的标量阶乘,每元素运行 24 个周期,并且已经由编译器部分向量化了乘法(Cascadelake AVX512),如上所示。24 个周期的操作对于 godbolt.org 的服务器 CPU 来说相当于 8 纳秒。考虑到 RAM 延迟包含在 24 个周期中,它相当快(在 GCC v11 上)。

标量版本的基准测试程序

#include <algorithm>
#include <cstdint>
#include"VectorizedKernel.h"

#ifdef _MSC_VER
# include <intrin.h>
#else
# include <x86intrin.h>
#endif

// optional wrapper if you don't want to just use __rdtsc() everywhere
inline
uint64_t readTSC() {
    // _mm_lfence();  // optionally wait for earlier insns to retire 
                      // before reading the clock
    uint64_t tsc = __rdtsc();
    // _mm_lfence();  // optionally block later instructions until rdtsc retires
    return tsc;
}

template<typename Type>
Type factorial(Type input)
{
    Type result = input--;
    if(input+1 <= 1)
        return 1;

    while(input>0)
    {
        result *= input--;
    }

    return result>0?result:1;
}

constexpr int N = 1000000;

int main() {

  std::vector<uint64_t> input(N);
  std::vector<uint64_t> output(N);
  for(int k=0;k<N;k++)
  {
      input[k]=10+k%3
  }

  for(int i = 0;i<100;i++)
  {
    auto t1 = readTSC();
    for(int k=0;k<N;k++)
    {
        output[k] = factorial(input[k]);
    }
    auto t2 = readTSC();
    std::cout << (t2 - t1)/(N) << " cycles per factorial" << std::endl;
  }

  for(int i=0;i<20;i++)
  {
      std::cout<<output[i]<<" ";
  }
  std::cout<<std::endl;
  return 0;
}

向量化内核版本

通过在元素上而不是乘法上进行向量化,AVX512 可以更快地完成它,并且需要并行 while 循环。使用 AVX512 标志为 Cascadelake CPU 编译的编译器以每元素 12 个周期完成相同的 100 万个元素处理。

constexpr int simd = 32;
using maskType = uint64_t;
using dataType = uint64_t;
auto kernel = Vectorization::createKernel<simd>(
      [&](auto & factory, auto & id, dataType * inputBuffer, dataType * outputBuffer){
            auto result = factory.template generate<dataType>();

            auto input  = factory.template generate<dataType>();
            input.readFromContiguous(inputBuffer,id);

            // result = input--
            result.readFrom(input);
            input.sub(1,input);

            // if(input+1 <= 1) return 1;
            // mask: 1 means work is in progress, 0 means completed
            auto maskEarlyQuit = 
            factory.template generate<maskType>(1); // type: same byte-length
            auto inputPlus1 = factory.template generate<dataType>();
            input.add(1,inputPlus1);
            inputPlus1.greaterThan(1,maskEarlyQuit);

            // return 1
            maskEarlyQuit.ternary(result,(dataType)1,result);
            maskEarlyQuit.ternary(input,(dataType)1,input);

            //while(input>0){    result *= input--;}
            auto maskedResult = factory.template generate<dataType>();
            auto maskedInput = factory.template generate<dataType>();
            bool workInProgress = true;
            auto mask = 
            factory.template generate<maskType>(1); // type: same byte-length
            const dataType one = 1;
            const maskType zero = 1;
            while(workInProgress)
            {
                // result * input ---> masked result
                result.mul(input,maskedResult);

                // input-- ----> masked input
                input.sub(one,maskedInput);

                //(input>0) && (not early quit)
                maskedInput.greaterThan(zero,mask);
                mask.bitwiseAnd(maskEarlyQuit,mask);
                workInProgress = mask.isAnyTrue();

                // mask?maskedResult:result -----> result
                // mask?maskedInput:input   -----> input
                mask.ternary(maskedResult,result,result);
                mask.ternary(maskedInput,input,input);
            }

            //return result>0?result:1;
            auto gt = factory.template generate<maskType>();
            result.greaterThan(0,gt);
            gt.ternary(result,(dataType)1,result);
            result.writeToContiguous(outputBuffer,id);

      }, Vectorization::KernelArgs<dataType *, dataType *>{}
);

向量化版本的基准测试程序

#include <algorithm>
#include <cstdint>
#ifdef _MSC_VER
# include <intrin.h>
#else
# include <x86intrin.h>
#endif

// optional wrapper if you don't want to just use __rdtsc() everywhere
inline
uint64_t readTSC() {
    // _mm_lfence();  // optionally wait for earlier insns to retire 
                      // before reading the clock
    uint64_t tsc = __rdtsc();
    // _mm_lfence();  // optionally block later instructions until rdtsc retires
    return tsc;
}

template<typename Type>
Type factorial(Type input)
{
    Type result = input--;
    if(input+1 <= 1)
        return 1;
    while(input>0)
    {
        result *= input--;
    }

    return result>0?result:1;
}

#include"VectorizedKernel.h"
constexpr int N = 1000000;

int main() {
  constexpr int simd = 4;
  using maskType = uint64_t;
  using dataType = uint64_t;
  auto kernel = Vectorization::createKernel<simd>(
      [&](auto & factory, auto & id, dataType * inputBuffer, dataType * outputBuffer){
            auto result = factory.template generate<dataType>();

            auto input  = factory.template generate<dataType>();
            input.readFromContiguous(inputBuffer,id);

            // result = input--
            result.readFrom(input);
            input.sub(1,input);

            // if(input+1 <= 1) return 1;
            // mask: 1 means work is in progress, 0 means completed
            auto maskEarlyQuit = 
            factory.template generate<maskType>(1); // type: same byte-length
            auto inputPlus1 = factory.template generate<dataType>();
            input.add(1,inputPlus1);
            inputPlus1.greaterThan(1,maskEarlyQuit);

            // return 1
            maskEarlyQuit.ternary(result,(dataType)1,result);
            maskEarlyQuit.ternary(input,(dataType)1,input);

            //while(input>0){    result *= input--;}
            auto maskedResult = factory.template generate<dataType>();
            auto maskedInput = factory.template generate<dataType>();
            bool workInProgress = true;
            auto mask = factory.template generate<maskType>(1); // type: same byte-length
            const dataType one = 1;
            const maskType zero = 1;
            while(workInProgress)
            {
                // result *= input (masked)
                result.mul(input,maskedResult);

                // input-- (masked)
                input.sub(one,maskedInput);

                //(input>0) && (loop not exited)
                maskedInput.greaterThan(zero,mask);
                mask.bitwiseAnd(maskEarlyQuit,mask);
                workInProgress = mask.isAnyTrue();

                // mask?maskedResult:result -----> result
                // mask?maskedInput:input   -----> input
                mask.ternary(maskedResult,result,result);
                mask.ternary(maskedInput,input,input);
            }

            //return result>0?result:1;
            auto gt = factory.template generate<maskType>();
            result.greaterThan(0,gt);
            gt.ternary(result,(dataType)1,result);
            result.writeToContiguous(outputBuffer,id);

      }, Vectorization::KernelArgs<dataType *, dataType *>{});

  std::vector<dataType> input(N);
  std::vector<dataType> output(N);

  for(int k=0;k<N;k++)
  {
      input[k]=k%100;
  }

  for(int i = 0;i<101;i++)
  {
    if(i%2==0)
    {
        auto t1 = readTSC();
        kernel.run(N,input.data(),output.data());
        auto t2 = readTSC();
        std::cout << (t2 - t1)/(N) << " cycles per factorial (vectorized)" << std::endl;
    }
    else
    {
        auto t1 = readTSC();
        for(int k=0;k<N;k++)
        {
            output[k] = factorial(input[k]);

        }
        auto t2 = readTSC();
        std::cout << (t2 - t1)/(N) << " cycles per factorial (scalar)" << std::endl;
    }
  }

  for(int i=0;i<20;i++)
  {
      std::cout<<output[i]<<" ";
  }
  std::cout<<std::endl;
  return 0;
}

即使 while 循环中存在额外的掩码操作,它也超过了标量(但 AVX512 优化)版本的性能。虽然 AVX512 CPU 每个核心有两个 AVX512 单元,它们具有足够的性能来进行朴素循环向量化,但 AVX2 和早期架构缺乏必要的 SIMD 宽度来弥补高比例的掩码操作,因此朴素循环向量化在 AVX2 上性能相同,在 SSE 上则更慢。有多种方法可以输给智能编译器,这是一种,除非 CPU 有更优的东西,而编译器尚未考虑到(然而)。

曼德尔布罗集生成

使用复数生成曼德尔布罗集,即使具有相似的简单性,编译器也无法像预测阶乘函数那样高效地预测。

int getPoint(int a, int b)
{
    float x = static_cast<float>(b);
    float y = static_cast<float>(a);
    x = (x - width / 2 - width/4)/(width/3);
    y = (height / 2 - y)/(width/3);

    complex<float> c (x,y);

    complex <float> z(0, 0);
    size_t iter = 0;
    while (abs(z) < 2 && iter <= 35)
    {
        z = z * z + c;
        iter++;
    }
    if (iter < 34) return iter*255/33;
    else return 0;
}

编译器生成的指令 90% 是标量的(除了一些内存移动),计算性能是每像素 1043 个周期(在 Cascadelake (AVX512) CPU 上),因为编译器无法展开未知的模式数据。为了让它知道发生了什么,一些标准的 struct 被展开了。例如,std::complexstd::abs 默认不会被编译器优化掉。因此,测试了三种不同的朴素标量版本

  • 上面代码块中的简单朴素版本
  • std::complex 被显式展开为两个 float,不使用平方根进行 abs 计算
  • std::complex 再次展开,但带有平方根

基准测试结果表明,通过简单的代码更改,朴素版本可以得到高度优化。

                           Performance (cycles per pixel)
Compiler      CPU          std::complex  c;     float creal,cimag; + std::sqrt    
gcc   10      Fx-8150      1770                 175                  215
gcc   11      Xeon-8275CL  1075                 91                  111
clang 9       Xeon-8275CL  252(average 500+)    97                  97              
clang 14      Xeon-8275CL  255(average 500+)    77                  79
icc   2021.5  Xeon-8275CL  800+                 66                  70
icx   2022    Xeon-8275CL  800+                 79                  91
msvc  19      Xeon-8275CL  490                  71                  137

速度提升最大的是当 std::complex 被展开为两个 float(实部和虚部)并在复数乘法中显式计算时。这有助于编译器看到 std::complex 看不到的优化。然后,为了测试是否是 abs 函数中的平方根,它被加回了循环条件计算中,这略微增加了时间。

对于朴素的简单朴素版本,msvc-19 表现最好,每像素 490 个周期,而 clang 有时达到每周期 252 个周期。当 std::complex 被显式计算为两个 float 时,icc 表现最好。当平方根计算被显式加回时,获胜者仍然是 icc。

尽管 GCC(本项目中用于基准测试的默认编译器)在未优化代码中的表现最差,但在 KernelData 对象(这些是非常小的数组,恰好包含一个 AVX 或 AVX512 指令可以替换的元素数量)的自动向量化方面表现最好。

将未优化的朴素版本与完全优化的向量化版本进行比较是不公平的。

FX8150 的实际速度提升是从每像素 175 个周期到每像素 80 个周期(性能提升 2.1 倍),因为编译器可以进行“水平”并行化,该并行化一次处理一个像素,但在 while 循环深度上每个像素处理得更快。然而,对于某些算法,垂直并行化优于水平并行化,曼德尔布罗集生成看起来就是其中之一。

Cascadelake CPU 的实际速度提升是从每像素 91 个周期到每像素 11 个周期(约 8 倍)。由于 while 循环的深度有限(35),并且 AVX512 的宽度是 AVX2 的 2 倍,并且核心中有 2 倍的 FMA 单元,与编译器进行的水平向量化相比,速度提升显著。

三个朴素标量版本的测试代码

#include <vector>
#include <stdint.h>
#include <iostream>
#include <sstream>
#include <fstream>
#include <complex>

#ifdef _MSC_VER
# include <intrin.h>
#else
# include <x86intrin.h>
#endif

// optional wrapper if you don't want to just use __rdtsc() everywhere
inline
uint64_t readTSC() {
    // _mm_lfence();  // optionally wait for earlier insns to retire 
                      // before reading the clock
    uint64_t tsc = __rdtsc();
    // _mm_lfence();  // optionally block later instructions until rdtsc retires
    return tsc;
}

constexpr int width = 2000;
constexpr int height = 2000;

unsigned char getPoint(int a, int b)
{
    float x = static_cast<float>(b);
    float y = static_cast<float>(a);
    x = (x - width / 2.0f - width/4.0f)/(width/3.0f);
    y = (height / 2.0f - y)/(width/3.0f);

    std::complex<float> c (x,y);

    std::complex <float> z(0, 0);
    size_t iter = 0;
    while (std::abs(z) < 2 && iter <= 35)
    {
        z = z * z + c;
        iter++;
    }
    if (iter < 34) return iter*255/33;
    else return 0;
}

// std::complex replaced by two floats
// sqrt is optimized out
unsigned char getPointExplicit(int a, int b)
{
    float x = static_cast<float>(b);
    float y = static_cast<float>(a);
    x = (x - width / 2.0f - width/4.0f)/(width/3.0f);
    y = (height / 2.0f - y)/(width/3.0f);

    //std::complex<float> c (x,y);
    float creal = x;
    float cimag = y;

    //std::complex <float> z(0, 0);
    float zreal = 0;
    float zimag = 0;

    size_t iter = 0;
    while ((zreal*zreal + zimag*zimag) < 4 && iter <= 35)
    {
        //z = z * z + c;
        float znewreal = zreal*zreal - zimag*zimag + creal;
        float znewimag = 2.0f*zreal*zimag + cimag;
        zreal=znewreal;
        zimag=znewimag;
        iter++;
    }
    if (iter < 34) return iter*255/33;
    else return 0;
}

// std::complex replaced by two floats
unsigned char getPointExplicitWithSqrt(int a, int b)
{
    float x = static_cast<float>(b);
    float y = static_cast<float>(a);
    x = (x - width / 2.0f - width/4.0f)/(width/3.0f);
    y = (height / 2.0f - y)/(width/3.0f);

    //std::complex<float> c (x,y);
    float creal = x;
    float cimag = y;

    //std::complex <float> z(0, 0);
    float zreal = 0;
    float zimag = 0;

    size_t iter = 0;
    while (std::sqrt(zreal*zreal + zimag*zimag) < 2 && iter <= 35)
    {
        //z = z * z + c;
        float znewreal = zreal*zreal - zimag*zimag + creal;
        float znewimag = 2.0f*zreal*zimag + cimag;
        zreal=znewreal;
        zimag=znewimag;
        iter++;
    }
    if (iter < 34) return iter*255/33;
    else return 0;
}

void createImage()
{
    std::vector<unsigned char> image(width*height);

    for(int k=0;k<20;k++)
    {
        auto t1 = readTSC();
        for (size_t i = 0; i < height; i++)
        {
            for (size_t j = 0; j < width; j++)
            {
                image[j+i*width]= getPointExplicit(i, j);
            }
        }
        auto t2 = readTSC();
        std::cout<<(t2-t1)/(width*height)<<" cycles per pixel"<<std::endl;
    }

  // write to file at once
  std::ofstream fout;
  fout.open("mandelbrot.ppm");
  if (fout.is_open()) {
    fout << "P5\n" << width << ' ' << height << " 255\n";
    for(int i=0;i<width*height;i++)
    {
        fout << image[i];
    }
    fout.close();
  } else {
    std::cout << "Could not open the file!\n";
  }
}

int main()
{
    createImage();
    return 0;
}

以下代码示例是同一曼德尔布罗集生成的向量化内核不带平方根

  • 垂直并行化,一次计算 16-32 个像素
  • 一次访问 16-32 个像素的内存
  • 使用融合乘加方法进行快速 a*b+c 计算
  • 掩码操作数类型选择为 int,以与 float(4 字节)相同的宽度,因为这可能有助于编译器生成高效的 CPU 指令,而无需将其转换为 char
#include <algorithm>
#include <cstdint>
#include <fstream>
#include <iostream>
#include "VectorizedKernel.h"

constexpr int width = 2000;
constexpr int height = 2000;
constexpr int simd = 64;

void Mandelbrot(std::vector<char> & image) {
    // imag: imaginary part of complex number
    // real: real part of complex number
	auto kernel = Vectorization::createKernel<simd>([&]
                  (auto & factory, auto & idThread, char * img){
        const int currentSimdWidth = factory.width; // simd on aligned-body, 
                                                    // 1 on non-aligned tail

		auto j = factory.template generate<int>();
		idThread.modulus(width, j);

		auto i = factory.template generate<int>();
		idThread.div(width, i);

		auto x0 = factory.template generate<float>();
		j.template castAndCopyTo<float>(x0);
		auto y0 = factory.template generate<float>();
		i.template castAndCopyTo<float>(y0);

		const auto heightDiv2 = factory.template generate<float>(height/2.0f);

		auto x = factory.template generate<float>();
		x0.sub(width/2.0f, x0);

		x0.sub(width/4.0f, x0);
		x0.div(width/3.0f,x);

		auto y =  factory.template generate<float>();
		heightDiv2.sub(y0,y0);
		y0.div(width/3.0f, y);

        // c constant variable's components
		const auto imagc = factory.template generate<float>(y);
		const auto realc = factory.template generate<float>(x);

        // z variable's imaginary and real components
		auto imagz = factory.template generate<float>(0);
		auto realz = factory.template generate<float>(0);

		// loop
		bool anyTrue = true;
		auto iteration = factory.template generate<int>(0);

		// allocate all re-used resources for the while-loop
		auto imagzSquared = factory.template generate<float>();
		auto absLessThan2 = factory.template generate<int>();
		auto tmp1 = factory.template generate<float>();
		auto whileLoopCondition = factory.template generate<int>();
		auto tmp2 = factory.template generate<int>();
		auto zzReal = factory.template generate<float>();

		auto zzImag = factory.template generate<float>();
		auto tmp3 = factory.template generate<float>();
		auto tmpAdd1 = factory.template generate<float>();
		auto tmpAdd2 = factory.template generate<float>();
		auto tmp4 = factory.template generate<int>();

        // no allocation within the loop
		while(anyTrue)
		{
			// computing while loop condition start
            imagz.mul(imagz, imagzSquared);
			realz.fusedMultiplyAdd(realz,imagzSquared,tmp1);
			tmp1.lessThan(4.0f, absLessThan2);

			iteration.lessThanOrEquals(35, tmp2);
			absLessThan2.logicalAnd(tmp2, whileLoopCondition);
			anyTrue = whileLoopCondition.isAnyTrue();
			// computing while loop condition end

			// do complex multiplication z = z*z + c
			realz.fusedMultiplySub(realz,imagzSquared, zzReal);
			imagz.mul(realz, tmp3);
			realz.fusedMultiplyAdd(imagz,tmp3, zzImag);

			// if a lane has completed work, do not modify it
			zzReal.add(realc, tmpAdd1);
			zzImag.add(imagc, tmpAdd2);
			whileLoopCondition.ternary(tmpAdd1, realz, realz);
			whileLoopCondition.ternary(tmpAdd2, imagz, imagz);

			// increment iteration
			whileLoopCondition.ternary(1,0, tmp4);
			iteration.add(tmp4, iteration);
		}

        // compute output
		const auto thirtyFour = factory.template generate<int>(34);
		auto ifLessThanThirtyFour = factory.template generate<int>();
		iteration.lessThan(thirtyFour, ifLessThanThirtyFour);

		auto conditionalValue1 = factory.template generate<int>();
		iteration.mul(255, conditionalValue1);
		conditionalValue1.div(33, conditionalValue1);

		auto conditionalValue2 = factory.template generate<int>(0);
		auto returnValue = factory.template generate<int>(0);
		ifLessThanThirtyFour.ternary
              (conditionalValue1, conditionalValue2, returnValue);
		auto tmp5 = factory.template generate<int>();
		i.mul(width, tmp5);
		auto writeAddr = factory.template generate<int>(0);
		j.add(tmp5, writeAddr);
		auto returnValueChar =  factory.template generate<char>(0);
		returnValue.template castAndCopyTo<char>(returnValueChar);
		returnValueChar.writeTo(img,writeAddr);

	},Vectorization::KernelArgs<char*>{});
    if(image.size()<width*height)
	    image.resize(width*height);
	kernel.run(width*height,image.data());
}

#ifdef _MSC_VER
# include <intrin.h>
#else
# include <x86intrin.h>
#endif

// optional wrapper if you don't want to just use __rdtsc() everywhere
inline
uint64_t readTSC() {
    // _mm_lfence();  // optionally wait for earlier insns to retire 
                      // before reading the clock
    uint64_t tsc = __rdtsc();
    // _mm_lfence();  // optionally block later instructions until rdtsc retires
    return tsc;
}

int main() {
  std::vector<char> image;
  for(int i = 0;i<100;i++)
  {
    auto t1 = readTSC();
    Mandelbrot(image);
    auto t2 = readTSC();
    std::cout << (t2 - t1)/(width*height) << " cycles per pixel" << std::endl;
  }
  // write to file at once
  std::ofstream fout;
  fout.open("mandelbrot.ppm");
  if (fout.is_open()) {
    fout << "P5\n" << width << ' ' << height << " 255\n";
    for(int i=0;i<width*height;i++)
    {
        fout << image[i];
    }
    fout.close();
  } else {
    std::cout << "Could not open the file!\n";
  }

  return 0;
}

这次,计算量大于掩码操作,向量化比阶乘示例更有效。在 FX8150 3.6GHz CPU 和 GCC v10 上,性能为每像素 80 个周期。在 Cascadelake 和GCC v10 上,性能为每像素 11 个周期。与未修改的朴素代码性能相比,Cascadelake 几乎有 100 倍的速度提升,与优化版本相比也有约 8 倍的速度提升。

性能的一部分来自于并行内存 I/O。所有 64 个工作项并行计算,然后所有结果一次性写入输出缓冲区。逐个写入会导致每次操作额外的 RAM 或缓存延迟。当所有结果一次性写入时,部分总延迟会被隐藏。然后是基于 SIMD 的计算速度提升,最高可达 16 倍。godbolt.org 的 Cascadelake CPU 每个核心有两个 AVX512 fma 单元。这使得与 FX8150 的单个 AVX 单元相比,速度提升了 4 倍,并且架构差异(如 L1 带宽、指令延迟、总物理寄存器等)使得即使 CPU 频率较低,差异也达到了 8 倍。

感谢GCC v10的高效自动向量化,VectorizedKernel 甚至在与同样优化代码路径的 std::valarray 版本相比时,实现了 100% 的速度提升。

程序生成此图像

图像图案主要由均匀的颜色分布组成,这有利于使用 SIMD 使用来扩展性能。当图像缩放时,可能不会像这样扩展。这取决于相邻像素之间的分支。测试程序使用了扫描线计算。如果它使用了平铺计算,那么由于更好的相邻计算打包,它可以获得更好的扩展。在对曼德尔布罗集生成性能进行基准测试时,生成的图像和计算精度必须相同,才能对不同 CPU 进行公平比较。还有其他因素,如 AVX512 节流问题与未知的服务器负载。当另一个客户端正在计算 AVX512 时,当前客户端的标量代码也会被节流。这通常是 Web 服务器的一个坏副作用,但对于高性能计算或工作站来说则不然。Xeon AVX512 核心类似于一个小型 GPU 核心(在低频率下有 192 个流水线),可以自由访问 RAM。

将变量转换为不同类型

类型转换由 castAndCopyTo 模板方法支持。在曼德尔布罗内核中,有转换方法

auto x0 = factory.template generate<float>(); // if j was 3
j.template castAndCopyTo<float>(x0);          // then x0 becomes 3.0f

这会将所有 SIMD 车道值转换为显式调用的 template 类型。在整数变量和浮点变量之间转换时,这会改变数据的位表示。为了在更改值的同时保持位表示相同,使用了 castBitwiseAndCopyTo 方法。

// j=100
j.broadcast(100);                            // 4-byte integer
auto x = factory.template generate<float>(); // 4-byte float (important! 4==4)

// x becomes 1.4013e-43
j.template castBitwiseAndCopyTo<float>(x);

SIMD(所谓的)线程组车道之间的通信

还有方法可以将一个变量的一个车道的值广播到它自己的所有车道,或者广播到另一个变量的所有车道。

broadcast 将所有车道初始化为相同的常量字面值。

auto x = factory.template generate<int>(0); // 0-initializes all lanes
x.broadcast(5); // all lanes have the value 5 now

broadcastFromLaneToVector 将一个车道的值复制到目标变量的所有车道

#include <algorithm>
#include <iostream>
#include"VectorizedKernel.h"

constexpr int simd = 4;

int main() {
  std::vector<int> data(500);

  // initialize data vector to 0,0,0,0,4,4,4,4,8,8,8,8,12,12,12,12,16,16,16,16
  auto kernel = Vectorization::createKernel<simd>([&]
                (auto & factory, auto & idThread, int * data){

            auto value = factory.template generate<int>();

            // take lane 0's value and broadcast to all lanes of value
            idThread.broadcastFromLaneToVector(0,value);

            value.writeTo(data,idThread);
        },
        Vectorization::KernelArgs<int*>{}
  );

  kernel.run(500,data.data());

  for(int i=0;i<30;i++)
  {
      std::cout<<data[i]<<" ";
  }
  std::cout<<std::endl;
  return 0;
}

输出如下

0 0 0 0 4 4 4 4 8 8 8 8 12 12 12 12 16 16 16 16 20 20 20 20 24 24 24 24 28 28

broadcastFromLane 方法将一个车道的值复制到同一变量的所有其他车道。

auto kernel = Vectorization::createKernel<simd>([&]
              (auto & factory, auto & idThread, int * data){
      const int currentSimdWidth = factory.width; // simd on aligned-body, 
                                                  // 1 on non-aligned tail

            auto value = factory.template generate<int>();
            value.readFrom(idThread);

            // take lane 0's value and broadcast to all lanes
            value.broadcastFromLane(0);

            value.writeTo(data,idThread);
        },
        Vectorization::KernelArgs<int*>{}
);

gatherFromLane 方法为每个车道进行独立的车道选择,并将值复制到结果变量。

左移示例(仅适用于非尾部 SIMD 线程组)

auto kernel = Vectorization::createKernel<simd>([&]
              (auto & factory, auto & idThread, int * data){
            // simd on aligned-body, 1 on non-aligned tail
            const int currentSimdWidth = factory.width; 

            auto value = factory.template generate<int>();
            auto laneId = factory.template generate<int>();

            // laneId = idThread % currentSimdWidth
            idThread.modulus(currentSimdWidth, laneId);

            // laneId++; ----> will gather from right-neighbor
            laneId.add(1,laneId);

            // laneId = laneId % currentSimdWidth ---> wrap around to 0
            laneId.modulus(currentSimdWidth,laneId);

            // { 0,1,2,3 } becomes { 1,2,3,0 }
            idThread.gatherFromLane(laneId,value);

            value.writeTo(data,idThread);
        },
        Vectorization::KernelArgs<int*>{}
);

输出如下

1 2 3 0 5 6 7 4 9 10 11 8 13 14 15 12 17 18 19 16 21 22 23 20 25 26 27 24 29 30

在 Cascadelake CPU 和 simd=32 设置下,每个元素从另一个车道收集并以每元素 3 个周期的速度写入输出。除了内核启动延迟外,还有 RAM 访问延迟(部分隐藏)、valuelaneId 变量的两次分配、2 次取模、1 次加法、1 次 gather 和写入操作,平均而言都在 3 个周期/元素内完成,或 1 纳秒(4GB/s,200kB 缓冲区)。

左移车道,不使用 gatherFromLane 方法,可以使用 lanesLeftShift 实现

auto kernel = Vectorization::createKernel<simd>([&]
              (auto & factory, auto & idThread, int * data){
            const int currentSimdWidth = factory.width; // simd on aligned-body, 
                                                        // 1 on non-aligned tail

            auto value = factory.template generate<int>(idThread);
            auto value2 = factory.template generate<int>();

            // quicker to compute than add+modulus+gather
            value.lanesLeftShift(10,value2);

            value2.writeTo(data,idThread);
        },
        Vectorization::KernelArgs<int*>{}
); // same output

从相邻车道(同一 SIMD 单元内)收集值可以更容易地计算缩减。

缩减示例

朴素且可读的缩减算法如下所示

---64-element sum-reduction sample using 32 SIMD lanes---

Type data[64];

for(i = 32; i>0; i/=2)
{
    if(current lane index < i)
    {
         data[current lane index] += data[current lane index + i];
    }
}
data[0]---> has sum of all elements from 0 to 63

当存在 if 时,块内的指令需要另一个掩码。+= 运算符需要掩码

SIMD 中的缩减

SIMD 车道内的缩减仅使用一半的车道来添加两半,然后递归地通过将每个新左半部分减半来继续,直到第一个元素成为唯一的一半。

constexpr int simd=64;
auto kernel = Vectorization::createKernel<simd>([&]
              (auto & factory, auto & idThread, int * data){
        const int currentSimdWidth = factory.width; // simd on aligned-body, 
                                                    // 1 on non-aligned tail

        auto value = factory.template generate<int>();
        auto value2 = factory.template generate<int>();
        auto value3 = factory.template generate<int>();

        value.readFrom(data,idThread);

        auto laneIndex = factory.template generate<int>();
        auto laneIndexIncremented = factory.template generate<int>();
        idThread.modulus(currentSimdWidth,laneIndex);

        auto mask = factory.template generate<int>();

        auto gatheredValue = factory.template generate<int>();

        // reduction
        for(int i = 32; i>0; i/=2)
        {
            // lane index < i ?
            laneIndex.lessThan(i,mask);

            // lane index + i
            laneIndex.add(i,laneIndexIncremented);

            // do not increment right-half of lanes to not overflow SIMD lanes
            mask.ternary(laneIndexIncremented,laneIndex,laneIndexIncremented);

            // value2 = gathered from [lane index + i]
            value.gatherFromLane(laneIndexIncremented,value2);

            // value3 = value + value2
            value.add(value2,value3);

            // if(lane index < i) then value3 ---> value else value ----> value
            mask.ternary(value3,value,value);
        }

        // sum from 0 to 63 = 2016
        // only first lane writes the result: { 2016, 1, 2, 3, ...}
        value.writeToMasked(data, idThread, mask); 
      },
      Vectorization::KernelArgs<int*>{}
);

也可以在一个缓冲区上运行缩减,将元素数量缩减两倍

缓冲区上的缩减

constexpr int simd=64;
auto kernel = Vectorization::createKernel<simd>([&]
              (auto & factory, auto & idThread, int * data){
        const int currentSimdWidth = factory.width; // simd on aligned-body, 
                                                    // 1 on non-aligned tail

        auto value = factory.template generate<int>();
        auto value2 = factory.template generate<int>();
        auto value3 = factory.template generate<int>();

        value.readFrom(data,idThread);

        auto laneIndex = factory.template generate<int>();
        auto laneIndexIncremented = factory.template generate<int>();
        idThread.modulus(currentSimdWidth,laneIndex);

        auto mask = factory.template generate<int>();

        auto gatheredValue = factory.template generate<int>();

        // reduction
        for(int i = 64; i>0; i/=2)
        {
            // lane index < i ?
            laneIndex.lessThan(i,mask);

            // lane index + i
            laneIndex.add(i,laneIndexIncremented);

            // value = gathered from [lane]
            value.readFromMasked(data, laneIndex, mask);

            // value2 = gathered from [lane index + i]
            value2.readFromMasked(data, laneIndexIncremented, mask);

            // value3 = value + value2
            value.add(value2,value3);

            // if(lane index < i) then value3 ---> value else value ----> value
            mask.ternary(value3,value,value);

            value.writeToMasked(data,idThread,mask);
        }
      },
      Vectorization::KernelArgs<int*>{}
);

结果在零位置为 8128(值 0,1,2,...127 的总和)。

由于缓冲区中没有溢出,并且 I/O 始终是连续的,因此可以使用非掩码的连续版本替换掩码读/写方法

// reduction
for(int i = 64; i>0; i/=2)
{
      // lane index < i ?
      laneIndex.lessThan(i,mask);

      // lane index + i
      laneIndex.add(i,laneIndexIncremented);

      // value = gathered from [lane]
      value.readFromContiguous(data, laneIndex);

      // value2 = gathered from [lane index + i]
      value2.readFromContiguous(data, laneIndexIncremented);

      // value3 = value + value2
      value.add(value2,value3);

      // if(lane index < i) then value3 ---> value else value ----> value
      mask.ternary(value3,value,value);

      value.writeToContiguous(data,idThread);
}

这运行速度更快,平均每个 128 元素块在 1248 个周期内完成(对于 Cascadelake CPU,每元素约 3 纳秒)。这是 7 次变量分配、6 次循环迭代和每次迭代 7 次操作的成本。此示例的计算与数据比率较低(特别是分配)。

仅测量缩减循环时

#include <algorithm>

#include <iostream>
#include"VectorizedKernel.h"

constexpr int simd = 64;

#ifdef _MSC_VER
# include <intrin.h>
#else
# include <x86intrin.h>
#endif

// optional wrapper if you don't want to just use __rdtsc() everywhere
inline
uint64_t readTSC() {
    // _mm_lfence();  // optionally wait for earlier insns to retire 
                      // before reading the clock
    uint64_t tsc = __rdtsc();
    // _mm_lfence();  // optionally block later instructions until rdtsc retires
    return tsc;
}

int main() {
    std::vector<int> data(512);

    auto kernel = Vectorization::createKernel<simd>([&]
                  (auto & factory, auto & idThread, int * data){
        const int currentSimdWidth = factory.width; // simd on aligned-body, 
                                                    // 1 on non-aligned tail

        auto value = factory.template generate<int>();
        auto value2 = factory.template generate<int>();
        auto value3 = factory.template generate<int>();

        value.readFrom(data,idThread);

        auto laneIndex = factory.template generate<int>();
        auto laneIndexIncremented = factory.template generate<int>();
        idThread.modulus(currentSimdWidth,laneIndex);

        auto mask = factory.template generate<int>();

        auto gatheredValue = factory.template generate<int>();

        auto t1 = readTSC();

        // reduction
        for(int i = 64; i>0; i/=2)
        {
            // lane index < i ?
            laneIndex.lessThan(i,mask);

            // lane index + i
            laneIndex.add(i,laneIndexIncremented);

            // value = gathered from [lane]
            value.readFromContiguous(data, laneIndex);

            // value2 = gathered from [lane index + i]
            value2.readFromContiguous(data, laneIndexIncremented);

            // value3 = value + value2
            value.add(value2,value3);

            // if(lane index < i) then value3 ---> value else value ----> value
            mask.ternary(value3,value,value);

            value.writeToContiguous(data,idThread);
        }

        auto t2 = readTSC();
        std::cout<<t2-t1<<" cycles per 128-element reduction"<<std::endl;
      },
      Vectorization::KernelArgs<int*>{}
    );

    for(int i=0;i<512;i++)
    {
      data[i]=i;
    }
    kernel.run(500,data.data());

    for(int i=0;i<35;i++)
    {
      std::cout<<data[i]<<" ";
    }
    std::cout<<std::endl;
    return 0;
}

对于 FX8150 核心,每次缩减 198 个周期,对于 Cascadelake(Godbolt.org 服务器)为 28 个周期。实际周期数较低,因为 readTSC 函数(调用两次)也有自己的延迟。

128 个值缩减需要 28 个周期,相当于平均每方法调用 0.67 个周期。由于 Cascadelake 服务器通常运行 2.5GHz 或 3.0Ghz,平均每方法调用约 0.2 纳秒(每秒 4GB / 200kB 缓冲区约 300 个元素)。样本的唯一问题是变量的分配开销及其可读性与真正的 CUDA/OpenCL 内核代码相比。

28 个周期用于 128 个值缩减意味着每元素不到 0.1 纳秒,这是一个朴素的缩减算法,效率不高但缓存友好(128 个元素需要 6 步,在 L1 缓存或寄存器中完成,具体取决于 CPU 物理寄存器的大小数量)。

8x8 矩阵转置示例

应用于 FX8150 CPU 上的 100 个 8x8 矩阵的朴素转置算法

#include <algorithm>

#include <iostream>
#include"VectorizedKernel.h"

constexpr int simd = 8;
int main() {

    constexpr int N = 100;
    // N matrices of sizes simd*simd
    std::vector<int> data(simd*simd*N);
    std::vector<int> dataOut(simd*simd*N);

    auto kernel = Vectorization::createKernel<simd>([&]
                  (auto & factory, auto & idThread, int * data, int * dataOut){
        const int currentSimdWidth = factory.width; // simd on aligned-body, 
                                                    // 1 on non-aligned tail
        auto value = factory.template generate<int>();
        auto indexRead = factory.template generate<int>(idThread);
        auto indexWrite = factory.template generate<int>();
        auto column = factory.template generate<int>();
        auto row = factory.template generate<int>();

        // transpose 100 matrices of size simd*simd
        Vectorization::Bench bench(0);
        for(int i=0;i<N;i++)
        {
            for(int j=0;j<currentSimdWidth;j++)
            {
                // read
                idThread.add(j*currentSimdWidth,indexRead);

                value.readFrom(data+i*currentSimdWidth*currentSimdWidth,indexRead);

                // compute row
                indexRead.div(currentSimdWidth,row);

                // compute column
                indexRead.modulus(currentSimdWidth,column);

                // compute write address for transposed output 
                // (row becomes column, column becomes row)
                column.mul(currentSimdWidth,indexWrite);
                indexWrite.add(row,indexWrite);

                // write transposed
                value.writeTo(dataOut+i*currentSimdWidth*currentSimdWidth,indexWrite);
            }
        }
      },
      Vectorization::KernelArgs<int*,int*>{}
    );

    for(int i=0;i<simd*simd*N;i++)
    {
      data[i]=i;
    }

    kernel.run(simd,data.data(),dataOut.data());
    kernel.run(simd,data.data(),dataOut.data());
    kernel.run(simd,data.data(),dataOut.data());

    for(int i=0;i<100;i++)
    {
      std::cout<<dataOut[i]<<" ";
    }
    std::cout<<std::endl;
    return 0;
}

输出如下

1.569e-05 seconds (for 100 matrices)
1.6099e-05 seconds (for 100 matrices)
1.6281e-05 seconds (for 100 matrices)
0 8 16 24 32 40 48 56 1 9 17 25 33 41 49 57 2 10 18 26 34 42 50 58 3 11 19 
27 35 43 51 59 4 12 20 28 36 44 52 60 5 13 21 29 37 45 53 61 6 14 22 30 38 
46 54 62 7 15 23 31 39 47 55 63 64 72 80 88 96 104 112 120 65 73 81 89 97 
105 113 121 66 74 82 90 98 106 114 122 67 75 83 91 99 107 115 123 68 76 84 92

矩阵的每个元素平均以 2.5 纳秒(约 1.6 GB/s)进行转置移动。线性代数问题的朴素解决方案并不总是很快。对于带有 simd=32(用于转置 32x32 矩阵)的 Cascadelake CPU,它是 1.1 纳秒(约 3.9 GB/s)。

优化代码应使编译器生成 SIMD 混洗指令。gatherFromLane 是生成混洗命令的一种方式。

略微优化版本,不使用 gatherFromLane,而是将所有矩阵元素存储在单个 simd 变量中

#include <algorithm>

#include <iostream>
#include"VectorizedKernel.h"

// each simd group will now hold a whole 8x8 matrix
constexpr int matrixWidth = 2;
constexpr int simd = matrixWidth*matrixWidth;
int main() {

    constexpr int N = 1000;
    // N matrices of sizes simd*simd
    std::vector<int> data(simd*N);
    std::vector<int> dataOut(simd*N);

    auto kernel = Vectorization::createKernel<simd>([&]
                  (auto & factory, auto & idThread, int * data, int * dataOut){
        const int currentSimdWidth = factory.width; // simd on aligned-body, 
                                                    // 1 on non-aligned tail
        auto value = factory.template generate<int>();
        auto indexRead = factory.template generate<int>(idThread);
        auto indexWrite = factory.template generate<int>();
        auto column = factory.template generate<int>();
        auto row = factory.template generate<int>();

        // compute row
        indexRead.div(matrixWidth,row);

        // compute column
        indexRead.modulus(matrixWidth,column);

        // compute write address for transposed output 
        // (row becomes column, column becomes row)
        column.mul(matrixWidth,indexWrite);
        indexWrite.add(row,indexWrite);

        // transpose 1000 matrices of size simd*simd
        Vectorization::Bench bench(0);
        for(int i=0;i<N;i++)
        {
                // read whole matrix
                value.readFrom(data+i*currentSimdWidth,indexRead);

                // write transposed
                value.writeTo(dataOut+i*currentSimdWidth,indexWrite);
        }
      },
      Vectorization::KernelArgs<int*,int*>{}
    );

    for(int i=0;i<simd*N;i++)
    {
      data[i]=i;
    }

    kernel.run(simd,data.data(),dataOut.data());
    kernel.run(simd,data.data(),dataOut.data());
    kernel.run(simd,data.data(),dataOut.data());

    for(int i=0;i<100;i++)
    {
      std::cout<<dataOut[i]<<" ";
    }
    std::cout<<std::endl;
    return 0;
}

FX8150 上的输出如下

3.155e-06 seconds
3.002e-06 seconds
2.826e-06 seconds
0 2 1 3 4 6 5 7 8 10 9 11 12 14 13 15 16 18 17 19 20 22 21 23 24 26 25 27 
28 30 29 31 32 34 33 35 36 38 37 39 40 42 41 43 44 46 45 47 48 50 49 51 52 
54 53 55 56 58 57 59 60 62 61 63 64 66 65 67 68 70 69 71 72 74 73 75 76 78 
77 79 80 82 81 83 84 86 85 87 88 90 89 91 92 94 93 95 96 98 97 99

每个矩阵元素 0.7 纳秒(5.7 GB/s)。在具有 simd=16matrixWidth=4)的 Cascadelake CPU 上,每矩阵元素 0.11 纳秒(36GB/s)。

使用 gatherFromLane 的第二个优化

#include <algorithm>

#include <iostream>
#include"VectorizedKernel.h"

// each simd group will now hold a whole 8x8 matrix
constexpr int matrixWidth = 2;
constexpr int simd = matrixWidth*matrixWidth;
int main() {

    constexpr int N = 1000;
    // N matrices of sizes simd*simd
    std::vector<int> data(simd*N+64);
    std::vector<int> dataOut(simd*N+64);
    int * alignedInput = data.data();
    int * alignedOutput = dataOut.data();
    while(((size_t) alignedInput) % 64 !=0)
        alignedInput++;

    while(((size_t) alignedOutput) % 64 !=0)
        alignedOutput++;

    auto kernel = Vectorization::createKernel<simd>([&]
                  (auto & factory, auto & idThread, int * data, int * dataOut){
        const int currentSimdWidth = factory.width; // simd on aligned-body, 
                                                    // 1 on non-aligned tail
        auto valueIn = factory.template generate<int>();
        auto valueOut = factory.template generate<int>();

        auto indexRead = factory.template generate<int>(idThread);
        auto indexWrite = factory.template generate<int>();
        auto column = factory.template generate<int>();
        auto row = factory.template generate<int>();

        // compute row
        indexRead.div(matrixWidth,row);

        // compute column
        indexRead.modulus(matrixWidth,column);

        // compute write address for transposed output 
        // (row becomes column, column becomes row)
        column.mul(matrixWidth,indexWrite);
        indexWrite.add(row,indexWrite);

        // transpose 1000 matrices of size simd*simd
        {
            Vectorization::Bench bench(0);
            for(int i=0;i<N;i++)
            {
                    // read whole matrix
                    valueIn.readFrom(data+i*simd);

                    // gather operation
                    valueIn.gatherFromLane(indexWrite,valueOut);
                    //valueIn.transposeLanes(matrixWidth,valueOut); // same result

                    // write transposed
                    valueOut.writeTo(dataOut+i*simd);
            }
        }
      },
      Vectorization::KernelArgs<int*,int*>{}
    );

    for(int i=0;i<simd*N;i++)
    {
      alignedInput[i]=i;
    }
    for(int i=0;i<100000;i++)
        kernel.run(simd,alignedInput,alignedOutput);

    for(int i=0;i<100;i++)
    {
      std::cout<<alignedOutput[i]<<" ";
    }
    std::cout<<std::endl;
    return 0;
}
  • FX8150(simd=4):每元素 0.28 纳秒(14 GB/s)
  • Cascadelake(simd=4):每元素 0.098 纳秒(40 GB/s)
  • Cascadelake(simd=16):每元素 0.094 纳秒(42 GB/s)

矩阵转置受内存带宽限制,简单的 C++ 标量版本可以轻松达到 RAM 带宽瓶颈。Cascadelake 的 L2 到 L1 缓存带宽为每周期 64 字节。在 3GHz 下,这意味着 192 GB/s。gatherFromLane 方法调用将 192GB/s 的带宽降低到 42 GB/s。gatherFromLane 为 Cascadelake CPU 生成“vpermd”指令,其延迟为 3 个周期(来自指令表)。收集 16 个车道值(为指令进行置换)需要 3 个周期,相当于每周期约 5 个整数/浮点数。在 3GHz 下为 15 giga 整数/浮点数,或 45 GB/s 带宽,接近测得的 42 GB/s。transposeLanes 方法对于相同的任务略微更有效,因为转置的模式在编译时已知,并且在循环中少了一个 mov 命令。显然,拥有两个 AVX512 FMA 单元对纯内存任务没有帮助。为了在矩阵-矩阵乘法期间充分利用 FMA 单元,必须将多个子矩阵存储在 AVX512 寄存器中,并通过置换或混洗寄存器来选择,然后使用 fusedMultiplyAddfusedMultiplySub 方法进行计算。

矩阵乘法示例

矩阵-矩阵乘法的朴素版本实现起来很直接。

#include "VectorizedKernel.h"
#include<vector>
int main()
{
    // C = A x A kernel
    constexpr int simd = 8;
    constexpr int matrixSize = 512;
    auto kernelMatrixMultiplication = Vectorization::createKernel<simd>
    ([&](auto & factory, auto & idThread, float * bufferIn, float * bufferOut){
        const int currentSimdWidth = factory.width;

        auto ROW = factory.template generate<int>();
        idThread.div(matrixSize, ROW);
        auto COL = factory.template generate<int>();
        idThread.modulus(matrixSize, COL);

        auto tmpSum = factory.template generate<float>(0.0f);

        auto A = factory.template generate<float>(0.0f);
        auto B = factory.template generate<float>(0.0f);

        auto AmulB = factory.template generate<float>(0.0f);

        auto indexA = factory.template generate<int>(0);

        auto indexB = factory.template generate<int>(0);

        for(int i=0;i<matrixSize;i++)
        {
            // compute index of A element
            ROW.mul(matrixSize, indexA);
            indexA.add(i,indexA);

            // compute index of B element
            COL.add(i*matrixSize, indexB);

            // read A and B
            A.readFrom(bufferIn, indexA); // A[ROW * N + i] --> gather operation
            B.readFrom(bufferIn, indexB); // A[i * N + COL] --> contiguous

            // multiply
            A.mul(B,AmulB);

            // add
            tmpSum.add(AmulB, tmpSum);
        }
        // write C index
        auto writeC = factory.template generate<int>();
        ROW.mul(matrixSize, writeC);
        writeC.add(COL, writeC);
        tmpSum.writeTo(bufferOut,writeC);

    },Vectorization::KernelArgs<float*,float*>{});

    for(int j=0;j<25;j++)
    {
        std::vector<float> test1(matrixSize*matrixSize),test2(matrixSize*matrixSize);
        for(int i=0;i<matrixSize*matrixSize;i++)
        {
            test1[i]=2.0f;
        }

        std::cout<< "matrix multiplication 
        ("<<matrixSize*(size_t)matrixSize*matrixSize*2<<" operations total) ";
        {
            Vectorization::Bench bench(0);
            kernelMatrixMultiplication.run
            (matrixSize * matrixSize,test1.data(),test2.data());
        }

        for(int i=0;i<3;i++)
        {
            std::cout<<test2[i]<<" ";
        }
        std::cout<<std::endl;
    }

    return 0;
}

在 Cascadelake CPU(simd=32)上,完成 2.68 亿次浮点运算需要 0.097 秒(FX8150 simd=8 为 0.133 秒)。2.7 GFLOPs 显示了跨步读取(步长 = 大矩阵的多行)的效率以及缓存的缺乏。由于没有子矩阵优化,它甚至不能有效地使用 L1 缓存。为了能够首先使用缓存,可以进行“平铺”/“子矩阵”计算。

假设第二个矩阵是另一个矩阵缓冲区并且已经转置,数据收集部分可以完全连续,并且 mul & add 方法可以连接在一起作为融合乘加运算

#include "VectorizedKernel.h"
#include<vector>
int main()
{
    // C = A x A kernel
    constexpr int simd = 4;
    constexpr int matrixSize = 512;
    auto kernelMatrixMultiplication = Vectorization::createKernel<simd>
    ([&](auto & factory, auto & idThread, float * bufferIn, float * bufferOut){
        const int currentSimdWidth = factory.width;

        auto ROW = factory.template generate<int>();
        idThread.div(matrixSize, ROW);
        auto COL = factory.template generate<int>();
        idThread.modulus(matrixSize, COL);

        auto tmpSum = factory.template generate<float>(0.0f);

        auto A = factory.template generate<float>();
        auto B = factory.template generate<float>();

        auto AmulB = factory.template generate<float>();

        auto indexA = factory.template generate<int>();
        auto indexAmul = factory.template generate<int>();

        auto indexB = factory.template generate<int>();
        auto indexBmul = factory.template generate<int>();
        ROW.mul(matrixSize, indexAmul);
        COL.mul(matrixSize, indexBmul);
        for(int i=0;i<matrixSize;i++)
        {
            // compute index of A element
            indexAmul.add(i,indexA);

            // compute index of B element
            indexBmul.add(i,indexB);

            // read A and B
            A.readFrom(bufferIn, indexA); // A[ROW * N + i]
            B.readFrom(bufferIn, indexB); // A[COL * N + i]

            // A * B + tmpSum --> tmpSum
            A.fusedMultiplyAdd(B,tmpSum,tmpSum);
        }
        // write C index
        auto writeC = factory.template generate<int>();
        ROW.mul(matrixSize, writeC);
        writeC.add(COL, writeC);
        tmpSum.writeTo(bufferOut,writeC);

    },Vectorization::KernelArgs<float*,float*>{});

    for(int j=0;j<25;j++)
    {
        std::vector<float> test1(matrixSize*matrixSize),test2(matrixSize*matrixSize);
        for(int i=0;i<matrixSize*matrixSize;i++)
        {
            test1[i]=2.0f;
        }

        std::cout<< "matrix multiplication 
        ("<<matrixSize*(size_t)matrixSize*matrixSize*2<<" operations total) ";
        {
            Vectorization::Bench bench(0);
            kernelMatrixMultiplication.run
            (matrixSize * matrixSize,test1.data(),test2.data());
        }

        for(int i=0;i<3;i++)
        {
            std::cout<<test2[i]<<" ";
        }
        std::cout<<std::endl;
    }

    return 0;
}

更好的内存访问模式可提高性能。FX8150 完成 512x512 乘法需要 0.053 秒,即 5.1 GFLOPs。每次浮点运算,从缓存中获取 8 字节,并实现 40GB/s 带宽,而无需显式数据重用。

为了实验数据重用优化,以下示例侧重于使用 generateArray 工厂方法创建具有垂直内存布局的数组来进行寄存器平铺,以通过通常比水平版本(如点积)更有效的垂直 SIMD CPU 指令来实现简单数组操作的并行化。

#include "VectorizedKernel.h"
#include<vector>
int main()
{
    // chunk width for lock-step computation
    constexpr int simd = 4;

    // size of A, B, C tile matrices to fit in registers
    // per-work-item required register size is 3 x (matrixWidth^3) x sizeof(float)
    // 288 bytes for matrixWidth=8 (1 avx register is 256bits or 32bytes)
    constexpr int matrixWidth = 8;  

    // benchmark length
    constexpr int totalMatMatMultiplications = 10000; 

    constexpr int numMatrices = totalMatMatMultiplications*2;
    constexpr int numElements = matrixWidth*matrixWidth*numMatrices;
    size_t totalTime=0;
    auto kernelMatrixMultiplication = Vectorization::createKernel<simd>
    ([&](auto & factory, auto & idThread, float * bufferIn, float * bufferOut){

        // private array has vertical memory layout => 
        // vertical matrix multiplication in SIMD units
        auto privateMatrix1 = factory.template 
             generateArray<float,matrixWidth*matrixWidth>();
        auto privateMatrix2 = factory.template 
             generateArray<float,matrixWidth*matrixWidth>();
        auto privateMatrix3 = factory.template 
             generateArray<float,matrixWidth*matrixWidth>();

        auto readIndexStart = factory.template generate<int>();
        auto writeIndexStart = factory.template generate<int>();
        auto readIndex = factory.template generate<int>();
        auto writeIndex = factory.template generate<int>();

        // compute matrix start (1 work item = 2 matrices)
        idThread.mul(matrixWidth*matrixWidth*2,readIndexStart);
        // compute matrix output (1 work item = 1 matrix C=A*B)
        idThread.mul(matrixWidth*matrixWidth,writeIndexStart);

        // load matrix 1&2
        for(int j=0;j<matrixWidth;j++)
        {
            for(int i=0;i<matrixWidth;i++)
            {
                readIndexStart.add(j*matrixWidth + i,readIndex);
                privateMatrix1[i+j*matrixWidth].readFrom(bufferIn,readIndex);
                privateMatrix2[i+j*matrixWidth].readFrom
                              (bufferIn+(matrixWidth*matrixWidth),readIndex);
            }
        }

        // this program measures only the in-register multiplication part
        size_t tmpTime;
        {
            Vectorization::Bench benchmark(&tmpTime);
            // multiply matrices fully in registers 
            // "simd" number of sgemm multiplications at a time, vertically
            for (int i = 0; i < matrixWidth; i++)
            {
                for (int j = 0; j < matrixWidth; j++)
                {
                    //C[i][j] = 0;
                    privateMatrix1[i*matrixWidth].mul(privateMatrix2[j],
                                   privateMatrix3[i*matrixWidth+j]);
                    for (int k = 1; k < matrixWidth; k++)
                    {
                        // C[i][j] += A[i][k]*B[k][j];
                        privateMatrix1[i*matrixWidth+k].fusedMultiplyAdd
                        (privateMatrix2[k*matrixWidth+j],
                        privateMatrix3[i*matrixWidth+j],privateMatrix3[i*matrixWidth+j]);
                    }
                }
            }
        }
        totalTime += tmpTime;

        // store the result matrix
        for(int j=0;j<matrixWidth;j++)
        {
            for(int i=0;i<matrixWidth;i++)
            {
                writeIndexStart.add(j*matrixWidth + i,writeIndex);
                privateMatrix3[i+j*matrixWidth].writeTo(bufferOut,writeIndex);
            }
        }

    },Vectorization::KernelArgs<float*,float*>{});

    std::vector<float> test1(numElements),test2(numElements);

    for(int i=0;i<numElements;i++)
        test1[i]=i;

    for(int i=0;i<100;i++)
    {

        totalTime=0;
        kernelMatrixMultiplication.run(totalMatMatMultiplications,
                                       test1.data(),test2.data());
        std::cout<<"total time to do "<<totalMatMatMultiplications<<
         " multiplications is "<<totalTime<<
         " nanoseconds ("<<totalTime/((float)totalMatMatMultiplications)<<
         " per multiplication)"<<
         " ("<<1.0f/((totalTime/((float)totalMatMatMultiplications))/
         (matrixWidth*(float)matrixWidth*matrixWidth*2.0))<<
         " floating-point operations per nanosecond)"<<std::endl;
    }
    for(int i=0;i<15;i++)
        std::cout<<test2[i]<<std::endl;

    return 0;
}

对多个 simd - matrixWidth 组合重复了基准测试

Setting                |    32-bit floating-point operations per nanosecond
simd    matrixWidth    |    FX8150 Bulldozer           Xeon 8275CL Cascadelake
1       1              |    0.04                       0.10
1       2              |    0.32                       0.76
1       4              |    2.03                       3.28
1       8              |    3.47                       16.25
2       1              |    0.08                       0.18
2       2              |    0.61                       1.45
2       4              |    3.12                       4.41
2       8              |    3.98                       14.65     
4       1              |    0.17                       0.4
4       2              |    1.14                       2.67
4       4              |    6.73                       9.14
4       8              |    14.42                      30.12
8       1              |    0.33                       1.0
8       2              |    2.67                       5.33
8       4              |    10.67                      24.81
8       5              |    14.40                      11.33
8       6              |    16.84                      58.31
8       7              |    17.59 !!                   18.15
8       8              |    12.34                      22.75
16      1              |    0.67                       1.53
16      2              |    5.33                       9.20
16      4              |    14.22                      33.83 
16      5              |    14.12                      31.26
16      6              |    4.03                       55.13   
16      7              |                               64.28  !!      
16      8              |    4.12                       35.31
1       16             |    2.87                       21.06
2       16             |    5.51                       8.50
4       16             |    7.55                       14.76
8       16             |    5.33                       16.31
16      16             |    3.63                       31.50
32      2              |    8.00                       12.65
2       32             |    2.18                       5.06
32      32             |    3.75                       19.85
32      4              |                               23.22
32      5              |                               47.65
32      6              |                               42.45
32      7              |                               39.02 
32      8              |                               25.24
64      4              |                               35.04
64      5              |                               28.11
64      6              |                               22.18

由于Bulldozer 架构只有 AVX-v1,并且工作方式类似于 SSE 单元(对于单线程)连接在一起,因此使用 simd=8matrixWidth=7 实现了最佳性能。在 17.59 GFLOPs 时,它比朴素循环密集 3 倍(仅乘以 7x7 大小的流式矩阵),并且是理论峰值性能的 30%(理论峰值是使用 FMA 操作仅用于相等数量的加法和乘法以及没有其他操作影响流水线)。

Cascadelake 架构支持 AVX512。这意味着它不仅可以访问 32 个 512 位(64 字节)的架构寄存器,还可以访问 168 个重命名(作为物理寄存器)的向量寄存器,这使其能够在达到 L1 之前存储更多数据以重用相同的数据。对于 simd=16 matrixWidth=7 设置,实现了最佳性能。在 64.28 GFLOPs 时,它占 Cascadelake 服务器级 CPU 峰值性能的 30% 到 50%(取决于 godbolt.org 服务器当前负载),该 CPU 在多个客户端之间共享。当算法使用的寄存器空间少于 CPU 支持时,它可以更多地受益于流水线和 simd=32-64 设置,具体取决于流水线效率。

在最后一个示例中,使用 factory.template generateArray<Type,NumElements>(); 来模拟并行数组。这在内存中具有垂直布局,其中工作项的每个元素之间有 Simd 步长。这使得编译器能够并行化内核内的数组操作。乘法的计时测量仅围绕它们自身进行,所有 I/O 都被排除。为了能够利用这种性能,需要实现缓存平铺,以便足够快地用大矩阵的新子矩阵来填充寄存器以进行乘法。最快的矩阵乘法仅通过寄存器平铺、缓存平铺以及通过内置函数显式选择高效的 CPU 指令版本来实现。 实现没有使用任何内置函数或 gnu 语言扩展。最大性能几乎总是需要使用内置函数(和指令表经验)。当它们不使用时,只要它有一个可以自动向量化朴素循环和朴素数组的 C++ 编译器,该库就可以在任何平台上(无宏定义)移植。最后,除了将 simd=16 替换为 simd=128 时,它不需要任何代码重写,当 AVX-2048 CPU 启动时。编译器将处理 KernelData 对象每个方法中的所有朴素循环迭代朴素数组。

Sqrt 和其他数学函数有多快?

目前,该项目仅在自动并行化循环中使用标准函数。Sqrt 是一个成功自动向量化的数学函数,sinFastcosFastexpFast 也被添加进来,用于精度较低但速度很快的版本(参见文章的最后部分)。由于标准实现只显示一个函数跳转指令,因此也有非并行化的版本。在未来的版本中,PowLog 方法将提供具有优化系数(通过遗传算法找到的“魔术数字”)的多项式近似版本。

了解性能的最佳方法是进行基准测试。在下面的示例中,Quake-3(John Carmack)的快速逆平方根与朴素的 1.0f/std::sqrt(x) 循环(由编译器有效地并行化)、VectorizedKernel 版本的逆平方根(通过先除法后平方根计算,然后是第二个版本,通过直接调用 rsqrt)和 VectorizedKernel 版本的 Quake-3 快速逆平方根进行了测试。

GCC v11, simd=8 for AVX, simd=16 for AVX512, n=20million

              -------- Simple Loop ----------   ------- Kernel Version --------
              (1/std::sqrt)           Q_rsqrt   Q_rsqrt    (1/sqrt)     rsqrt 
FX8150        0.035 s                 0.092 s   0.037 s    0.036 s      0.036 s
8275CL        0.012 s                 0.014 s   0.013 s    0.012 s      0.012 s

CLANG v14(simd=16)
8275CL        0.016 s                 0.014s    0.071 s    0.068 s      0.064 s

ICC 2021(simd=16)
8275CL        0.019 s                 0.026 s   0.016 s    0.013 s      0.014 s

由于编译器能有效地自动向量化简单的 1/sqrt 循环,因此它的性能与内核版本相当,该内核版本在内部进行类似的工作流程,并且有内核启动延迟。由于 ICC 无法有效地向量化串行版本,因此内核版本具有速度提升。

基准测试代码

#include "VectorizedKernel.h"

constexpr int n = 20000000;

float Q_rsqrt( float number )
{
    int i;
    float x2, y;
    const float threehalfs = 1.5F;

    x2 = number * 0.5F;
    y  = number;
    i  = * ( int * ) &y;                      
    i  = 0x5f3759df - ( i >> 1 );             
    y  = * ( float * ) &i;
    y  = y * ( threehalfs - ( x2 * y * y ) ); // newton-raphson iteration
//    y  = y * ( threehalfs - ( x2 * y * y ) );

    return y;
}

void Q_rsqrt_simd(std::vector<float> & input, std::vector<float> & output)
{
    constexpr int simd = 16;
    auto kernel = Vectorization::createKernel<simd>([&]
         (const auto & factory, auto & idThread, float * bufIn, float * bufOut)
    {
        auto i = factory.template generate<unsigned int>();
        auto x2= factory.template generate<float>();
        auto y= factory.template generate<float>();
        const auto threehalfs = factory.template generate<float>(1.5f);
        auto threehalfsMinusX2 = factory.template generate<float>();
        auto number = factory.template generate<float>();
        auto iRightShifted = factory.template generate<unsigned int>();
        auto iRightShiftedAndSubtractedFromMagicNumber = 
                             factory.template generate<unsigned int>();
        auto magicNumber = factory.template generate<unsigned int>(0x5f3759df);

        for(int k = 0; k<n/simd; k++)
        {
            number.readFrom(bufIn,idThread);
            number.mul(0.5f,x2);

            y=number; // expensive
            y.readFrom(number);

            y.template castBitwiseAndCopyTo<unsigned int>(i);
            i.rightShift(1,iRightShifted);
            magicNumber.sub(iRightShifted,i);
            i.template castBitwiseAndCopyTo<float>(y);
            x2.mul(y,x2);
            x2.mul(y,x2);
            threehalfs.sub(x2,threehalfsMinusX2);
            y.mul(threehalfsMinusX2,y);

            y.writeTo(bufOut,idThread);
            idThread.add(simd,idThread);
        }
    },Vectorization::KernelArgs<float*,float*>{});
    kernel.run(simd,input.data(),output.data());
}

void rsqrt_simd(std::vector<float> & input, std::vector<float> & output)
{
    constexpr int simd = 16;
    auto kernel = Vectorization::createKernel<simd>([&]
         (const auto & factory, auto & idThread, float * bufIn, float * bufOut)
    {

        auto y= factory.template generate<float>();

        const auto one= factory.template generate<float>(1.0f);

         for(int k = 0; k<n; k+=simd)
         {
            y.readFrom(bufIn+k);
            y.sqrt(y);
            one.div(y,y);
            y.writeTo(bufOut+k);
         }

    },Vectorization::KernelArgs<float*,float*>{});
    kernel.run(simd,input.data(),output.data());
}

void rsqrt_simd2(std::vector<float> & input, std::vector<float> & output)
{
    constexpr int simd = 16;
    auto kernel = Vectorization::createKernel<simd>([&]
         (const auto & factory, auto & idThread, float * bufIn, float * bufOut)
    {
        auto y= factory.template generate<float>();

        for(int k = 0; k<n; k+=simd)
        {
            y.readFrom(bufIn+k);
            y.rsqrt(y);
            y.writeTo(bufOut+k);
        }
    },Vectorization::KernelArgs<float*,float*>{});
    kernel.run(simd,input.data(),output.data());
}

int main() {
  std::vector<float> input(n),output(n);
  for(int i = 0; i<n; i++)
      input[i]=1+i;
  std::cout<<" ---------------------- 1.0f/std::sqrt ----------------------------- "
           <<std::endl;
  for(int k=0;k<20;k++)
  {
      {
          Vectorization::Bench bench(0);
          for(int i = 0; i<n; i++)
          {
              output[i]=1.0f/std::sqrt(input[i]);
          }
      }
      std::cout<<output[0]<<" "<<output[1]<<" "<<output[2]<<" "<<output[3]<<" 
               "<<output[5]<<" "<<std::endl;
  }
  std::cout<<" ---------------------- Q_rsqrt ----------------------------- "
           <<std::endl;
  for(int k=0;k<10;k++)
  {
      {
          Vectorization::Bench bench(0);
          for(int i = 0; i<n; i++)
          {
              output[i]=Q_rsqrt(input[i]);
          }
      }
      std::cout<<output[0]<<" "<<output[1]<<" "<<output[2]<<" "<<output[3]<<" "
               <<output[5]<<" "<<std::endl;
  }
  std::cout<<" ---------------------- Q_rsqrt simd ----------------------------- "
           <<std::endl;
  for(int k=0;k<10;k++)
  {
      {
        Vectorization::Bench bench(0);
        Q_rsqrt_simd(input,output);
      }
      std::cout<<output[0]<<" "<<output[1]<<" "<<output[2]<<" "<<output[3]<<" "
               <<output[5]<<" "<<std::endl;
  }
  std::cout<<" ----------------------rsqrt simd ----------------------------- "
           <<std::endl;
  for(int k=0;k<10;k++)
  {
      {
        Vectorization::Bench bench(0);
        rsqrt_simd(input,output);
      }
      std::cout<<output[0]<<" "<<output[1]<<" "<<output[2]<<" "<<output[3]<<" "
               <<output[5]<<" "<<std::endl;
  }
  std::cout<<" ----------------------rsqrt simd2 ----------------------------- "
           <<std::endl;
  for(int k=0;k<10;k++)
  {
      {
        Vectorization::Bench bench(0);
        rsqrt_simd2(input,output);
      }
      std::cout<<output[0]<<" "<<output[1]<<" "<<output[2]<<" "<<output[3]<<" "
               <<output[5]<<" "<<std::endl;
  }
  return 0;
}

在 Quake-3 逆平方根开发时,CPU 还没有足够先进,无法快速计算平方根和逆平方根。当时使用自定义计算更快。今天(2022 年),它在精度降低的情况下速度更慢或性能相近,并且为了仅 1% 的可能速度提升而降低精度是不值得的。

多线程

内核启动方法有一个可选的多线程版本。runMultithreaded<numThreads>(n,Args...) 的运行方式与 run(n,Args...) 方法相同,除了额外的线程,并且可以选择使用 OpenMP(如果从编译器标志启用了 -fopenmp)。

曼德尔布罗集生成器回顾

只需更改一行代码,就可以激活一个简单的平均工作量共享调度器

//kernel.run(width*height,image.data());
kernel.runMultithreaded<8>(width*height,image.data());

相同的基准测试程序在 FX8150 3.6 GHz CPU(禁用睿频)上生成图像的速度为每像素 25 个周期(使用 OpenMP 为 22 个周期)。FX8150 有 4 个 FPU 模块,每个模块在两个核心之间共享。这限制了最高 4 倍的扩展,在此样本中为 3.5 倍。

还有一个带有原子负载均衡的第二个版本

kernel.runMultithreadedLoadBalanced<8,resolution=100>(width*height,image.data());

18 毫秒内完成相同的任务(2000x2000 像素,每个像素最多 35 次迭代)。非负载均衡版本需要超过 27 毫秒才能完成。使用 32 个线程(以获得更好的工作分布),它在 21 毫秒内完成,仍然比原子工作分布慢。算法很简单

chunk size = 1 + ( (n / simd) / number of threads) / load_balance_resolution
while work remaining
    atomically increment chunk counter
    if fetched chunk index is out of bounds, 
        work remaining = false
    if fetched chunk index is in bound
        then compute chunk

对于 2000x2000 像素,有 125000 次内核启动。对于 125000 次内核启动,使用 8 个线程和 100(默认)的负载均衡分辨率,块大小为 157。当内核启动次数很少(如 100x100)时,分辨率应降低。对于此曼德尔布罗集样本,每次原子访问 157 次内核启动可以轻松隐藏原子访问延迟。

Xeon 8275CL,1 个线程在 20 毫秒内完成相同任务。使用 2 个线程,13 毫秒。Godbolt.org 仅向每个客户端提供两个线程,并且两个线程可能通过同步多线程使用同一核心。这为每个客户端提供了比整个 FX8150 CPU 多 50% 的计算能力。

如果只有两个线程可以在 13 毫秒内生成相同的曼德尔布罗集图像,那么一个专用的服务器(所有 24 个线程)应该在约 1.1 毫秒内生成一张图像,而无需进行任何显式的硬件相关优化,而是仅使用朴素数组和简单循环的封装来实现 GPGPU 内核方法。如果封装没有开销,它将达到理论峰值 gflops 值。

加速(近似)数学函数

有许多方法可以近似重要的(三角/超越)函数。一种流行但较慢的方法是训练神经网络并在生产中使用推理。它们可以很好地映射到几乎所有问题。但是 SIMD 计算需要每一位性能来获得比标准函数(如 std::sin)可靠的速度提升。直接使用神经网络 API 会增加数据加载、计算和结果收集的开销。另一方面,在手工编写的计算中直接使用神经网络的权重矩阵可以与其他方法相媲美。

另一种成功的方法是使用截断多项式来近似函数,特别是 Chebyshev 多项式。这类多项式在“魔术”数字(系数)的基础上,在采样输入范围内具有更好的误差分布。其他类似方法,如泰勒级数,不能有效地收敛到解决方案,并且比 Chebyshev 多项式产生更差的误差分布。

当用例足够窄时,函数几乎可以变为 O(1) 操作。例如,当正弦计算需要接近零时,可以说下面的函数是可行的。

float sinUltraFast(float inputTiny)
{
    return inputTiny; // because sin(x)=~x when x goes 0
}

为了在比上述函数更大的范围内获得足够好的加速和足够好的精度,VectorizedKernel 中使用了 Chebyshev 多项式,并针对 [-1,1] 范围进行了优化。

在台式机 PC 上有足够的计算能力时,找到 Chebyshev 多项式的系数很容易。开发 PC 有三个 GPU:一个 GT1030 和两个 K420。这三个 GPU 合起来可以在实际算法中支持超过 1 TFLOPS 的计算性能。

找到近似函数的函数的多项式系数是全局优化的一个好实践。遗传算法是全局优化工具列表中的一个有用启发式方法,用于此任务的版本通过 CUDA 加速。

在使用 CUDA 加速的遗传算法之前,必须选择 Chebyshev 多项式的骨架。对于 sin(x) 函数,它在所有系数附近都有 x 的奇数幂。对于 cos(x) 函数,它有 x 的偶数幂,靠近所有系数。对于 exp(x) 函数,它同时具有偶数和奇数幂的系数(包括零,它使用一个常量字面值)。

从 Sin(x) 函数开始,存在以下全局优化问题

[x^k means computing pow(x,k=integer), not XOR]

c1 * x^9 + c2 * x^7 + c3 * x^5 + c4 * x^3 + c5 * x = sin(x)

遗传算法必须做的就是计算两边的差值

diff =  sin(x) - ( c1 * x^9 + c2 * x^7 + c3 * x^5 + c4 * x^3 + c5 * x ) 

然后将其平方值添加到误差累加器中

error += diff * diff

最后,将误差作为选定 DNA 在遗传算法中的当前适应度值返回

return error; // --> DNAs with less error will be populated more, to survive more

其余部分由遗传算法处理,它在 DNA 的最终代中接近一个不错的平均误差。(并行化方案是:每个 cuda 线程块计算 1 个 DNA,扫描 [-1,1] 范围内的 10000 个数据点并累加所有误差,并行进行)

几分钟后,遗传算法返回魔术数字

c1 = -0.0005664825439453125
c2 = 0.001037120819091796875
c3 = 0.007439136505126953125
c4 = -0.166426181793212890625
c5 = 0.999982357025146484375

当应用于多项式时,它们计算 sin(x) 比标准版本更快,但仅限于有限的范围和有限的精度。

sinFast 函数中 Chebyshev 多项式的实现反映了 VectorizedKernel 中其他方法中的计算模式

  • 以波(SIMD 元素一次,在简单循环中)的形式进行计算,就像 CUDA/OpenCL 一样
  • 让每个波只执行一两个基本操作,如加法或乘法
  • 使用对齐的缓冲区(64 对齐以支持 AVX512)
  • 内联函数
// only optimized for [-1,1] input range!!
VECTORIZED_KERNEL_METHOD
void sinFast(KernelData<Type,Simd> & result) const noexcept
{
            alignas(64)
            Type xSqr[Simd];

            alignas(64)
            Type xSqrSqr[Simd];

            alignas(64)
            Type xSqrSqr5[Simd];

            alignas(64)
            Type xSqrSqr8[Simd];

            alignas(64)
            Type tmp[Simd];

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                xSqr[i] =   data[i]*data[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                xSqrSqr[i] =    xSqr[i]*xSqr[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                xSqrSqr5[i] =   xSqrSqr[i]*data[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                xSqrSqr8[i] =   xSqrSqr[i]*xSqrSqr[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                xSqrSqr8[i] =   xSqrSqr8[i]*data[i] ;
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                tmp[i] =   xSqrSqr5[i]*xSqr[i] ;
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                xSqr[i] =   xSqr[i]*data[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                result.data[i]  =   Type(-0.0005664825439453125)*xSqrSqr8[i] +
                                    Type(0.001037120819091796875)*tmp[i] +
                                    Type(0.007439136505126953125)*xSqrSqr5[i] +
                                    Type(-0.166426181793212890625)*xSqr[i] +
                                    Type(0.999982357025146484375)*data[i];
            }
}

这种类型的计算通常非常适合自动向量化。当循环简单且数据是纯旧数据 (POD) 时,编译器更有可能对算法进行向量化。即使在包含链式乘法和加法的最终循环中,GCC 也有效地使用了 FMA AVX512 指令。

即使在全局优化(遗传算法)中仅使用 10000 个数据点在 -11 之间,广义解也使得在 1 亿个点(在 -11 之间)的误差测试中,平均误差为 5.2e-07,最大误差为 5.06e-06。

精度较低时,应该会获得性能提升。在 FX8150 3.6 GHz 单核执行和下面的基准测试程序中,

#include "VectorizedKernel.h"
#include <vector>
int main()
{
    constexpr int n = 100000000;
    std::vector<float> a(n),b(n);
    auto kernel = Vectorization::createKernel<8>(
        [&](auto & factory, auto & idThread,float * test, float * test2)
        {
            auto val = factory.template generate<float>();
            val.readFrom(test,idThread);
            val.sinFast(val);
            val.writeTo(test2,idThread);
        },
        Vectorization::KernelArgs<float *,float *>{});

    auto kernelSlow = Vectorization::createKernel<8>(
        [&](auto & factory, auto & idThread,float * test, float * test2)
        {
            auto val = factory.template generate<float>();
            val.readFrom(test,idThread);
            val.sin(val);
            val.writeTo(test2,idThread);
        },
        Vectorization::KernelArgs<float *,float *>{});

    for(int i=0;i<n;i++)
    {
        a[i]=i/(float)n;
    }

    size_t t;
    for(int k=0;k<10;k++)
    {
        {
            Vectorization::Bench bench(&t);
            kernel.run(n,a.data(),b.data());
            
        }
        std::cout<<t<<" nanoseconds kernel"<<std::endl;
    }
    float maxError = 0;
    float minError = 100000000;
    float avgError = 0;
    for(int i=0;i<n;i++)
    {
        auto err = std::abs(std::sin(a[i]) - b[i]);
        if(maxError<err)
            maxError=err;
        if(minError>err)
            minError=err;
        avgError += err;
        if(i<10)
        {
            std::cout<<std::sin(a[i]) << "   "<<b[i]<<" "<<std::sin(a[i]) - 
                       b[i]<<" "<<(std::sin(a[i]) - b[i])/std::sin(a[i])<<std::endl;
        }
    }
    avgError/=n;
    std::cout<<"Approximation:"<<std::endl;
    std::cout<<"average error: "<<avgError<<std::endl;
    std::cout<<"max error: "<<maxError<<std::endl;
    std::cout<<"min error: "<<minError<<std::endl;

    for(int k=0;k<10;k++)
    {
        {
            Vectorization::Bench bench(&t);
            kernelSlow.run(n,a.data(),b.data());
            
        }
        std::cout<<t<<" nanoseconds kernelSlow"<<std::endl;
    }
    maxError = 0;
    minError = 100000000;
    avgError = 0;
    for(int i=0;i<n;i++)
    {
        auto err = std::abs(std::sin(a[i]) - b[i]);
        if(maxError<err)
            maxError=err;
        if(minError>err)
            minError=err;
        avgError += err;
    }
    avgError/=n;
    std::cout<<"std::sin:"<<std::endl;
    std::cout<<"average error: "<<avgError<<std::endl;
    std::cout<<"max error: "<<maxError<<std::endl;
    std::cout<<"min error: "<<minError<<std::endl;
    return 0;
}

输出如下

210320327 nanoseconds kernel
207193156 nanoseconds kernel
209724002 nanoseconds kernel
208031048 nanoseconds kernel
208375082 nanoseconds kernel
212297393 nanoseconds kernel
269920925 nanoseconds kernel
247177762 nanoseconds kernel
226875523 nanoseconds kernel
215999626 nanoseconds kernel
0   0 0 -nan
1e-08   9.99982e-09 1.76748e-13 1.76748e-05
2e-08   1.99996e-08 3.53495e-13 1.76748e-05
3e-08   2.99995e-08 5.29354e-13 1.76451e-05
4e-08   3.99993e-08 7.0699e-13 1.76748e-05
5e-08   4.99991e-08 8.81073e-13 1.76215e-05
6e-08   5.99989e-08 1.05871e-12 1.76451e-05
7e-08   6.99988e-08 1.23634e-12 1.76621e-05
8e-08   7.99986e-08 1.41398e-12 1.76748e-05
9e-08   8.99984e-08 1.58451e-12 1.76057e-05
Approximation:
average error: 5.19494e-07
max error: 5.06639e-06
min error: 0
1514166472 nanoseconds kernelSlow
1572569295 nanoseconds kernelSlow
1543835922 nanoseconds kernelSlow
1549091614 nanoseconds kernelSlow
1487053516 nanoseconds kernelSlow
1490235113 nanoseconds kernelSlow
1500433847 nanoseconds kernelSlow
1495819895 nanoseconds kernelSlow
1486465391 nanoseconds kernelSlow
1492841472 nanoseconds kernelSlow
std::sin:
average error: 0
max error: 0
min error: 0 

std::sin 函数运行了 1 亿次,平均花费 1.5 秒。sinFast 函数进行了相同数量的计算,平均仅花费 210 毫秒。这几乎是 7 倍的速度提升。许多可以在较窄的输入范围和较低精度下工作的程序可以从这种优化中受益。

在 AMD EPYC 7R32 和 AVX2(simd=8)上,使用 1000 万次迭代的相同程序(由于 godbolt.org 服务器限制)产生以下结果

4684267 nanoseconds kernel
4724790 nanoseconds kernel
4796075 nanoseconds kernel
4798874 nanoseconds kernel
4284829 nanoseconds kernel
4936115 nanoseconds kernel
4699258 nanoseconds kernel
4255476 nanoseconds kernel
4283909 nanoseconds kernel
4253615 nanoseconds kernel
0   0 0 -nan
1e-07   9.99982e-08 1.76215e-12 1.76215e-05
2e-07   1.99996e-07 3.52429e-12 1.76215e-05
3e-07   2.99995e-07 5.28644e-12 1.76215e-05
4e-07   3.99993e-07 7.04858e-12 1.76215e-05
5e-07   4.99991e-07 8.81073e-12 1.76215e-05
6e-07   5.99989e-07 1.05729e-11 1.76215e-05
7e-07   6.99988e-07 1.2335e-11 1.76215e-05
8e-07   7.99986e-07 1.40972e-11 1.76215e-05
9e-07   8.99984e-07 1.58593e-11 1.76215e-05
Approximation:
average error: 1.25595e-06
max error: 5.06639e-06
min error: 0
29602815 nanoseconds kernelSlow
29983970 nanoseconds kernelSlow
31948362 nanoseconds kernelSlow
29545788 nanoseconds kernelSlow
31207689 nanoseconds kernelSlow
29330923 nanoseconds kernelSlow
29236516 nanoseconds kernelSlow
29234575 nanoseconds kernelSlow
39186776 nanoseconds kernelSlow
54037120 nanoseconds kernelSlow
std::sin:
average error: 0
max error: 0
min error: 0 

sinFast 函数 1000 万次迭代需要 4 毫秒,std::sin 1000 万次迭代需要 29 毫秒。这是 7.5 倍的速度提升。

同一服务器的 AVX512 实例可以获得更高的加速,但内核中唯一的工作是计算单个 sin(x) 函数,并且它受限于 factory.generate 分配和内核启动延迟。

为了更好地对 AVX512 等更快的架构进行基准测试,每个工作项进行了 3000 次 sin(x) 计算

#include <cpuid.h>
#include <vector>
int main()
{
    constexpr int simd=16;
    constexpr int n = 1000*simd;

    std::vector<float> a(n),b(n);
    auto kernel = Vectorization::createKernel<simd>(
        [&](auto & factory, auto & idThread,float * test, float * test2)
        {
            auto val = factory.template generate<float>();
            
            for(int i = 0; i<n; i+=simd)
            {
                val.readFrom(test+i,idThread);
                val.sinFast(val);
                val.sinFast(val);
                val.sinFast(val);
                val.writeTo(test2+i,idThread);
            }               
        },
        Vectorization::KernelArgs<float *,float *>{});

    auto kernelSlow = Vectorization::createKernel<simd>(
        [&](auto & factory, auto & idThread,float * test, float * test2)
        {
            auto val = factory.template generate<float>();
            
            for(int i = 0; i<n; i+=simd)
            {
                val.readFrom(test+i,idThread);
                val.sin(val);
                val.sin(val);
                val.sin(val);
                val.writeTo(test2+i,idThread);
            }
        },
        Vectorization::KernelArgs<float *,float *>{});

    for(int i=0;i<n;i++)
    {
        a[i]=i/(float)n;
    }

    size_t t;
    for(int k=0;k<10;k++)
    {
        {
            Vectorization::Bench bench(&t);
            kernel.run(simd,a.data(),b.data());
        }
        std::cout<<t<<" nanoseconds kernel"<<std::endl;
    }
    float maxError = 0;
    float minError = 100000000;
    float avgError = 0;
    for(int i=0;i<n;i++)
    {
        auto err = std::abs(std::sin(std::sin(std::sin(a[i]))) - b[i]);
        if(maxError<err)
            maxError=err;
        if(minError>err)
            minError=err;
        avgError += err;
        if(i<10)
        {
            std::cout<<std::sin(std::sin(std::sin(a[i]))) << "   "<<b[i]<<" 
            "<<std::sin(a[i]) - b[i]<<" "<<(std::sin(std::sin(std::sin(a[i]))) - 
             b[i])/std::sin(std::sin(std::sin(a[i])))<<std::endl;
        }
    }
    avgError/=n;
    std::cout<<"Approximation:"<<std::endl;
    std::cout<<"average error: "<<avgError<<std::endl;
    std::cout<<"max error: "<<maxError<<std::endl;
    std::cout<<"min error: "<<minError<<std::endl;

    for(int k=0;k<10;k++)
    {
        {
            Vectorization::Bench bench(&t);
            kernelSlow.run(simd,a.data(),b.data());

        }
        std::cout<<t<<" nanoseconds kernelSlow"<<std::endl;
    }
    maxError = 0;
    minError = 100000000;
    avgError = 0;
    for(int i=0;i<n;i++)
    {
        auto err = std::abs(std::sin(std::sin(std::sin(a[i]))) - b[i]);
        if(maxError<err)
            maxError=err;
        if(minError>err)
            minError=err;
        avgError += err;

    }
    avgError/=n;
    std::cout<<"std::sin:"<<std::endl;
    std::cout<<"average error: "<<avgError<<std::endl;
    std::cout<<"max error: "<<maxError<<std::endl;
    std::cout<<"min error: "<<minError<<std::endl;

char CPUBrandString[0x40];
unsigned int CPUInfo[4] = {0,0,0,0};

__cpuid(0x80000000, CPUInfo[0], CPUInfo[1], CPUInfo[2], CPUInfo[3]);
unsigned int nExIds = CPUInfo[0];

memset(CPUBrandString, 0, sizeof(CPUBrandString));

for (unsigned int i = 0x80000000; i <= nExIds; ++i)
{
    __cpuid(i, CPUInfo[0], CPUInfo[1], CPUInfo[2], CPUInfo[3]);

    if (i == 0x80000002)
        memcpy(CPUBrandString, CPUInfo, sizeof(CPUInfo));
    else if (i == 0x80000003)
        memcpy(CPUBrandString + 16, CPUInfo, sizeof(CPUInfo));
    else if (i == 0x80000004)
        memcpy(CPUBrandString + 32, CPUInfo, sizeof(CPUInfo));
}

std::cout << "CPU Type: " << CPUBrandString << std::endl;
    return 0;
}

godbolt.org 服务器上的 Xeon(R) Platinum 8375C CPU @ 2.90GHz 实例的输出

24426 nanoseconds kernel
26922 nanoseconds kernel
26189 nanoseconds kernel
26711 nanoseconds kernel
21866 nanoseconds kernel
12891 nanoseconds kernel
12987 nanoseconds kernel
13017 nanoseconds kernel
13011 nanoseconds kernel
12753 nanoseconds kernel
0   0 0 -nan
6.25e-05   6.24967e-05 3.31784e-09 5.30854e-05
0.000125   0.000124993 6.63567e-09 5.30854e-05
0.0001875   0.00018749 9.90985e-09 5.28526e-05
0.00025   0.000249987 1.32713e-08 5.30854e-05
0.0003125   0.000312483 1.65892e-08 5.30854e-05
0.000375   0.00037498 1.9907e-08 5.30854e-05
0.0004375   0.000437477 2.32249e-08 5.30854e-05
0.0005   0.000499973 2.65427e-08 5.30854e-05
0.0005625   0.00056247 2.98023e-08 5.2775e-05
Approximation:
average error: 2.92283e-06
max error: 5.76675e-06
min error: 0
157713 nanoseconds kernelSlow
153495 nanoseconds kernelSlow
152426 nanoseconds kernelSlow
149269 nanoseconds kernelSlow
125595 nanoseconds kernelSlow
126358 nanoseconds kernelSlow
126901 nanoseconds kernelSlow
128745 nanoseconds kernelSlow
125790 nanoseconds kernelSlow
126077 nanoseconds kernelSlow
std::sin:
average error: 0
max error: 0
min error: 0
CPU Type: Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz

Xeon 8275CL 的另一个实例

34331 nanoseconds kernel
19040 nanoseconds kernel
19582 nanoseconds kernel
23158 nanoseconds kernel
19634 nanoseconds kernel
19505 nanoseconds kernel
18382 nanoseconds kernel
19700 nanoseconds kernel
19435 nanoseconds kernel
19831 nanoseconds kernel
0   0 0 -nan
6.25e-05   6.24967e-05 3.31784e-09 5.30854e-05
0.000125   0.000124993 6.63567e-09 5.30854e-05
0.0001875   0.00018749 9.90985e-09 5.28526e-05
0.00025   0.000249987 1.32713e-08 5.30854e-05
0.0003125   0.000312483 1.65892e-08 5.30854e-05
0.000375   0.00037498 1.9907e-08 5.30854e-05
0.0004375   0.000437477 2.32249e-08 5.30854e-05
0.0005   0.000499973 2.65427e-08 5.30854e-05
0.0005625   0.00056247 2.98023e-08 5.2775e-05
Approximation:
average error: 2.92283e-06
max error: 5.76675e-06
min error: 0
256440 nanoseconds kernelSlow
202568 nanoseconds kernelSlow
209147 nanoseconds kernelSlow
203952 nanoseconds kernelSlow
208616 nanoseconds kernelSlow
247332 nanoseconds kernelSlow
210957 nanoseconds kernelSlow
200276 nanoseconds kernelSlow
206405 nanoseconds kernelSlow
201319 nanoseconds kernelSlow
std::sin:
average error: 0
max error: 0
min error: 0
CPU Type: Intel(R) Xeon(R) Platinum 8275CL CPU @ 3.00GHz

同样,EPYC 启用了 AVX2

22160 nanoseconds kernel
22440 nanoseconds kernel
41861 nanoseconds kernel
24450 nanoseconds kernel
22031 nanoseconds kernel
21640 nanoseconds kernel
21621 nanoseconds kernel
21720 nanoseconds kernel
21700 nanoseconds kernel
21701 nanoseconds kernel
0   0 0 -nan
6.25e-05   6.24967e-05 3.31784e-09 5.30854e-05
0.000125   0.000124993 6.63567e-09 5.30854e-05
0.0001875   0.00018749 9.90985e-09 5.28526e-05
0.00025   0.000249987 1.32713e-08 5.30854e-05
0.0003125   0.000312483 1.65892e-08 5.30854e-05
0.000375   0.00037498 1.9907e-08 5.30854e-05
0.0004375   0.000437477 2.32249e-08 5.30854e-05
0.0005   0.000499973 2.65427e-08 5.30854e-05
0.0005625   0.00056247 2.98023e-08 5.2775e-05
Approximation:
average error: 2.92283e-06
max error: 5.76675e-06
min error: 0
194784 nanoseconds kernelSlow
216814 nanoseconds kernelSlow
200474 nanoseconds kernelSlow
197884 nanoseconds kernelSlow
202814 nanoseconds kernelSlow
191923 nanoseconds kernelSlow
199453 nanoseconds kernelSlow
207143 nanoseconds kernelSlow
187853 nanoseconds kernelSlow
185784 nanoseconds kernelSlow
std::sin:
average error: 0
max error: 0
min error: 0
CPU Type: AMD EPYC 7R32

根据服务器负载,AVX512 实例达到了 10x 到 15x 的加速,而 AVX2 实例则更稳定地达到了 8x-9x 的加速。(请注意,相对误差仍为 5e-05 量级,尽管使用了 sin(sin(sin))) 链式计算)

Cos 和 Exp 的实现类似,唯一的区别在于所需的 FP32 乘法次数。

如果需要更大的范围,这里是 sin(x) 的 [-any,any] 优化版本,称为 x.sinFastFullRange(result)

        // Chebyshev Polynomial coefficients found by genetic algorithm 
        // running on 768 GPU pipelines for 5 minutes
        VECTORIZED_KERNEL_METHOD
        void sinFastFullRange(KernelData<Type,Simd> & result) const noexcept
        {
            // reduce range to [-pi,+pi] by modf(input, 2pi) - pi { at high precision }
            // divide by 5 (multiply 0.2)
            // compute on [-1,+1] range
            // compute T5(cos(x)) chebyshev (   16sin(x)^5 - 20sin(x)^3 + 5sin(x)   )
            // return

            alignas(64)
            double wrapAroundHighPrecision[Simd];

            alignas(64)
            double wrapAroundHighPrecisionTmp[Simd];

            alignas(64)
            double reducedData[Simd];

            alignas(64)
            double reducedDataTmp[Simd];

            alignas(64)
            Type xSqr[Simd];

            alignas(64)
            Type x[Simd];

            alignas(64)
            Type resultData[Simd];

            // these have to be as high precision as possible to 
            // let wide-range of inputs be used
            constexpr double pi =  /*Type(std::acos(-1));*/ 
            double(3.1415926535897932384626433832795028841971693993751058209749445923);

            constexpr double twoPi = double(2.0 * pi);

            constexpr double twoPiInv = double(1.0/twoPi);

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                wrapAroundHighPrecision[i] = data[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                wrapAroundHighPrecisionTmp[i] = wrapAroundHighPrecision[i] * twoPiInv;
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                wrapAroundHighPrecisionTmp[i] = std::floor(wrapAroundHighPrecisionTmp[i]);
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                wrapAroundHighPrecisionTmp[i] = twoPi*wrapAroundHighPrecisionTmp[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                reducedData[i] = wrapAroundHighPrecision[i] - 
                                 wrapAroundHighPrecisionTmp[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                reducedDataTmp[i] = reducedData[i]-twoPi;
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                reducedData[i]=reducedData[i]<double(0.0)?
                               reducedDataTmp[i]:reducedData[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                reducedData[i] = reducedData[i] - pi;
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                reducedData[i] = reducedData[i]*double(0.2);
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                x[i] =     reducedData[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                xSqr[i] =     x[i]*x[i];
            }


            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                resultData[i] =    Type(2.543240487540288086165674e-06)*xSqr[i] + 
                                   Type(-0.0001980781510937390521576162);
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                resultData[i] =    resultData[i]*xSqr[i] + 
                                   Type(0.008333159571549231259268709);
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                resultData[i] =    resultData[i]*xSqr[i] + 
                                   Type(-0.1666666483147380972695828);
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                resultData[i] =    resultData[i]*xSqr[i] + 
                                   Type(0.9999999963401755564973428);
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                resultData[i] =     resultData[i]*x[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                x[i] =     resultData[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                xSqr[i] =     resultData[i]*resultData[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                resultData[i] =     Type(16.0)*xSqr[i] - Type(20.0);
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                resultData[i] =     resultData[i]*xSqr[i] + Type(5.0);
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                resultData[i] =     resultData[i]*x[i];
            }

            VECTORIZED_KERNEL_LOOP
            for(int i=0;i<Simd;i++)
            {
                result.data[i] =     -resultData[i];
            }
        }

伪代码如下

  • 将输入范围从 [-any,any] 减小到 [-pi,pi],精度高于 32 位(此处仅为 64 位)
  • 将新范围除以 5(这个数字也是稍后要使用的 Chebyshev 多项式的索引)
  • 使用 [-1,1] 输入进行近似
  • 通过在结果上应用 Chebyshev 多项式(L5(approx))来将范围减小回 [-pi,pi]

近似部分通过遗传算法优化,仅产生最大 1 ULPS 误差(平均 0.19 ulps)用于正弦,以及最大 1 ULPS 用于余弦(平均 0.08 ulps)。由于额外的浮点运算,此误差在范围缩减和范围去缩减过程中会增加。通常,科学程序使用超过 1000 位的 PI 来计算缩减,但此程序经过速度优化,因此存在权衡。sinFastFulRangecosFastFullRange 的最大误差在小数点后 0.0000001 位,当结果接近零时,这使得 ULP 差异很大。当需要精确数字时,可以使用 x.sin(result) 和 x.cos(result),但性能会降低。

sinFastcosFast 的最终版本中,计算通过Horner Scheme进行优化。Horner Scheme 提高了多项式计算的准确性和性能。未优化的多项式计算如下所示

y = c1 * x*x*x*x + c2 * x*x*x + c3 * x*x + c4*x + c5;

但这有太多不必要的 x 的幂运算,并且每个不必要的浮点运算都会增加舍入误差,导致在足够多次迭代后对结果产生雪崩效应。因此,使用过多的多项式项会使其速度过慢且精度较低。

Horner Scheme 优化运算如下

y = ((((c1*x + c2) * x + c3 ) * x + c4 ) * x + c5);

或者换句话说

y = c1*x + c2;
y = y *x + c3;
y = y *x + c4;
y = y *x + c5;

这不仅减少了乘法次数,而且乘法和加法次数也相等。这非常适合编译器应用 CPU 的 FMA 指令一次计算两项(乘法 + 加法),如下所示

y = fma(c1,x,c2)
y = fma(y, x,c3)
y = fma(y, x,c4)
y = fma(y, x,c5)

仅 4 条指令(加上 CPU 自身的架构“额外”临时精度)即可实现更短的运行时间和高精度。一些 CPU 在 FMA 操作中的精度高于其他 CPU,而开发计算机的 CPU(FX8150)的精度低于许多最新的 CPU,因此在很大范围的 CPU 上可以保持 1 ULPS 的差异,并且在新 CPU 上平均 ULPS 差异较低,尤其是在 Xeons 和 EPYCs 上。

为了保持简单性和高得多的性能,范围缩减(从任何值到 [-pi,pi])操作限制在 64 位,这是 AVX2/AVX512 CPU 在浮点运算上支持的最高精度。

应用 Horner Scheme 后,cosFast 在 AVX512 上的性能提高到每个 cos 0.54 个周期。这接近单核上的 50GFLOPS FMA 操作(每个 FMA 计算为 2 个浮点运算),并受限于内存访问延迟/带宽。对于 3GHz CPU,0.54 个周期意味着每个余弦计算 0.18 纳秒,或每秒 55 亿个余弦。由于 (cos/Sin)FastFullRange 具有额外的范围缩减和去缩减步骤,精度更高,因此其速度提升仅限于 std::cosstd::sin 的 3-4 倍。当输入范围本身就是 [-1,1] 时,仅使用 cosFastsinFast 比使用 std::cosstd::sin 快至少 10 倍,因为它们更通用,并且可以处理所有边界情况。

为了使编译器更容易进行自动向量化,每个基本操作都在其自己的 Simd 长度循环上计算,就像 KernelData 中的其他所有方法一样。这会降低可读性和可维护性,但可以通过 CPU 的向量硬件实现硬件可移植的速度提升。

Chebyshev 多项式

在范围去缩减部分,使用了 Chebyshev 多项式,因为它只是另一个多项式,可以将 cos(n * x) 减少到 F(cost(x)),并且易于实现

cos(4 * x) = L4(cos(x)) = 8 * cos(x)^4 - 8 * cos(x)^2 + 1 {^ is power, not xor}

这和看起来一样简单。对于正弦计算,使用 L5,并将输入除以 5 进行缩减

sin(5 * x) = L5(sin(x)) = 16 * sin(x)^5 - 20 * sin(x)^3 + 5 * cos(x)

然后用户输入被解释为

sin(Y) = sin(5 * x) => solve on [-1,1] instead of [-pi,pi]

尽管有两个多项式和一个范围缩减计算,误差仍然在小数点后 0.0000001 位,因此对于像 0.01 这样较大的结果来说,它并不算太错误,但当结果包含 0.00001 时,它肯定不是科学级的。当程序将小于 0.0001 的任何值解释为零时,使用近似算法就相对可以接受了。同样,当大于 0.99999 的结果可以解释为 1 时,误差不是太大,近似仍然有效。

如果现代 x86 CPU 没有良好的平方根支持,sqrt 可以这样优化,或者像 John Carmack 的 Q3_inv_sqrt 那样。

当 CPU 没有良好的浮点能力时,使用 CORDIC 方法。CORDIC 计算相反数以获得输入参数,并在每次失败时调整初始猜测并重复直到收敛。这仅通过位移和加法即可完成。但在现代 x86 CPU 上,浮点数学速度很快。所以简单使用 fp mul/add 更容易。此工具不实现 CORDIC,并且适用于具有良好浮点性能的 AVX/AVX512/../AVX8192 架构。

关注点

在 SIMD 单元上运行的垂直并行化操作即使在朴素应用算法时也很快,即使 CPU 中没有足够的寄存器重命名空间来容纳所有数据集。但没有宏定义的工作会对编译器生成的指令效率产生影响。如果没有宏定义,内核中的变量-变量交互实际上会产生深度数据复制,并且比完全作为宏定义工作的代码要慢。

使用朴素的数组包装器(例如,https://github.com/tugrul512bit/VectorizedKernel 中的简单 KernelData 实现)就像使用 OpenCL 内核变量一样,并模拟 GPU 风格的工作流程,工作起来很有趣,应用起来很快,并且与朴素的 GPU 目标内核(未达到宣传的峰值 GFLOPs 值的 100%)一样高效,代价是冗长。编码部分比 std::valarray 更笨拙,但比将其转换为 std::valarray 更快。易于开始输入,学习速度慢,类似于 CUDA,无需任何额外硬件,类似于 OpenCL,完全独立于任何平台。

仅通过简单循环和简单数组就能有效地向量化,但一旦算法变得过于复杂,封装(这是 C++ 的方式,而不是在堆栈上使用 100% 的宏定义和裸向量)就会显现出其对编译器生成的 CPU 指令效率的影响。这些通常是不必要的 mov 指令,并且可能会在未来得到改善,这个项目就依赖于此。如果有一天 AVX-8192(或某个无界向量处理器)在台式机上发布,那么这个项目将只需要进行如下更改

int simd = a higher power-of-2 value;

历史

  • 2022 年 4 月 24 日星期日:添加了几个基本算法在 SIMD 上运行的示例和基准测试
  • 2022 年 4 月 25 日星期一:添加了基本多线程(并修复了一些错别字)
  • 2022 年 4 月 29 日星期五:添加了负载均衡多线程、Quake-3 逆平方根基准测试和不同编译器的比较
  • 2022 年 5 月 9 日星期一:添加了 cosFastcostFastFullRangesinFastsinFastFullRange 以及 Chebyshev 多项式和 Horner Scheme 的解释
在 CPU 上模拟 CUDA/OpenCL,通过自动向量化和与串行标量代码相比的速度提升 - CodeProject - 代码之家
© . All rights reserved.