65.9K
CodeProject 正在变化。 阅读更多。
Home

超快轻量级时间序列存储引擎 – LMDB 第一部分

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.98/5 (23投票s)

2020年10月5日

CPOL

29分钟阅读

viewsIcon

36920

downloadIcon

750

一个轻量级的时序数据存储引擎,每秒可存储数百万个时序值

引言

时序数据是指按时间顺序索引的表,而时序表中的信息是许多机器学习算法的主要成分。

时序数据中的信息是指任何随时间变化的内容,例如

  • 股票价格和成交量
  • 降水量

通过分析股票价格,您可以尝试预测未来的股票价格;通过分析降水量,您可以对未来的洪水或干旱,或者电力价格(如果主要电力来源是水力发电)做出合格的猜测。

忽略猫咪图片,时序数据存储的数据量可能比任何其他类型的数据都要多。

在他们的书《Time Series Databases: New Ways to Store and Access Data》中,Ellen Friedman 和 Ted Dunning 提出了使用具有混合设计的 NoSQL 数据库来实现高性能时序数据库引擎的论点,并以 Open TSDB 为参考案例。

这个概念很简单:使用 blob 存储时序数据。

本文及配套源代码的目的是展示这个概念是可行的——并且可以用易于理解的 C++ 实现。如果您有 C# 开发背景,可以轻松理解本文的代码,不需要深入的 C++ 知识。

构建代码

构建和运行代码需要 boost C++ 库。提供的 Visual Studio 项目期望环境变量 BOOST_ROOT 指向您解压缩发行版的目录,并且库文件可以在 $(BOOST_ROOT)\stage\lib 中找到。

请确保您编译的是 x64 版本。如果您计划在不支持 AVX2 的系统上测试代码,则需要在 C/C++Code Generation 下将 Enable Enhanced Instruction Set 设置更改为适合您系统的数值。

性能测试使用环境变量 HCC_TEST_DATA_ROOT,该变量必须设置为一个目录的完整路径,测试程序可以在其中创建子目录来存放数据库文件。请确保包含该目录的驱动器至少有 40 GB 的可用空间。

性能

既然我称之为超快的时序数据存储,就应该有一些数据来支撑我的说法。

因此,在深入介绍解决方案的细节之前,我想展示一些我用来了解其工作效果的性能测试结果。程序的源代码包含在本文的下载文件中,在评估结果时:请记住,这些测试是在笔记本电脑上执行的,而不是高性能服务器。

计时是从写入第一个时序点到最后一个时序点被写入并提交到数据库文件。读取是在一个新的事务范围内完成的。

其中一个程序 HExPerf01.exe 向单个时序数据写入了十亿个时序值,然后读取所有数据。

Database directory:F:\Database\LMDB
Wrote 1000000000 timeseries points in 43.384699 seconds - points pr. second: 23049600.966461
Read  1000000000 timeseries points in 1.232314 seconds - points pr. second: 811481423.445532
Sum inserted:500000000067108992.000000,
Sum read:    500000000067108992.000000 

这无疑是很有希望的,因为该程序

  • 每秒插入超过 2300 万个时序点
  • 每秒读取超过 8.11 亿个时序点

10 亿个时序点足够多的数据,可以证明该时序数据存储引擎不仅仅是一个玩具。

时序点是一个包含三个字段的记录

  1. Timestamp: DateTime
  2. Flags: Int64
  3. Value: double

每条记录的大小为 24 字节,因此 10 亿个时序点的大小接近 24 GB。

Sum inserted 是写入存储的时序点值的总和,Sum read 是从存储中读取的时序点值的总和。相同的总和验证了我们读取的数据与写入的数据相同。

再次运行,它写入时序点的速度更快。

Database directory:F:\Database\LMDB
Wrote 1000000000 timeseries points in 23.693139 seconds - points pr. second: 42206310.574444
Read  1000000000 timeseries points in 1.181736 seconds - points pr. second: 846212833.697684
Sum inserted:500000000067108992.000000,
Sum read:    500000000067108992.000000

这次,程序

  • 每秒插入超过 4200 万个时序点
  • 每秒读取超过 8.46 亿个时序点

写入性能提升是因为时序数据存储引擎重复使用了第一次运行时创建的存储文件。

性能看起来不错,但这 hardly 是时序数据存储引擎的正常用例。

多年来,我查看了几个时序引擎的基准测试,它们通常是批量写入时序点。HExPerf03.exe 写入了 10 亿个时序值,将数据分布到 10,000 个时序中,以 250 个值为一批写入时序点。

for ( size_t i = 0; i < BatchCount; ++i )
{
    for ( auto& timeseriesId : timeseriesIds )
    {
        timeseriesCursor1.ChangeTimeseries( timeseriesId );
        for ( size_t j = 0; j < BatchSize; ++j )
        {
            Int64 value = static_cast<Int64>( ( i * BatchSize ) + j ) + 1;
            sumWritten += value;
            timeseriesCursor1.Insert( Point( DateTime( value ), 
                                        0, static_cast<double>( value ) ) );
        }
    }
} 

读取并计算数据的简单校验和。

size_t totalRows = 0;
for ( auto& timeseriesId : timeseriesIds )
{
    timeseriesCursor2.ChangeTimeseries( timeseriesId );
    totalRows += timeseriesCursor2.ForEach( []( const Point& point, double& sumRead )
    {
        sumRead += point.Value( );
    }, sumRead );
}

输出

Database directory:F:\Database\LMDB
Inserted 1000000000 timeseries points into 10000 timeseries in 27.847767 seconds
         - points pr. second: 35909521.937612
Read  1000000000 timeseries points from 10000 timeseries in 2.563846 seconds
         - points pr. second: 390039103.370308
Sum inserted:50000500000000.000000,
Sum read:    50000500000000.000000 

正如预期的那样,性能有所下降,但 timeseries 引擎仍然能够

  • 每秒插入超过 3500 万个时序点
  • 每秒读取超过 3.9 亿个时序点

对于 timeseries 存储引擎来说,最有趣的测试是它如何很好地处理一次写入一个值到 timeseries 存储。如果您有 10,000 个传感器同时报告变化,那么您将面临与目前演示的不同的工作负载。与之前的程序一样,HExPerf04.exe 写入 10 亿个值,将数据分布到 10,000 个 timeseries 中,但它会向一个 timeseries 写入一个值,然后移到下一个。

for ( size_t i = 0; i < NumberOfPoints; ++i )
{
    for ( auto& timeseriesId : timeseriesIds )
    {
        timeseriesCursor1.ChangeTimeseries( timeseriesId );
        Int64 value = static_cast<Int64>( i + 1 );
        sumWritten += value;
        timeseriesCursor1.Insert( DateTime( value ), 0, static_cast<double>( value ) );
    }
}

输出

Database directory:F:\Database\LMDB
Inserted 1000000000 timeseries points into 10000 timeseries in 76.781371 seconds
         - points pr. second: 13023992.431581
Read  1000000000 timeseries points from 10000 timeseries in 2.515501 seconds
         - points pr. second: 397535075.811728 

同样,性能有所下降,而该程序

  • 每秒插入超过 1300 万个时序点
  • 每秒读取超过 3.97 亿个时序点

请注意,读取性能变化很大,对于此工作负载,它在每秒 3 亿到 4 亿个时序点之间波动,而写入性能相当稳定。这可能是因为读取性能在很大程度上依赖于 Windows 为内存映射 I/O 提供的缓存机制,该机制试图平衡系统上运行的所有应用程序的需求。

由于上面进行的每个测试都在读取之前写入了所有数据,因此缓存是热的,这是读取性能如此出色的主要原因。为了演示当缓存是冷的(cold cache)时 timeseries 存储引擎的读取性能,HExPerf05.exe 打开一个已有的存储并读取所有 timeseries 点一次。

Database directory:F:\Database\LMDB
Read  1000000000 timeseries points from 10000 timeseries in 10.476408 seconds
         - points pr. second: 95452568.067823
Sum read:    50000500000000.000000

正如预期的那样,读取性能有所下降,但仍然非常出色:每秒读取超过 9500 万个 timeseries 点,对缓存信息的受益甚微。

为了让这些数字更具可比性:2020 年 10 月 1 日,纳斯达克有 24,441,833 笔交易——而 timeseries 存储引擎可以在 96 秒内存储 10 亿笔交易,其中每笔交易的记录包含

  • Timestamp: DateTime
  • BidId: 64 位买入标识符
  • OfferId: 64 位卖出标识符
  • Volume: 交易量的 64 位浮点数
  • Price: 每单位成交量的价格的货币 64 位定点数

每条记录是 40 字节,timeseries 存储引擎在 96 秒内存储了近 40 GB。因此,如果这些是每笔交易存储的信息,那么它可以在不到 3 秒的时间内存储一个交易日的所有交易。

HExPerf06 项目还演示了将自定义 timeseries 点类与 timeseries 存储引擎一起使用的简便性。

如果您在自己的系统上尝试这些测试,请注意,您的防病毒解决方案可能会对测试性能产生负面影响。性能仍然应该非常出色,但如果关闭对包含数据文件的文件夹的防病毒监控,效果会更好。

我想埃伦和特德知道他们在写什么,当然,timeseries 存储引擎利用了 timeseries 数据的特性,并且针对最常见的用法模式进行了优化。

时序数据

一个高效的 timeseries 存储引擎处理数据的方式与常规数据库引擎不同,因为

  • 数据主要是追加到 timeseries
  • 对现有数据的更新相对较少
  • 时序点记录的大小通常是固定的
  • 每天会读取和写入大量数据,有时达数十亿条记录。

用于发电的典型风力发电机有多个涡流和位移传感器、加速度计以及风速和温度传感器;所有这些传感器都在不断报告信息。这些信息需要实时分析,通常需要对 timeseries 进行复杂的操作,以检测对风力发电场的安全和最佳运行至关重要的变化。

  • 趋势分析可用于将正常运行条件下的值与当前值进行比较。
  • 时间同步平均通常用于齿轮状态监测,与轴旋转同步对振动数据进行重采样,以从噪声数据中提取周期性波形。此技术可用于识别旋转轴承或变速箱缺陷。
  • 幅度解调用于检测产生撞击的缺陷,例如轴承中的滚动接触和齿轮啮合中的齿对齿接触。
  • 快速傅里叶变换和频谱分析可以区分正常旋转和特征缺陷频率。

这些只是可以利用大量高分辨率 timeseries 信息进行分析的示例,其中读取的数据量远大于写入量。我认为这很好地证明了 timeseries 存储引擎具有非凡的读取性能和良好的写入性能。

时序引擎

使用现有的 NoSQL 数据库引擎可以大大简化事情,但这也意味着设计必须适应该引擎的优点和缺点。

我将在本文中介绍的设计,极大地偏向于读取而非写入,我选择使用 Lightning Memory-Mapped Database (LMDB),https://symas.com/lmdb/,作为 blob 存储,因为它足够小,可以包含在本文的源代码中。

LMDB 是一个非常紧凑的 NoSQL 存储引擎,为其开发可能有点像驾驶一级方程式赛车:正确使用它将获得惊人的性能,但如果您不发挥其优势,性能可能会迅速下降。

该时序数据存储引擎实现为一组 C++ 类和模板,允许您为每个 timeseries 点指定自己的类型,以及可以在单个 blob 中存储的 timeseries 点的数量。timeseries 点的唯一要求是它必须满足此概念。

template<typename T>
concept PointType = requires(T t)
{
    { t.Timestamp() } -> std::convertible_to<DateTime>;
    std::is_default_constructible_v<T>;
    std::is_standard_layout_v<T>;
};

该库提供了一个类 TimeseriesPoint,它满足这些要求,并且将在整个系列中使用。它只有三个数据成员。

class TimeseriesPoint
{
    DateTime timestamp_;
    Int64 flags_;
    double value_;
public:
    ...
};

时间戳存储为 64 位整数,与 .NET System.DateTime 刻度(UTC)兼容。

这是用于存储测量数据的 timeseries 点的相当常见的表示,并且 B+ 树的初始存储开销略高于 1%。

blob 存储的策略很简单:

  • Timeseries 点被打包成固定大小的段,存储在 blob 中。
  • timeseriesGuid 标识。
  • 数据库中段的键有两个字段:
    • timeseriesGuid
    • 段中第一个 timeseries 点的 timestamp
  • 除最后一个段外,timeseries 的所有段都已完全填充数据。
  • 空段将被删除,数据库中不应有空段。
  • 每个段存储一个时间间隔的数据,并且该时间间隔的数据只能存在于该段中。

Timeseries 点在时间上可能分布不均,这对于主要处理发生时间不规则的事件的解决方案来说是好的。

由于项目中广泛使用 Guid,我对该类进行了改进,Guid 类现在依赖于 SSE 4.1。与 boost::uuids::uuid 相比,比较具有轻微的性能优势。这没有特别的原因,因为它们对于基本比较的作用相同。当 Guid 用作 std::unordered_map<K,V> 的键时,它的优势就显而易见了,因为它使 map 中的搜索速度比使用 std::unorderd_map<>boost::uuids::uuidboost::hash<boost::uuids::uuid> 快一倍以上。

CoCreateGuid 创建的 Guid 的值相当随机,因此在 std::hash<> 的特化中执行任何计算以获得良好的哈希键分布确实没有必要。

namespace std
{
  template<> struct hash<Harlinn::Common::Core::Guid>
  {
    constexpr std::size_t operator()( const Harlinn::Common::Core::Guid& guid ) const noexcept
    {
      return guid.Lo( );
    }
  };
}

由于 Guid 只是一个 16 字节的值,guid.Lo( ) 返回从 Guid 的第 9 个字节开始的 8 个字节,而这种策略在 std::unorderd_map<> 中效果非常好,其中哈希函数的性能起着重要作用。

使用引擎

使用 timeseries 存储引擎非常简单。

int main()
{
    using Engine = Timeseries::Engine<>;
    using Point = typename Engine::Point;
    using TimeseriesCursor = typename Engine::TimeseriesCursor;
    DateTime timestamp1( 2020, 1, 1 ), timestamp2( 2020, 1, 2 ),
        timestamp3( 2020, 1, 3 ), timestamp4( 2020, 1, 4 );

首先,我们接受 Timeseries::Engine 模板的默认参数,它指定 TimeseriesPoint 将用作 timeseries 点的表示,并且段可以容纳 8100 个条目,然后为 timeseries 点类型和 timeseries 光标创建一些别名。四个 timestamp 几乎不需要解释。

    auto DatabaseDir = GetDatabaseDir( );
    printf( "Database directory:%s\n", DatabaseDir.c_str( ) );
    TimeseriesEngineOptions options( DatabaseDir, true );
    auto timeseriesId1 = Test::Ids[0];

GetDatabaseDir() 检索 HCC_TEST_DATA_ROOT 环境变量的值,附加 "\\LMDB",并确保生成的路径存在。TimeseriesEngineOptions 构造函数的第二个参数指定我们是否要创建新环境。Test::Ids 只是一个包含 10,000 个预定义 GUID 的数组,我们选择第一个来标识 timeseries

    Engine engine( options );
    auto transaction = engine.BeginTransaction( );
    auto timeseriesCursor = transaction.OpenTimeseries( timeseriesId1 );

引擎构造函数初始化 LMDB,现在引擎已准备就绪。engine.BeginTransaction( ); 创建一个新的写入事务,我们在此事务中使用 OpenTimeseries(…) 打开一个 TimeseriesCursor,将标识 timeseriesGuid 作为参数传递。最后,我们准备插入一些数据。

    timeseriesCursor.Insert( timestamp1, 1 );
    timeseriesCursor.Insert( timestamp2, 2 );
    timeseriesCursor.Insert( timestamp3, 3 );
    timeseriesCursor.Insert( timestamp4, 4 );

然后,我们遍历 timeseries

    if ( timeseriesCursor.MoveFirst( ) )
    {
        do
        {
            auto& current = timeseriesCursor.Current( );
            std::cout << "Timestamp: " << current.Timestamp( )
                << ", Flags: " << current.Flags( )
                << ", Value: " << current.Value( ) << std::endl;
        } while ( timeseriesCursor.MoveNext( ) );
    }

搜索有点意思。

    DateTime timestamp( 2020, 1, 2, 12, 0, 0 );

存储中没有具有此 timestamptimeseries 点,但仍然可以使用此 timestamp 搜索前一个 timeseries 点,这一点很有意义。

    auto found = timeseriesCursor.Search( timestamp );
    if ( found )
    {
        auto& current = timeseriesCursor.Current( );
        std::cout << "Found Timestamp: " << current.Timestamp( )
            << ", Flags: " << current.Flags( )
            << ", Value: " << current.Value( ) << std::endl;
    }

此时,我们完成了想做的事情,是时候清理了。

    timeseriesCursor.Close( );
    transaction.Commit( );
}

最后两行代码都不是必需的,但如果不进行提交,析构函数将回滚对 timeseries 存储所做的任何更改。

Database directory:F:\Database\LMDB
Timestamp: 01.01.2020 00:00:00, Flags: 0, Value: 1
Timestamp: 02.01.2020 00:00:00, Flags: 0, Value: 2
Timestamp: 03.01.2020 00:00:00, Flags: 0, Value: 3
Timestamp: 04.01.2020 00:00:00, Flags: 0, Value: 4
Found Timestamp: 02.01.2020 00:00:00, Flags: 0, Value: 2

这涵盖了 timeseries 存储引擎最重要的操作。棘手的部分是确保这些操作可以快速执行,其余的都是辅助性的。

timeseries 引擎有三个主要部分:

  1. EngineTransactionTimeseriesCursor C++ 模板类。
  2. SegmentSegmentKey C++ 模板类。
  3. LMDB C++ 类。

Engine、Transaction 和 TimeseriesCursor C++ 模板类

Engine C++ 模板类是使用 timeseries 存储引擎的入口点。默认情况下,段的大小(可以写入 blob 的 timeseries 点的最大数量)设置为 8100,用于表示单个 timeseries 点的类型是 TimeseriesPoint

构造函数接受一个参数,即 EngineOptions 对象的引用。EngineOptions 允许您指定包含 LMDB 数据文件的目录,以及是创建新存储还是打开现有存储。除了构造函数和析构函数之外,Engine 模板还公开了一个函数:

Transaction<TP, segmentSize>
    BeginTransaction( TransactionFlags transactionFlags = TransactionFlags::None ) const
{
    auto lmdbTransaction = environment_.BeginTransaction( transactionFlags );
    Transaction<TP, segmentSize> result( const_cast<Engine*>(this), 
                                           std::move( lmdbTransaction ) );
    return result;
}

BeginTransaction(…) 创建一个新的 Transaction 对象,该对象包装一个 LMDB::Transaction 对象并代表光标缓存信息。

此缓存实现为以下模板的实例:

std::unordered_map<Guid, std::unique_ptr<TimeseriesInfoType>> timeseriesMap_;

其中 TimeseriesInfoType 是以下模板的实例化:

template<Segments::PointType TP, size_t segmentSize>
class TimeseriesInfo
{
public:
    using SegmentType = Segments::Segment< TP, segmentSize>;
private:
    DateTime minTimestamp_;
    DateTime maxTimestamp_;
    SegmentType modificationBuffer_;
    size_t changes_ = 0;
    std::optional<DateTime> lastSegmentTimestamp_;
    DateTime loadedTimestamp_;
    Guid id_;
public:
    ...
};

在这里,我们保留了每个 timeseries 的一些详细信息,例如 timeseries 中第一个和最后一个 timeseries 点的 timestamp。然后,我们有修改缓冲区及其更改计数,以及存储在 LMDB 中的最后一个段的 timestamploadedTimestamp_ 包含修改缓冲区中第一个 timeseries 点的 timestamp 值,在加载时

这足以维护更新存储的缓存,从而显著提高写入性能。Commit 将任何缓冲的修改刷新到存储。

void Commit( )
{
    SaveCachedUpdates( );
    lmdbTransaction_.Commit( );
    timeseriesMap_.clear( );
}

SaveCachedUpdates( ) 仅迭代 timeseriesMap_ 中的条目。

void SaveCachedUpdates( )
{
    auto database = TimeseriesDataTable( );
    for ( auto& entry : timeseriesMap_ )
    {
        TimeseriesInfoType* timeseriesInfo = entry.second.get( );
        if ( timeseriesInfo->Changes( ) )
        {
            timeseriesInfo->SetChanges( 0 );
            auto modificationBuffer = timeseriesInfo->ModificationBuffer( );
            const auto& timeseries = timeseriesInfo->Id( );
            auto loadedTimestamp = timeseriesInfo->LoadedTimestamp( );
            auto& first = modificationBuffer->front( );
            if ( loadedTimestamp && ( loadedTimestamp != first.Timestamp( ) ) )
            {
                // The key changed, so delete previously stored segment
                KeyData deleteKey( timeseries, loadedTimestamp );
                lmdbTransaction_.Delete( database, deleteKey );
            }
            KeyData keyData( timeseries, first );
            lmdbTransaction_.Write( database, keyData, *modificationBuffer );
            timeseriesInfo->SetLoadedTimestamp( first.Timestamp( ) );
        }
    }
}

在将段写入存储时,有一件事必须正确处理:第一个 timeseries 点的 timestamp 可能与加载时修改缓冲区中第一个 timeseries 点的 timestamp 不同。如果发生这种情况,则必须在存储修改缓冲区并更新 TimeseriesInfo 对象中的 loadedTimestamp_ 字段之前,删除存储中的原始键/值对。实现写入缓存不一定很复杂,但必须高效。

TimeseriesCursor 类依赖于 Transaction 类的 GetTimeseriesInfo(…) 函数来检索 TimeseriesInfo 以获取 timeseries

TimeseriesInfoType* GetTimeseriesInfo( const Guid& timeseries )
{
    auto it = timeseriesMap_.find( timeseries );
    if ( it != timeseriesMap_.end( ) )
    {
        return it->second.get( );
    }
    else
    {
        auto timeseriesInfo = std::make_unique<TimeseriesInfoType>( timeseries );
        auto* result = timeseriesInfo.get( );
        timeseriesMap_.emplace( timeseries, std::move( timeseriesInfo ) );
        return result;
    }
}

如果这是第一次为特定 timeseries 调用该函数,它将创建一个新实例并将对象的拥有权转移到 timeseriesMap_

TimeseriesCursor 本身只有少数几个字段。

template<Segments::PointType TP = TimeseriesPoint,
      size_t segmentSize = TimeseriesCursorSegmentSize>
class TimeseriesCursor
    : public Segments::SegmentContainer<
                  TimeseriesCursor<TP, segmentSize>, Guid, TP, segmentSize>
{
public:
    static constexpr size_t SegmentSize = segmentSize;
    using Base = Segments::SegmentContainer<
                  TimeseriesCursor<TP, segmentSize>, Guid, TP, segmentSize>;
    using Point = typename Base::Point;
    using Overflow = std::optional<Point>;
    using SegmentType = typename Base::DataType;
    using KeyData = typename Base::KeyType;
    using TransactionType = Transaction<TP, segmentSize>;
private:
    using TimeseriesInfoType = Internal::TimeseriesInfo<TP, segmentSize>;
    size_t position_ = 0;
    TimeseriesCursorState state_ = TimeseriesCursorState::Unknown;
    TimeseriesInfoType* timeseriesInfo_;
    SegmentType* currentSegmentData_;
    LMDB::Cursor cursor_;
    TransactionType* transaction_;
public:
    ...
};

position_ 当前是基于 1 的,这可能不是一个好主意,因为我必须一直减去 1 才能得到修改缓冲区内,或者由 currentSegmentData_ 指向的段中的当前 timeseries 点。哪个由光标的 state_ 决定。currentSegmentData_ 通常指向 LMDB 管理的内存,并且可能指向映射到数据文件的内存。

TimeseriesCursor 类实现了用于搜索和导航 timeseries 数据的操作。

  • const Point& Current( ) const noexcept:返回对光标当前位置的 timeseries 点的引用。
  • void Flush( ):将修改缓冲区刷新到存储。
  • void Insert( const Point& point ):在 timeseries 中插入或覆盖一个时序点。
  • SearchResult Search(const DateTime& timestamp ):在 timeseries 中搜索 timestamp 小于或等于给定 timestamp 的时序点。
  • bool MoveFirst( ):将光标移动到 timeseries 中的第一个时序点。
  • bool MoveLast( ):将光标移动到 timeseries 中的最后一个时序点。
  • bool MoveNext( ):将光标定位到 timeseries 中的下一个时序点。
  • bool MovePrevious( ):将光标定位到 timeseries 中的前一个时序点。
  • size_t ForEach( Func&& func, Args&& ...args ):提供对 timeseries 中每个 timeseries 点的快速访问。
  • size_t ForEach( const DateTime& start, Func&& func, Args&& ...args ):提供对 timeseries 中时序点的快速访问,从 timestamp 等于 start 的时序点开始,如果不存在具有相等 timestamp 的点,则从具有最大小于 starttimestamp 的时序点开始。
  • size_t ForEach( const DateTime& start, const DateTime& end, Func&& func, Args&& ...args ):与前一个重载类似,但将迭代停止在 timestamp 小于 end 的最后一个时序点。

实现 TimeseriesCursor 的复杂因素之一是,使用 LMDB 进行搜索会将 LMDB 光标定位到匹配搜索键的位置,或者定位到 B+ 树中的下一个条目。缺点是,当使用此机制定位时序点时,搜索几乎总是会将光标定位到下一个键/值对。

段的键有两个部分:

  1. 标识 timeseriesGuid
  2. 段中第一个 timeseries 点的 timestamp

这意味着,如果 timestamp 早于 timeseries 中任何 timeseries 点的 timestamp,简单地移动到前一个条目是不够的,因为在这种情况下,光标将定位到属于另一个 timeseries 的时序点段的条目。在这种情况下,搜索必须移回到 LMDB 定位的条目,并将 SearchResultcompareResult_ 字段设置为 CompareResult::Greater,表示光标定位在 timestamp 大于传递给 Search(…) 的值的 timeseries 的第一个时序点上。

由于 timeseries 点可以插入到 timeseries 的任何位置,因此解决方案必须能够处理它将替换段中第一个 timeseries 点的情况。在这种情况下,它必须查看前一个段的末尾,并确定新 timeseries 点的 timestamp 大于前一个段中最后一个 timeseries 点的 timestamp。如果发生这种情况,光标将移到 LMDB 的搜索功能定位的段,并且 timeseries 点将插入到段的前面。这将导致段的键发生变化,并且在将更改刷新到存储时必须删除带有旧键的条目。

LMDB C++ 类

LMDB 是一个事务性键/值存储的紧凑实现,HCCLMDB.h 包含一组类,封装了 LMDB C API 的相关部分。与本文源代码一起提供的 Visual Studio 2019 解决方案包括 libLMDB 项目,该项目会创建一个 LMDB 的 DLL。

在 LMDB 术语中,环境提供对数据文件和配套锁定文件的访问,而数据库提供对存储在环境数据文件中的键/值存储的访问。将其称为表是很诱人的,并且多个命名数据库可以驻留在同一个环境中。

几乎所有 LMDB 操作都使用事务,LMDB 支持两种事务:一种用于读取,一种用于写入。读取器不阻塞写入者,写入者也不阻塞读取者。同一台机器上运行的多个进程可以打开一个环境,每个环境支持一个并发写入事务。

默认情况下,键按字典顺序排序,但您可以提供自己的比较函数。

命名数据库

要在环境中打开多个数据库,每个数据库都必须命名,并且必须指定要在环境中打开的数据库的最大数量。这必须由创建或打开环境的第一个进程或线程来完成。

这由 LMDB::Environment 构造函数透明处理,它将数据库的最大数量作为第三个参数。

explicit Environment( const char* path,
    size_t newMemoryMapSize = 10485760,
    unsigned int maxDatabases = 0,
    EnvironmentFlags environmentFlags = EnvironmentFlags::WriteMemMap );

LMDB C++ 类通过使用 C++ 类封装 LMDB C API 中的句柄类型,使处理 LMDB 更加容易,每个句柄类型都有一个对应的类。

  • 环境
  • 事务
  • 数据库
  • 光标

LMDB C API 报告的错误会转换为 C++ 异常,并且该库会尝试为许多函数参数提供合理的默认值。

环境

应用程序必须在执行任何 LMDB 操作之前创建一个 LMDB::Environment 对象。

一个环境最多包含一个匿名数据库,或者可以包含多个命名数据库,它们驻留在单个内存映射文件中。

创建新的 LMDB::Environment 对象的最简单方法只需要包含数据库文件(或将包含数据库文件)的目录的路径。

LMDB::Environment environment( DatabaseDir );

事务

所有数据库操作都需要一个 transactiontransaction 可以是读写或只读的。

写入 transaction 可能不会跨线程。通过调用以下函数来创建新的读写 transaction

auto transaction = environment.BeginTransaction( );

而只读 transaction 使用以下语句创建:

transaction = environment.BeginTransaction( LMDB::TransactionFlags::ReadOnly );

通过以下方式将 transaction 中执行的更改提交到数据库:

transaction.Commit( );

while

transaction.Abort( );

用于回滚 transaction 中所做的所有更改。Transaction 对象的析构函数也将回滚未提交到环境的任何更改。

在您可以对数据库进行任何更改之前,必须先打开它。

auto database = transaction.OpenDatabase( );

一旦数据库打开,它就可以被修改。

transaction.Write( database, 1, 1);

或者您可以在 transaction 中打开数据库上的光标。

auto cursor = transaction.OpenCursor( database );

数据库

LMDB::Database 类封装了 LMDB 环境中的数据库句柄。

使用以下方式打开数据库:

class Transaction
{
  ...
public:
  Database OpenDatabase( const char* name, DatabaseFlags databaseFlags = DatabaseFlags::None );
  Database OpenDatabase( DatabaseFlags databaseFlags = DatabaseFlags::None )
  ...
};

数据库句柄将仅对当前事务是 private 的,直到事务提交。成功提交后,该句柄将保留在环境中,可供其他事务使用。

Transaction::OpenDatabase(…) 不能从同一进程中的多个并发事务调用,并且在同一进程中的任何其他事务调用 Transaction::OpenDatabase(…) 之前,该事务必须提交或中止。如果事务被中止,数据库句柄将被自动关闭。

Transaction::OpenDatabase(…) 被调用以打开环境中已打开的数据库时,它将返回现有的数据库句柄。数据库句柄只能通过调用 Environment::CloseDatabase(…) 来关闭一次。

光标

LMDB::Cursor 对象提供了可用于导航、搜索和修改数据库中键/值对的功能。

  • constexpr bool IsPositioned( ) const noexcept:如果光标定位在键/值对上,则返回 true
  • const LMDB::Value& Key( ) const noexcept:返回对当前键的引用。
  • template<typename T> const T& Key( ) const noexcept:用于将键的内容强制转换为指定类型的模板。
  • bool SetKey( const LMDB::Value& key ):将光标移动到指定的键,如果指定的键/值对不存在,则返回 false
  • const LMDB::Value& Value( ) const noexcept:返回对当前值的引用。
  • template<typename T> const T& Value( ) const noexcept:用于将当前值的内容强制转换为指定类型的模板。
  • bool SetValue( const LMDB::Value& value, WriteFlags writeFlags = WriteFlags::None ):更新光标当前位置的值。
  • bool Write( const LMDB::Value& key, const LMDB::Value& value, WriteFlags writeFlags = WriteFlags::None ):将键/值对存储在数据库中。
  • bool Write( const LMDB::Value& key, const LMDB::Value& value, WriteFlags writeFlags = WriteFlags::None ):使用光标将键/值对写入数据库。
  • template<ValueType T1, ValueType T2> bool Write( const T1& key, const T2& value, WriteFlags writeFlags = WriteFlags::None ):此模板创建并初始化键和值的 LMDB::Value 对象,简化了 API。
  • bool Write( const LMDB::Value& value, WriteFlags writeFlags = WriteFlags::None ):更新光标当前位置的值。
  • template<ValueType T> bool Write( const T& value, WriteFlags writeFlags = WriteFlags::None ):通过初始化参数值的 LMDB::Value 对象来简化更新光标当前位置的值。
  • bool Search( const LMDB::Value& key ):搜索数据库以查找键/值对,其键等于或大于指定的搜索键。
  • template<ValueType T> bool Search( const T& key ):通过创建和初始化键的 LMDB::Value 对象来简化 Search(…) API。
  • bool MoveTo( const LMDB::Value& key ):将光标移动到参数键的键/值对,如果键不存在,则返回 false
  • template<ValueType T> bool MoveTo(const T& key ):通过创建和初始化键的 LMDB::Value 对象来简化 MoveTo(…) API。
  • bool MoveFirst( ):将光标移动到数据库中的第一个键/值对。如果数据库为空,则返回 false
  • bool MoveFirstDup( ):定位到当前键的第一个数据项。仅适用于使用 DatabaseFlags::DupSort 标志打开的数据库。
  • bool MoveNext( ):将光标移动到数据库中的下一个键/值对。如果光标定位在最后一个条目或数据库为空,则返回 false
  • bool MoveNextDup( ):移动到当前键的下一个数据项。仅适用于使用 DatabaseFlags::DupSort 标志打开的数据库。
  • bool MoveNextNoDup( ):移动到下一个键/值对。仅适用于使用 DatabaseFlags::DupSort 标志打开的数据库。
  • bool MoveNextMultiple( ):从下一个光标位置返回最多一页重复数据项。移动光标以准备 MoveNextMultiple( )。仅适用于使用 DatabaseFlags::DupFixed 标志打开的数据库。
  • bool MoveLast( ):将光标移动到数据库中的最后一个键/值对。如果数据库为空,则返回 false
  • bool MoveLastDup( ):移动到当前键的最后一个数据项。仅适用于使用 DatabaseFlags::DupSort 标志打开的数据库。
  • bool MovePrevious( ):将光标移动到数据库中的前一个键/值对。如果光标定位在第一个条目或数据库为空,则返回 false
  • bool MovePreviousDup( ):移动到当前键的前一个数据项。仅适用于使用 DatabaseFlags::DupSort 标志打开的数据库。
  • bool MovePreviousNoDup( ):移动到前一个键的最后一个数据项。仅适用于使用 DatabaseFlags::DupSort 标志打开的数据库。
  • bool MovePreviousMultiple( ):移动到前一页并返回最多一页重复数据项。
  • void Delete( bool noDupData = false ):删除当前键/值对。如果数据库使用 DatabaseFlags::DupSort 标志打开,则将 noDupData 设置为 true 以删除当前键的所有数据项。

LMDB::Value 对象用于保存键和值。LMDB::Value 源自 LMDB C API 的 MDB_val 结构,其作用是确保结构中的两个字段始终得到正确初始化。

Segment 和 SegmentKey C++ 模板类

SegmentSegmentKey C++ 模板类表示将在数据库中存储的数据。

Segment

Segment 类表示作为值存储的数据,其中 timeseries 点存储在 std::array 中。该数组可能未完全填充,size_ 用于告诉我们数组中有多少个槽实际上已填充了 timeseries 点。

template<PointType TP, size_t maxSegmentSize >
class Segment
{
public:
    constexpr static size_t MaxSize = maxSegmentSize;
    using Point = TP;
    using ArrayType = std::array<Point, MaxSize>;
    using iterator = typename ArrayType::iterator;
    using const_iterator = typename ArrayType::const_iterator;
    ...
private:
    size_t size_;
    ArrayType points_;
public:
    Segment( )
        : size_( 0 )
    {
    }
    ...
};

Segment 实现 front( )back( )size( )empty( )data( )begin( )end()cbegin( )cend()find(…)operator[](…),执行您通常期望的操作。

find(…) 函数使用 std::lower_bound 来定位具有指定 timestamptimeseries 点,或 timeseries 点应插入的段中的位置。

iterator find( const DateTime& timestamp )
{
    return std::lower_bound( begin( ), end( ), timestamp,
        []( const Point& point, const DateTime& timestamp )
    {
        return point.Timestamp( ) < timestamp;
    } );
}

在这里,我使用了允许我们指定谓词(或比较)的重载,这演示了 std::lower_bound 的一个不错的特性,我将 timestamp 传递给函数,而不是创建一个临时的 Point 对象。timestamp 成为二元谓词的第二个参数。这既简单又高效。

Segment 类中最有趣且对性能至关重要的函数是 insert(…)

段内的 timeseries 点根据 timeseries 点的 timestamp 进行排序,并且由于这是一个将用于 timeseries 数据的类,因此该函数会尝试优先处理追加操作。第二个参数接收溢出值(如果有)。只有当段已满,并且要插入的 timeseries 点的 timestamp 与段中现有的 timeseries 点不匹配(这将导致覆盖)时,才会发生这种情况。

该函数还试图使前置操作高效,因为在最后一个段以外的 timeseries 中插入会导致溢出值在段之间传播,因此值得付出努力。

由于对 find(…) 的调用永远不会针对大于或等于最后一个 timeseries 点的 timestamp 的值,因此我们可以在使用迭代器之前跳过测试 it == end()

这个函数并不特别复杂或高级,它只是表明通过添加几个额外的步骤,优化常见情况,我们可以获得显著的性能提升。

void insert( const Point& point, std::optional<Point>& overflow )
{
    if ( size_ )
    {
        Point& last = points_[size_ - 1];
        if ( size_ < MaxSize )
        {
            if ( last.Timestamp( ) < point.Timestamp( ) )
            {
                points_[size_] = point;
                size_++;
            }
            else if ( last.Timestamp( ) == point.Timestamp( ) )
            {
                points_[size_ - 1] = point;
            }
            else if ( point.Timestamp( ) < points_[0].Timestamp( ) )
            {
                std::copy_backward( begin(), end( ), end( ) + 1 );
                points_[0] = point;
                ++size_;
            }
            else if ( point.Timestamp( ) == points_[0].Timestamp( ) )
            {
                points_[0] = point;
            }
            else
            {
                auto it = find( point.Timestamp( ) );
                if ( it->Timestamp( ) > point.Timestamp( ) )
                {
                    std::copy_backward( it, end( ), end( ) + 1 );
                    ++size_;
                }
                *it = point;
            }
        }
        else
        {
            if ( last.Timestamp( ) < point.Timestamp( ) )
            {
                overflow = point;
            }
            else if ( last.Timestamp( ) == point.Timestamp( ) )
            {
                points_[size_ - 1] = point;
            }
            else if ( point.Timestamp( ) < points_[0].Timestamp( ) )
            {
                overflow = last;
                std::copy_backward( begin(), end( ) - 1, end( ) );
                points_[0] = point;
            }
            else if ( point.Timestamp( ) == points_[0].Timestamp( ) )
            {
                points_[0] = point;
            }
            else
            {
                auto it = find( point.Timestamp( ) );
                if ( it->Timestamp( ) > point.Timestamp( ) )
                {
                    overflow = last;
                    std::copy_backward( it, end( ) - 1, end( ) );
                }
                *it = point;
            }
        }
    }
    else
    {
        points_[0] = point;
        size_++;
    }
}

我相信 timeseries 存储引擎的整体性能证明了这些额外的步骤对于在追加和前置操作上付出额外考虑是值得的。

结束(目前)

这标志着关于 timeseries 存储引擎的第一篇文章的结束。我非常欢迎对 API 进行改进的建议,只要它们不会损害引擎的性能。

我的这个系列的计划是在引擎之上创建一个服务器,并在此之上实现一个 .NET 客户端库和示例 Web 应用程序(基于 .NET Core)。

那么,下次再见……祝您编码愉快!

历史

  • 2020年10月5日
    • 首次发布
  • 2020年10月6日
    • 修复 bug + 清理大部分单元测试
  • 2020年10月7日
    • Harlinn.Common.Core 库添加更多单元测试
  • 2021年2月11日
    • Bug 修复
    • C++ ODBC 支持
    • 增加了创建可以使用 memcmp 排序的复杂键的能力,这在使用 LMDB 时很有用。
  • 2021年2月25日
    • 更新 LMDB
    • 更新 xxHash
    • 添加了使用 LMDB 对大型复杂键的超快速基于哈希的索引的初始实现
    • 快速异步日志记录 - 几乎完成 :-)
  • 2021年3月3日
    • 新的授权相关类
      • SecurityId:SID 及相关操作的封装器。
      • ExplicitAccessEXCPLICIT_ACCESS 的封装器。
      • TrusteeTRUSTEE 的封装器。
      • SecurityIdAndDomain:保存 LookupAccountName 的结果。
      • LocalUniqueIdLUID 的封装器。
      • AccessMask:便于检查分配给 ACCESS_MASK 的权限。
        • AccessMaskT<>
          • EventWaitHandleAccessMask:检查和操作 EventWaitHandle 的权限。
          • MutexAccessMask:检查和操作互斥锁的权限。
          • SemaphoreAccessMask:检查和操作信号灯的权限。
          • WaitableTimerAccessMask:检查和操作 WaitableTimer 的权限。
          • FileAccessMask:检查和操作文件相关的权限。
          • DirectoryAccessMask:检查和操作目录相关的权限。
          • PipeAccessMask:检查和操作管道相关的权限。
          • ThreadAccessMask:检查和操作线程相关的权限。
          • ProcessAccessMask:检查和操作进程相关的权限。
      • GenericMappingGENERIC_MAPPING 的封装器。
      • AccessControlEntry:这是一组封装 ACE 结构的微小类。
        • AccessControlEntryBase<,>
          • AccessAllowedAccessControlEntry
          • AccessDeniedAccessControlEntry
          • SystemAuditAccessControlEntry
          • SystemAlarmAccessControlEntry
          • SystemResourceAttributeAccessControlEntry
          • SystemScopedPolicyIdAccessControlEntry
          • SystemMandatoryLabelAccessControlEntry
          • SystemProcessTrustLabelAccessControlEntry
          • SystemAccessFilterAccessControlEntry
          • AccessDeniedCallbackAccessControlEntry
          • SystemAuditCallbackAccessControlEntry
          • SystemAlarmCallbackAccessControlEntry
        • ObjectAccessControlEntryBase<,>
          • AccessAllowedObjectAccessControlEntry
          • AccessDeniedObjectAccessControlEntry
          • SystemAuditObjectAccessControlEntry
          • SystemAlarmObjectAccessControlEntry
          • AccessAllowedCallbackObjectAccessControlEntry
          • AccessDeniedCallbackObjectAccessControlEntry
          • SystemAuditCallbackObjectAccessControlEntry
          • SystemAlarmCallbackObjectAccessControlEntry
      • AccessControlList:ACL 的封装器。
      • PrivilegeSetPRIVILEGE_SET 的封装器。
      • SecurityDescriptorSECURITY_DESCRIPTOR 封装器的早期实现。
      • SecurityAttributesSECURITY_ATTRIBUTES 封装器的非常早期的实现。
      • Token:访问令牌的封装器的早期实现。
      • DomainObject
        • User:关于本地、工作组或域用户的用户信息。
        • Computer:关于本地、工作组或域计算机的信息。
        • Group:本地、工作组或域组。
      • UsersUser 对象的向量。
      • GroupsGroup 对象的向量。
  • 2021年3月14日 - 更多关于安全相关的内容。
    • Token:一个 Windows 访问令牌的封装器,带有一些支持类,例如:
      • TokenAccessMask:Windows 访问令牌访问权限的访问掩码实现。
      • TokenGroups:Windows TOKEN_GROUPS 类型的封装器/二进制兼容替代品,具有 C++ 容器风格的接口。
      • TokenPrivilegesTOKEN_PRIVILEGES 类型的封装器/二进制兼容替代品,具有 C++ 容器风格的接口。
      • TokenStatistics:Windows TOKEN_STATISTICS 类型的二进制兼容替代品,使用库实现的类型,如 LocalUniqueIdTokenTypeImpersonationLevel
      • TokenGroupsAndPrivileges:Windows TOKEN_GROUPS_AND_PRIVILEGES 类型的封装器/二进制兼容替代品。
      • TokenAccessInformation:Windows TOKEN_ACCESS_INFORMATION 类型的封装器/二进制兼容替代品。
      • TokenMandatoryLabel:Windows TOKEN_MANDATORY_LABEL 类型的封装器。
    • SecurityPackage:提供对 Windows 安全包信息的访问。
    • SecurityPackages:系统上已安装的安全包信息的 std::unordered_map
    • CredentialsHandle:Windows CredHandle 类型的封装器。
    • SecurityContext:Windows CtxtHandle 类型的封装器。
    • Crypto::BlobCrypto::BlobT:C++ 风格的 _CRYPTOAPI_BLOB 替代品。
    • CertificateContext:Windows PCCERT_CONTEXT 类型的封装器,提供对 X.509 证书的访问。
    • CertificateChain:Windows PCCERT_CHAIN_CONTEXT 类型的封装器,包含简单证书链的数组和一个信任状态结构,该结构指示所有连接的简单链的总有效性数据。
    • ServerOcspResponseContext:包含编码的 OCSP 响应。
    • ServerOcspResponse:表示与服务器证书链关联的 OCSP 响应的句柄。
    • CertificateChainEngine:代表应用程序的链引擎。
    • CertificateTrustList:Windows PCCTL_CONTEXT 类型的封装器,它同时包含 CTL 的编码和解码表示。它还包含一个打开的 HCRYPTMSG 句柄,用于已解码的、加密签名的消息,该消息包含 CTL_INFO 作为其内部内容。
    • CertificateRevocationList:包含证书吊销列表 (CRL) 的编码和解码表示。
    • CertificateStore:用于存储证书、证书吊销列表 (CRL) 和证书信任列表 (CTL) 的存储。
  • 2021年3月23日
    • 更新至 Visual Studio 16.9.2
    • 构建修复。
    • SecurityDescriptor:实现了安全描述符的序列化,从而可以持久化授权数据。
© . All rights reserved.