65.9K
CodeProject 正在变化。 阅读更多。
Home

在 C++ 中对数组结构 (SOA) 的元素进行排序和删除

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2024 年 4 月 1 日

CPOL

5分钟阅读

viewsIcon

5401

C++ 的迭代器和算法在容器上工作得很好,但我们能对数组结构 (SOA) 进行排序吗?

引言

假设我们存储 10 个点的坐标作为一个 `soa[30]` 浮点数组。前十个元素是 x 坐标,接下来的十个元素是 y 坐标,最后十个元素是 z 坐标。这种存储数据的方式通常被称为数组结构 (SOA)。我们想 (1) 移除 z 坐标为零的元素,然后 (2) 按 y 坐标对剩余数据进行排序。当然,我们可以重新排列数据,使 x, y, z 值按组存储,例如

std::vector<std::array<float,3>> v;

然后就可以用常规方法处理该向量。另一个解决方案是创建一个单独的索引序列,形式如下

std::vector<int> v(10);           // array of indices
std::iota(v.begin(), v.end(), 0); // writes a sequence 0,1,2,etc.

可以通过索引数组 `v` 间接访问原始数据。排序操作的结果是一个新的序列,可以用来重新排列元素。

这两种方法的问题在于原始数据的重复,当排序 10 个点时,这不算什么大事。不幸的是,SOA 格式经常出现在 GPU 计算中,在这种格式下,它对内存吞吐量来说是最优的。当人们使用 GPU 处理最优存储的数据时,他们通常拥有的点数远不止 10 个(可能每个 GPU 设备有 ~80 GB 数据,共有八个 GPU 设备)。

由于我们是在主机端 (CPU) 处理数据,因此在将数据从 GPU 复制到主机时进行重新排列就足够了。不幸的是,这样的操作效率会极低,所以我们又回到了最初的问题。

显然,C++ 库中的排序、分区和其他算法都可以应用于 SOA。我们不需要移动连续的数据块,而是需要单独移动各个部分,例如,一次移动 x, y, z 坐标。但我们不想重写这些函数。如果能重用现有的 C++ 库,情况会更方便。

背景

自定义迭代器用于定义用户自定义数据结构的自定义遍历行为。我们可以创建一个自定义迭代器来遍历我们的 `soa[30]` 数组,但这有什么帮助呢?数组是最简单的数据结构——我们可以直接使用指针而不是自定义迭代器来遍历它。

调用标准算法的问题不在于遍历,而在于读取和写入元素。例如,`std::sort()` 函数广泛使用 `std::iter_swap(iterator a, iterator b)` 来交换数据。迭代器不存储数据——它们可以提供一个对象的引用,该对象可以被读取、复制或覆盖。但是我们如何获得一个像 SOA 这样非连续对象的引用呢?我们不能。

代理对象

代理对象伪装成连续的数据块,但实际上不是。更准确地说,它有时会保存数据的副本,有时会访问 SOA 以提取数据。当我们只需要读取数据时,例如,第五个元素的 z 坐标,只需访问 `soa[5*pitch+2]` 即可获得。然而,像 `std::swap()` 这样的函数也想复制对象。当调用复制构造函数时,新对象也会完全复制数据。幸运的是,`std::sort()` 和 `std::remove_if()` 等函数只会创建少量这样的对象。

示例代码

代理类

下面的代码是概念验证,不是“库”。它包含了 `spdlog` 库中的打印函数 `spdlog::info()`,用于逐步跟踪执行。代理对象 `Point` 包含一个布尔变量 `isReference`,用于跟踪是应该从 `*soa` 的索引 `pos` 处提取数据,还是数据来自本地存储 `data[nArrays]`。SOA 中的数组数量是 `nArrays`,`soa` 的总大小是 `nArrays*pitch`,其中 `pitch` 是可以存储的最大元素数量。`pitch` 的值不会改变。代理类定义了一个常规构造函数 (`isReference` 设置为 `true`),复制构造函数 (`isReference` 设置为 `false`),以及通过 `x()`、`y()`、`z()` 函数访问数据的方法。它重载了 `operator=`,以便在 `isReference` 设置时写入 `*soa`。

struct Point
{
    constexpr static unsigned nArrays = 3;  // count of arrays in SOA
    bool isReference = false;
    unsigned pos, pitch;    // element # and capacity of each array in SOA
    float *soa;             // reference to SOA (assume contiguous space of size nArrays*pitch)

    float data[nArrays];    // local copy of the data when isReference==true

    Point()
    {
        isReference = false;
        spdlog::info("Point() constructor");
    }

    Point(const Point &other)
    {
        isReference = false;
        // local copy
        if(other.isReference)
        {
            for(int i=0;i<nArrays;i++) data[i] = other.soa[other.pos + i*other.pitch];
            spdlog::info("Point(other) constructor ({},{}); from ref {}; ({},{})",
                         x(),y(), other.pos, other.x(), other.y());
        }
        else
        {
            for(int i=0;i<nArrays;i++) data[i] = other.data[i];
            //            spdlog::info("vp {} ({},{}) = vp ({},{})", x(), y(), other.x(), other.y());
            spdlog::info("Point(other) constructor ({},{}); from val: ({},{})",
                         x(),y(), other.x(), other.y());
        }

    }

    // x(), y(), z() are for testing/tracing only
    float x() const
    {
        if(isReference) return soa[pos];
        else return data[0];
    }
    float y() const
    {
        if(isReference) return soa[pos+pitch];
        else return data[1];
    }
    float z() const
    {
        if(isReference) return soa[pos+2*pitch];
        else return data[2];
    }

    // user-defined copy assignment (copy-and-swap idiom)
    Point& operator=(Point other)
    {
        if(isReference)
        {
            // distribute into soa
            if(other.isReference)
            {
                for(int i=0;i<nArrays;i++) soa[pos + i*pitch] = other.soa[other.pos + i*other.pitch];
                spdlog::info("rp {} ({},{}) = rp {} ({},{})", pos, x(), y(), other.pos, other.x(), other.y());
            }
            else
            {
                for(int i=0;i<nArrays;i++) soa[pos + i*pitch] = other.data[i];
                spdlog::info("rp {} ({},{}) = vp ({},{})", pos, x(), y(), other.x(), other.y());
            }
        }
        else
        {
            // local copy
            if(other.isReference)
            {
                for(int i=0;i<nArrays;i++) data[i] = other.soa[other.pos + i*other.pitch];
                spdlog::info("vp ({},{}) = rp {} ({},{})", x(), y(), other.pos, other.x(), other.y());
            }
            else
            {
                for(int i=0;i<nArrays;i++) data[i] = other.data[i];
                spdlog::info("vp ({},{}) = vp ({},{})", x(), y(), other.x(), other.y());
            }
        }
        return *this;
    }
};

容器

容器的作用是存储实际的 SOA,跟踪元素数量 `size`,这个数量可能小于其容量 (`pitch`)。容器还提供了自定义迭代器 `begin()` 和 `end()`。

class MyContainer
{
public:
    std::vector<float> data;
    unsigned pitch, size;

    SOAIterator begin(){return SOAIterator(0, data.data(), pitch);}
    SOAIterator end(){return SOAIterator(size, data.data(), pitch);}

    void print_contents()
    {
        for(int i=0;i<size;i++) std::cout << "(" << data[i] << ',' << data[pitch+i] << ',' << data[2*pitch+i] << ") ";
        std::cout << '\n';
    }
};

迭代器

自定义迭代器通常包含大量样板代码。在这里,我们尽量将功能保持在最低限度。迭代器存储 `Point` 对象的一个副本,该对象的 `isReference` 设置为 `true`。这是代理对象,迭代器在通过 `reference operator*()` 解引用时返回它。`Point` 已经跟踪了如何访问 SOA,因此移动迭代器意味着修改 `Point` 的 `pos` 索引。`SOAIterator` 的完整代码如下。

class SOAIterator
{
public:

    using iterator_category = std::random_access_iterator_tag;
    using value_type = Point;
    using difference_type = int;
    using pointer = Point*;
    using reference = Point&;

    SOAIterator(unsigned pos, float *soa_data, unsigned pitch)
    {
        m_point.isReference = true;
        m_point.pos = pos;
        m_point.soa = soa_data;
        m_point.pitch = pitch;
        spdlog::info("SOAIterator() {}", pos);
    }

    SOAIterator(const SOAIterator& other)
    {
        m_point.isReference = true;
        m_point.pos = other.m_point.pos;
        m_point.soa = other.m_point.soa;
        m_point.pitch = other.m_point.pitch;
        spdlog::info("SOAIterator from other {}", other.m_point.pos);
    }

    
    SOAIterator& operator=(const SOAIterator& other)
    {
        m_point.isReference = other.m_point.isReference;
        m_point.pos = other.m_point.pos;
        m_point.soa = other.m_point.soa;
        m_point.pitch = other.m_point.pitch;
        return *this;
    }

    SOAIterator(){} 
    ~SOAIterator(){} 

    bool operator<(const SOAIterator& t) const {return m_point.pos<t.m_point.pos;}
    bool operator==(const SOAIterator& rawIterator)const{return (m_point.pos == rawIterator.m_point.pos);}
    bool operator!=(const SOAIterator& rawIterator)const{return (m_point.pos != rawIterator.m_point.pos);}

    SOAIterator&                  operator++()
    {
        spdlog::info("operator ++; {}", this->m_point.pos);
        ++m_point.pos;
        return (*this);
    }
    SOAIterator&                  operator--()
    {
        spdlog::info("operator --; {}", this->m_point.pos);
        --m_point.pos;
        return (*this);
    }

    SOAIterator                   operator+(const difference_type& movement)
    {
        spdlog::info("operator +; {}", this->m_point.pos);
        SOAIterator result = *this;
        result.m_point.pos += movement;
        return result;
    }

    SOAIterator                   operator-(const difference_type& movement)
    {
        spdlog::info("operator -; {}", this->m_point.pos);
        SOAIterator result = *this;
        result.m_point.pos -= movement;
        return result;
    }

    difference_type operator-(const SOAIterator& rawIterator){return m_point.pos-rawIterator.m_point.pos;}

    reference operator*()
    {
        spdlog::info("dereference operator on iterator {}", m_point.pos);
        return m_point;
    }

    Point m_point;
};

工作示例

为了测试该方法,我们用 10 个 x 值、10 个 y 值和 10 个 z 值填充包含 30 个浮点数的容器

[2024-03-31 19:16:43.205] [info] INITIAL DATA
(0,20.1,0) (1,19.1,1) (2,18.1,1) (3,17.1,0) (4,16.1,1) (5,15.1,1) (6,14.1,0) (7,13.1,1) (8,12.1,1) (9,11.1,0)

我们调用 `std::remove_if()` 来移除 z 值为零的元素。操作完成后,自定义迭代器现在指向数据末尾,因此我们修剪容器 `mc` 的大小。

SOAIterator it_result = std::remove_if(mc.begin(), mc.end(), [](Point &p){return p.z()==0;});
mc.size = it_result.m_point.pos;

打印输出现在显示

 AFTER REMOVING ELEMENTS with z()==0
[2024-03-31 19:16:43.206] [info] it_result.pos 6
(1,19.1,1) (2,18.1,1) (4,16.1,1) (5,15.1,1) (7,13.1,1) (8,12.1,1)

接下来,我们按 y 值对点进行排序

std::sort(mc.begin(),mc.end(),[](Point &p1, Point &p2){return p1.y()<p2.y();});

结果是

 AFTER SORTING
(8,12.1,1) (7,13.1,1) (5,15.1,1) (4,16.1,1) (2,18.1,1) (1,19.1,1)

示例代码

#include <iostream>
#include <vector>
#include <algorithm>

#include <spdlog/spdlog.h>

using namespace std;

int main()
{
    MyContainer mc;
    mc.data.resize(30);
    mc.size = mc.pitch = 10;
    for(int i=0;i<mc.pitch;i++)
    {
        mc.data[i] = i;
        mc.data[mc.pitch+i] = 20.1-i;
        mc.data[2*mc.pitch+i] = i%3 == 0 ? 0 : 1;
    }
    spdlog::info("INITIAL DATA");
    mc.print_contents();

    SOAIterator it_result = std::remove_if(mc.begin(), mc.end(), [](Point &p){return p.z()==0;});
    mc.size = it_result.m_point.pos;
    spdlog::info("\n\n AFTER REMOVING ELEMENTS with z()==0");
    spdlog::info("it_result.pos {}", it_result.m_point.pos);
    mc.print_contents();

    spdlog::info("\n\n BEGIN SORTING");
    std::sort(mc.begin(),mc.end(),[](Point &p1, Point &p2){return p1.y()<p2.y();});
    spdlog::info("\n\n AFTER SORTING");
    mc.print_contents();
    return 0;
}

结论

提出的方法尚未经过彻底测试,例如,它可能很慢,但似乎可以正确工作。该容器目前适用于具有 3 个数组的浮点值,但可以重写为模板以支持任何数据。对于实际使用,应删除跟踪 `spdlog` 的函数。我的目的是对来自 GPU 模拟的点云数据进行排序。如果有人发现此代码有用,请在下方留言。任何评论都值得赞赏。

© . All rights reserved.