65.9K
CodeProject 正在变化。 阅读更多。
Home

一种新的快速哈希表实现

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.85/5 (3投票s)

2020年7月30日

MIT

9分钟阅读

viewsIcon

14061

downloadIcon

235

有序数组和哈希表的合并。

引言

哈希表是一种数据结构,它允许使用字符串键来查找值。有些哈希表在插入时速度快,有些在查找值时速度快。一些实现存在删除缓慢、内存碎片化或随着项目增多而性能下降的问题。大多数哈希表不按顺序存储项目,也不通过索引检索值,并且为了快速搜索或跟踪顺序(如果支持的话)需要消耗更多内存。然而,此实现可能以更小的折衷提供更高的速度。

实现

对于一种能够通过数字检索值的数据结构,它需要是一个线性数组,当其内存块达到限制时,它会扩展到一个新的堆,并将所有项目从旧内存移动到新内存。因此,它必须是一维数组。虽然它不仅仅是一个数组或哈希表,但它两者兼备。

template <typename Value_>
class HArray {
    size_t  size_{0};
    size_t  capacity_{0};
    Value_ *storage_{nullptr};
};

变量size_存储项目的数量,capacity_用于存储堆/内存块的大小,storage_指向第一个值的存储位置。但是,项目必须存储的不仅仅是值;它们还需要变量Key及其哈希值,以便HArray能够作为哈希表运行。

template <typename Key_, typename Value_>
struct HAItem {
    size_t   Hash;
    Key_     Key;
    Value_   Value;
};

template <typename Key_, typename Value_>
class HArray {
    size_t                  size_{0};
    size_t                  capacity_{0};
    HAItem<Key_, Value_>   *storage_{nullptr};
};

线性数组的插入如下所示

private:
void insert_(Key_ key, Value_ value, size_t hash) {
    if (size_ == capacity_) {
        // expand
    }

    HAItem<Key_, Value_> &item = storage_[size_];

    item.Hash  = hash;
    item.Key   = key;
    item.Value = value;

    ++size_;
}

但是,为了使哈希表正常工作,它需要一种搜索技术并处理冲突。一种解决方法是添加一个链表,通过向结构添加一个额外的变量Next来指向下一个项目。因此,新结构如下所示

template <typename Key_, typename Value_>
struct HAItem {
    size_t     Hash;
    HAItem    *Next;
    Key_       Key;
    Value_     Value;
};

搜索技术将给定的键与存储的键进行比较。它通过将其哈希值除以 (capacity – 1) 并取余数来定位,该余数指向分配的堆中的一个索引(位置)。如果compare函数发现两个键不同,则会检查Next指针,查看其项目是否持有相同的键,并继续查找,直到找到它或遇到null指针。然而,这可能会导致无限循环。

假设capacity_21,有两个项目要插入:item1的哈希值为41item2的哈希值为60item1的位置是41 % (21 - 1) = 41 % 20 = 1。算法将查找索引1并将最后的/nullNext指针设置为指向item1。对于item260 % 20 = 0,因此,storage_[0].Next被设置为指向item2;假设Nextnull。现在,如果一个新项目进来,其哈希值为20, 40, 21, 41……它将指向item1item2,当它指向其中一个时,它会看到item1指向item2,而item2.Next又指向item1,当它尝试查找链表的末尾时,它会陷入无限循环。

这是可以通过存储起始地址来解决的,并通过检查每个Next变量来解决,然后如果它们匹配则中断循环,然后将新项目插入到最后一个Next和其后的那个之间(可能为null)。然而,有一种更快的解决方法,那就是添加另一个名为Anchor的指针。因此,HAItem的最终结构如下所示

template <typename Key_, typename Value_>
struct HAItem {
    HAItem    *Anchor{nullptr};
    size_t     Hash{0};
    HAItem    *Next{nullptr};
    Key_       Key{};
    Value_     Value{};
};

现在Next仅用于处理冲突,而Anchor指向项目的实际位置。有了这个,Next将指向前向元素(1, 2, 3, 4, 5),并且CPU prefetching可能会带来下一个项目,而search函数正在比较给定的键与当前项目的Key。然而,Anchor可能指向堆中的任何位置,如果它是null,则表示项目不存在。

为了理解这个概念,假设我们有一个容量为 6 个元素的数组,我们需要插入以下项目

哈希
item1 A 10
item2 B 15
item3 C 21
item4 D 24

哈希表的基础是(capacity – 1),即5item1被插入到位置0——因为 10 % 5 等于 0,因此,storage_[0].Anchor = 索引 0 的指针。item2被插入到索引 1,但其哈希结果指向 0(15 % 5 = 0),搜索函数将查找索引 0 并找到item1,然后检查其Next指针以找到具有null值的最终指针并将其设置为指向item2item3被放置在索引 3,其哈希指向索引 1。即使索引 1 中有一个项目,其Anchor也为null,函数将设置storage_[1].Anchor指向item3item4将获得索引 3,其哈希结果指向索引 4,因此,storage_[4].Anchor = &item3。数据将看起来像

无需重新哈希、二叉树、分离哈希表或额外的指针。此外,拥有Anchor变量意味着search函数始终可以返回指向指针的引用,并且它可以引用AnchorNext,该指针可能指向一个项目或nullsearch函数可以定义为

public:
Value_ *Find(const Key_ &key) {
    HAItem_T *item = *(find_(key, hash_fun_(key)));

    if (item != nullptr) {
        return &(item->Value);
    }

    return nullptr;
}

private:
HAItem_T **find_(const Key_ &key, size_t hash) const {
    if (capacity_ == 0) {
        // To prevent dividing by zero, allocate a small heap.
        capacity_ = 2;
        storage_  = new HAItem_T[capacity_];
    }

    HAItem_T **item = &(storage_[(hash % capacity_)].Anchor);

    while (((*item) != nullptr) && ((*item)->Key != key)) {
        item = &((*item)->Next);
    }

    return item;
}

哈希表会检查每个插入的项目以查看它是否存在,并且通过find_(),它会返回对该项目的引用。当它存在时,它会用给定值替换其值,如果不存在,则返回的引用指向新项目在哈希表中应放置的位置;一石二鸟。

public:
Value_ &operator[](Key_ &&key) {
    const size_t hash = hash_fun_(key);
    HAItem_T **item   = find_(key, hash);

    if ((*item) == nullptr) {
        insert_(static_cast<Key_ &&>(key), hash, item);
    }

    return (*item)->Value;
}

private:
void insert_(Key_ &&key, size_t hash, HAItem_T **position) {
    if (index_ == capacity_) {
        Resize(capacity_ * 2);
        position = find_(key, hash);
    }

    (*position) = (storage_ + index_);
    (*position)->Hash    = hash;
    (*position)->Key     = static_cast<Key_ &&>(key);

    ++index_;
    ++size_;
}

find_()被调用两次的唯一情况是当内存块的大小发生变化时;因为基础取决于capacity_的值。

public:
void Resize(size_t new_size) {
    capacity_ = ((new_size > 1) ? new_size : 2);

    if (index_ > new_size) {
        index_ = new_size; // Shrink
    }

    const size_t base = (capacity_ - 1);
    size_t       n    = 0;
    size_t       m    = 0;

    HAItem_T *src = storage_;
    storage_      = new HAItem_T[capacity_];

    while (n != index_) {
        HAItem_T &src_item = src[n];
        ++n;

        if (src_item.Hash != 0) {
            HAItem_T *des_item = (storage_ + m);
            des_item->Hash    = src_item.Hash;
            des_item->Key     = static_cast<Key_ &&>(src_item.Key);
            des_item->Value   = static_cast<Value_ &&>(src_item.Value);

            HAItem_T **position = &(storage_[(des_item->Hash % base)].Anchor);

            while (*position != nullptr) {
                position = &((*position)->Next);
            }

            *position = des_item;

            ++m;
        }
    }

    index_ = size_ = m;
    delete[] src;
}

由于Resize()不需要检查项目是否存在,因此重新哈希机制非常简单(4行代码)。

            HAItem_T **position = &(storage_[(des_item->Hash % base)].Anchor);

            while (*position != nullptr) {
                position = &((*position)->Next);
            }

            *position = des_item;

if (src_item.Hash != 0)会丢弃任何已删除的项目,删除与插入相同,但它会跟踪即将删除的项目之前的一个项目;以设置其Next指向下一个项目的Next。此外,它会清除值和键,但会保留NextAnchor;因为它们保存了哈希信息。

public:
inline void Delete(const Key_ &key) {
    delete_(key);
}

public:
void DeleteIndex(size_t index) {
    if (index < index_) {
        const HAItem_T &item = storage_[index];

        if (item.Hash != 0) {
            delete_(item.Key, item.Hash);
        }
    }
}

private:
void delete_(const Key_ &key, size_t hash) {
    if (capacity_ != 0) {
        HAItem_T **item = &(storage_[(hash % capacity_)].Anchor);
        HAItem_T **before = item;

        while (((*item) != nullptr) && ((*item)->Key != key)) {
            before = item; // Store the previous item
            item   = &((*item)->Next);
        }

        if ((*item) != nullptr) {
            HAItem_T *c_item = *item; // Current item

            if ((*before) >= (*item)) {
                /*
                * If "before" inserted after "item"
                * (e.g., deleting items from 0 to n).
                */
                (*before) = c_item->Next;
            } else {
                /*
                * If "before" inserted before "item"
                * (e.g., deleting items from n to 0).
                */
                (*item) = c_item->Next;
            }

            c_item->Hash  = 0;
            c_item->Next  = nullptr;
            c_item->Key   = Key_();
            c_item->Value = Value_();
            --size_;
        }
    }
}

由于HArray本质上是一个数组,因此可以添加一个接受初始大小的构造函数

explicit HArray(size_t size) : capacity_(size) {
    if (size != 0) {
        storage_ = new HAItem_T[capacity_];
    }
}

尽管如此,也可以通过索引访问任何键或值

Value_ *GetValue(size_t index) {
    if (index < index_) {
        HAItem_T &item = storage_[index];

        if (item.Hash != 0) {
            return &(item.Value);
        }
    }

    return nullptr;
}

const Key_ *GetKey(size_t index) const {
    if (index < index_) {
        const HAItem_T &item = storage_[index];

        if (item.Hash != 0) {
            return &(item.Key);
        }
    }

    return nullptr;
}

const HAItem_T *GetItem(size_t index) const {
    if (index < index_) {
        const HAItem_T &item = storage_[index];

        if (item.Hash != 0) {
            return &(item);
        }
    }

    return nullptr;
}

改进的实现

虽然之前的实现有效且性能尚可,但仍有改进空间,首先是确定最薄弱的链。

查看代码,最常用的变量是Anchor,即使没有与之关联的项目,也需要初始化它,以及HAItem的整个结构。因此,需要两件事:一,将Anchor移到其独立的数据结构中,二,分配未初始化的内存块;以避免设置变量两次,或不必初始化其任何部分,除非需要。对于第二点,std::allocator是合适的。因此,新结构如下所示

template <typename Key_, typename Value_>
struct HAItem {
    size_t  Hash;
    HAItem *Next;
    Key_    Key;
    Value_  Value;
};

struct HAItem_P {
    HAItem_T *Anchor{nullptr};
};

template <typename Key_, typename Value_, typename Hash_ = std::hash<Key_>,
          typename Allocator_ = std::allocator<HAItem<Key_, Value_>>>
    size_t       size_{0};
    size_t       index_{0};
    size_t       capacity_{0};
    HAItem_P   * hash_table_{nullptr};
    HAItem_T   * storage_{nullptr};
    Allocator_   allocator_{};
};

 

其余部分几乎相同,但由于Anchor位于其独立堆中,与之前相比,增加其大小不会产生太大影响,并且使其更大可以提供更快的查找并减少对Next的调用;因为冲突更少。

private:
void insert_(Key_ &&key, size_t hash, HAItem_T **&position) {
    if (index_ == capacity_) {
        Resize(capacity_ * 2);
        position = find_(key, hash);
    }

    (*position) = (storage_ + index_);
    new (*position) HAItem_T{hash, nullptr, static_cast<Key_ &&>(key), Value_{}};

    ++index_;
    ++size_;
}

private:
HAItem_T **find_(const Key_ &key, size_t hash) {
    if (capacity_ == 0) {
        // To prevent dividing by zero, allocate a small heap.
        capacity_   = 1;
        hash_table_ = new HAItem_P[(capacity_ * 2)];
        storage_    = allocator_.allocate(capacity_);
    }

    HAItem_T **item = &(hash_table_[(hash % (capacity_ * 2))].Anchor);

    while (((*item) != nullptr) && ((*item)->Key != key)) {
        item = &((*item)->Next);
    }

    return item;
}

private:
void delete_(const Key_ &key, size_t hash) {
    if (capacity_ != 0) {
        HAItem_T **item = &(hash_table_[(hash % (capacity_ * 2))].Anchor);

        ...
    }
}

调整大小函数可以分成两个:一个处理项目,另一个生成哈希

public:
void Resize(size_t new_size) {
    const size_t old_capacity = capacity_;

    capacity_ = ((new_size > 1) ? new_size : 2);

    if (index_ > new_size) {
        destruct_range_((storage_ + new_size), (storage_ + index_));
        index_ = new_size; // Shrink
    }

    size_t n = 0;
    size_t m = 0;

    delete[] hash_table_;
    HAItem_T *src = storage_;
    storage_      = allocator_.allocate(capacity_);

    while (n != index_) {
        HAItem_T &src_item = src[n];
        ++n;

        if (src_item.Hash != 0) {
            HAItem_T *des_item = (storage_ + m);

            new (des_item)
                HAItem_T{src_item.Hash, nullptr,
                        static_cast<Key_ &&>(src_item.Key),
                        static_cast<Value_ &&>(src_item.Value)};

            ++m;
        }
    }

    index_ = size_ = m;
    allocator_.deallocate(src, old_capacity);

    generate_hash_();
}

private:
void generate_hash_() {
    // hash_table_ should be deleted before this.

    const size_t base = (capacity_ * 2);

    if (base != 0) {
        hash_table_ = new HAItem_P[base];

        for (size_t i = 0; i < size_; i++) {
            HAItem_T *item = (storage_ + i);

            HAItem_T **position = &(hash_table_[(item->Hash % base)].Anchor);

            while (*position != nullptr) {
                position = &((*position)->Next);
            }

            *position = item;
        }
    }
}

改进的查找

现在剩下的问题是:它能更快吗?答案是:是的,可以,只需一个简单的更改;在find_()函数中将除法%替换为按位与&,使其成为(hash & (base - 1))-1是为了避免获得哈希表的大小或零。为了更深入地理解这一点,假设哈希表的大小为 4:查找将是 0 & 4 = 0, 1 & 4 = 0, 2 & 4 = 0, 3 & 4 = 0,但 4 & 4 = 4,这不是一个有效的索引。此外,更多的零意味着更多的冲突。然而,1 & 3 = 1, 2 & 3 = 2, and 3 & 3 = 3, 4 & 3 = 0。现在所有这些都可以放置和定位在哈希表中。

private:
HAItem_T **find_(const Key_ &key, size_t hash) {
    if (capacity_ == 0) {
        base_       = 1;
        capacity_   = 1;
        hash_table_ = new HAItem_P[base_ + 1];
        storage_    = allocator_.allocate(capacity_);
    }

    HAItem_T **item = &(hash_table_[(hash & base_)].Anchor);

    while (((*item) != nullptr) && ((*item)->Key != key)) {
        item = &((*item)->Next);
    }

    return item;
}

当大小可以表示为 2 的 N 次方2^n时,该方法效果很好:2, 4, 8, 16, 32, 64, 128, ... 因为计算机只处理二进制代码。以以下表格为例

N & (7-1) 结果
0 & 6 0
1 & 6 0
2 & 6 2
3 & 6 2
4 & 6 4
5 & 6 4
6 & 6 6

0 与 1 冲突,2 与 3 冲突,4 与 5 冲突。现在,如果大小是 8,即 2^3

N & (8-1) 结果
0 & 7 0
1 & 7 1
2 & 7 2
3 & 7 3
4 & 7 4
5 & 7 5
6 & 7 6

没有冲突!因此,它是 O(1),但并不那么简单,因为条目有键,而值是键的哈希。尽管如此,它产生的冲突更少。

由于HArry的增长因子为 2:1, 2, 4, 8, ... 那么使用按位与而不是让 CPU 花费更多时间计算是没有问题的,当它可以一次完成时,是的,也可能不是。是的,如果它没有初始大小就使用,而不是,因为它有Compress(), Resize()SetCapacity()。如果它合并到另一个数组,则两个数组的元素总和成为新大小,前提是它不小于容量。为此,HArry需要一个额外的方法来对齐base_

private:
void set_base_(size_t n_size) noexcept {
    base_ = 1U;
    base_ <<= CLZL(static_cast<unsigned long>(n_size));

    if (base_ < n_size) {
        base_ <<= 1U;
    }

    --base_;
}

base_成为类中的新变量

...
class HArray {
    size_t      base_{0};
    size_t      index_{0};
    size_t      size_{0};
    size_t      capacity_{0};
    HAItem_P *  hash_table_{nullptr};
    HAItem_T *  storage_{nullptr};
    Allocator_  allocator_{};
};

对于其余部分

public:
void Resize(size_t new_size) {
    if (index_ > new_size) {
        destruct_range_((storage_ + new_size), (storage_ + index_));
        index_ = new_size; // Shrink
    }

    set_base_(new_size);
    resize_(new_size);
}

private:
void resize_(size_t new_size) {
    HAItem_T *src = storage_;
    storage_      = allocator_.allocate(new_size);
    size_t n      = 0;
    size_t m      = 0;

    while (n != index_) {
        HAItem_T &src_item = src[n];
        ++n;

        if (src_item.Hash != 0) {
            HAItem_T *des_item = (storage_ + m);
            ++m;

            new (des_item)
                HAItem_T{src_item.Hash, nullptr,
                            static_cast<Key_ &&>(src_item.Key),
                            static_cast<Value_ &&>(
                                src_item.Value)}; // Construct *des_item
        }
    }

    index_ = size_ = m;

    delete[] hash_table_;
    allocator_.deallocate(src, capacity_);
    capacity_ = new_size;
    generate_hash_();
}

private:
void generate_hash_() {
    hash_table_ = new HAItem_P[(base_ + 1)];

    for (size_t i = 0; i < index_; i++) {
        HAItem_T * item     = (storage_ + i);
        HAItem_T **position = &(hash_table_[(item->Hash & base_)].Anchor);

        ...
    }
}

private:
void insert_(Key_ &&key, size_t hash, HAItem_T **&position) {
    if (size_ == capacity_) {
        ++base_;
        base_ *= 2;
        --base_;

        resize_(capacity_ * 2);
        position = find_(key, hash);
    }

    ...
}

private:
void delete_(const Key_ &key, size_t hash) {
    if (capacity_ != 0) {
        HAItem_T **item = &(hash_table_[(hash & base_)].Anchor);
        ...
    }
}

对于带有初始大小的构造函数

explicit HArray(size_t size) : capacity_{size} {
    set_base_(size);
    hash_table_ = new HAItem_P[(base_ + 1)];
    storage_    = allocator_.allocate(capacity_);
}

现在,它与最快的竞争。

基准测试

注意:英文单词来自:https://github.com/dwyl/english-words

编译

  • cmake

    • Linux

mkdir build
cd build
cmake -D BUILD_TESTING=0 ..
cmake --build .
./benchmark
  • Windows

mkdir build
cd build
cmake -D BUILD_TESTING=0 ..
cmake --build . --config Release
.\Release\benchmark.exe
  • GCC/Clang

mkdir build
c++ -O3 -std=c++11 -I ./include ./src/benchmark.cpp -o ./build/benchmark
./build/benchmark

结论

虽然有很多哈希表的实现,但很少有提供通过索引访问元素的,有些实现存在内存碎片化问题。此外,按顺序存储项目可能会导致查找缓慢。此实现采用有序数组并为其添加哈希功能,以创建一个新的、快速且更易于使用的哈希表,代码行数约 300 行。

GitHub 仓库:https://github.com/HaniAmmar/HArray

示例

#include <iostream>
#include <string>

#include "HArray.hpp"

using Qentem::HArray;

int main() {
    HArray<std::string, size_t> ha;
    const size_t *              val;

    // HArray<std::string, size_t> ha(10); // Initialize with the capacity of 10
    // ha.SetCapacity(10); // Same, but after initialization

    ha["a"]  = 1;
    ha["b"]  = 2;
    ha["c"]  = 30;
    ha["c"]  = 3; // Reset "c" to value of "3";
    ha["dd"] = 4;
    ha["e"]  = 5;
    ha["z"]  = 50;
    ha.Rename("dd", "d"); // Will rename "dd" to "d".

    std::cout << ha["a"] << '\n'; // Output: 1
    std::cout << ha["c"] << '\n'; // Output: 3

    std::cout << "\nSize: " << ha.Size() << "\nValues: "; // Output: 6

    for (size_t i = 0; i < ha.ArraySize(); i++) {
        val = ha.GetValue(i);

        if (val != nullptr) {
            std::cout << (*val) << ' '; // Output: 1 2 3 4 5 50
        }
    }

    std::cout << "\n\n";

    val = ha.Find("z");

    if (val != nullptr) {
        std::cout << "The value of \"z\" is: " << (*val); // Output: 50
    }

    std::cout << "\n\nKeys: ";

    for (size_t i = 0; i < ha.ArraySize(); i++) {
        const std::string *key = ha.GetKey(i);

        if (key != nullptr) {
            std::cout << *key << ' '; // Output: a b c d e z
        }
    }

    ha.Delete("z");
    std::cout << "\n\nSize: " << ha.Size() << '\n';     // Output: 5
    std::cout << "Index: " << ha.ArraySize() << '\n';   // Output: 6
    std::cout << "Capacity: " << ha.Capacity() << '\n'; // Output: 8

    ha.Compress(); // Will remove extra storage and/or remove any deleted items
    std::cout << "\nSize: " << ha.Size() << '\n';       // Output: 5
    std::cout << "Index: " << ha.ArraySize() << '\n';   // Output: 5
    std::cout << "Capacity: " << ha.Capacity() << '\n'; // Output: 5

    ha.Delete("d");

    /* The next line will resize it to 4 if 4 is bigger than the number of
     * items, and will remove any deleted items.
     *
     * If 4 is not bigger, the capacity will be set to 4.
     */
    ha.Resize(4);
    std::cout << "\nSize: " << ha.Size() << '\n';         // Output: 3
    std::cout << "Capacity: " << ha.Capacity() << "\n\n"; // Output: 4

    for (size_t i = 0; i < ha.ArraySize(); i++) {
        val = ha.GetValue(i);

        if (val != nullptr) {
            std::cout << (*val) << ' '; // Output: 1 2 3
        }
    }

    std::cout << "\n\n";

    ha["d"] = 4; // Add "d" again

    HArray<std::string, size_t> ha2;

    ha2["b"] = 10;
    ha2["d"] = 30;
    ha2["w"] = 220;
    ha2["z"] = 440;

    HArray<std::string, size_t> ha3 = ha; // Copy ha to ha3

    /* Add ha2's items to ha if they do not exist, and/or replace the ones that
     * exist.
     */
    ha += ha2;

    using my_item_T = Qentem::HAItem<std::string, size_t>;

    for (size_t i = 0; i < ha.ArraySize(); i++) {
        const my_item_T *item = ha.GetItem(i);

        if (item != nullptr) {
            std::cout << item->Key << ": " << item->Value << ". ";
            //  Will output: a: 1. b: 10. c: 3. d: 30. w: 220. z: 440.
        }
    }

    std::cout << "\n\n";

    ha2 += ha3; // Add ha3's items to ha2

    for (size_t i = 0; i < ha2.ArraySize(); i++) {
        const my_item_T *item = ha2.GetItem(i);

        if (item != nullptr) {
            std::cout << item->Key << ": " << item->Value << ". ";
            //  Will output: b: 2. d: 4. w: 220. z: 440. a: 1. c: 3.
        }
    }

    std::cout << '\n';

    ha.Clear(); // Will remove any existing items and set it to zero.

    return 0;
}

历史

  • 2020 年 7 月 30 日:初始版本
  • 2020 年 8 月 3 日:添加了改进的实现
  • 2020 年 8 月 6 日:添加了改进的查找
  • 2020 年 8 月 10 日:添加了示例
© . All rights reserved.