使用 C++ 简化可扩展存储引擎 (ESE) API - 第二部分





5.00/5 (1投票)
使用我的 ESE C API 的 C++ 包装器类实现数据库层
相关文章
引言
在我关于我的 Windows ESE C API 的 C++ 包装器库的第二篇文章中,我们将通过一个使用 ESE 作为存储的应用程序的数据库层设计来讲解。
该库旨在促进模块化,但并不试图强制您采用特定的模式。Ese::Table
和 Ese::Database
可以像我在上一篇文章中所述的那样使用,但另一方面,从 Ese::Database
派生数据库,并从 Ese::Table
派生各种表会更方便。
在本例中,我们将使用 ESE 来存储目录的递归结构,其中每个目录可以包含子目录和资产。一个资产可以连接多个传感器,并且对于每个传感器,我们希望存储传感器值的时序数据。
为了实现这一点,我们使用以下一组 enum
、struct
和 class
– 缩进表示继承关系。
Engine
:用于持有Ese::Instance
并管理Session
对象的类。Session
:持有Ese::Session
并提供可在数据库上执行的操作的接口。Database
:派生自Ese::Database
。Named
:简单的struct
,包含两个字段 –Guid Id
和std::string Name
。- Owned:添加一个字段 –
Guid Owner
。Sensor
CatalogItemBase
:添加一个字段 –CatalogItemType Type
。Catalog
资产
- Owned:添加一个字段 –
CatalogItem
:std::variant<Catalog, Asset>
。CatalogItemType
:enum
,包含值Unknown
、Catalog
和Asset
。OwnedTable<T>
:模板类,派生自Ese::Table
。SensorTable
:用于存储Sensor
对象数据的表。CatalogItemTable
:用于存储Catalog
和Asset
对象数据的表。
SensorValue
:struct
,包含四个字段 –Guid Sensor
、DateTime Timestamp
、Int64 Flags
和Double Value
。SensorPoint
:struct
,包含三个字段 –DateTime Timestamp
、Int64 Flags
和Double Value
。SensorValueTable
:用于存储SensorValue
对象数据的表。
此外还有一些额外的要求。
CatalogItem
的Owner
字段必须为空Guid
或标识Catalog
的Guid
,从而建立一个递归结构,其中Catalog
可以包含Asset
对象和其他Catalog
对象。Sensor
的Owner
字段必须标识一个Asset
。Name
在共享相同所有者的对象中应始终唯一。
Engine
引擎的主要目的是持有应用程序的 Ese::Instance
,并管理其会话。
引擎使用 EngineOptions
对象的引用进行初始化。
struct EngineOptions
{
std::string Database;
std::string SystemPath;
std::string LogFilePath;
bool Create = false;
bool Replace = false;
bool Unsafe = false;
bool RunTests = false;
};
ParseOptions(…)
在 HExOptionParser.h 中实现,并使用 boost::program_options
解析命令行参数并初始化 EngineOptions
对象。该函数可以轻松扩展以支持更复杂的配置。Engine
类在构造函数中处理这些选项。
class Engine
{
...
public:
Engine( const EngineOptions& options, const char* instanceName )
: options_( options ), instance_( instanceName )
{
ProcessOptions( );
}
private:
...
void ProcessOptions( )
{
instance_.SetCreatePathIfNotExist( );
instance_.SetExceptionAction( Ese::ExceptionAction::None );
instance_.SetSystemPath( options_.SystemPath );
instance_.SetLogFilePath( options_.LogFilePath );
instance_.SetMaxVersionPageCount( 262144 );
if ( options_.Create )
{
auto session = std::make_unique<Session>( *this, 1, true );
sessions_.emplace( 1, std::move( session ) );
}
}
...
};
我们使用 Engine
类的 CreateSession( )
函数创建会话对象。如果这是 CreateSession( )
的第一次调用,该函数会检查用于创建数据库的会话是否可用。后续调用将始终创建一个新的 session
对象。
Session* CreateSession( )
{
std::lock_guard lockGuard( criticalSection_ );
if ( sessions_.size( ) == 1 && sessionCounter_ == 0 )
{
++sessionCounter_;
auto result = sessions_[1].get();
return result;
}
else
{
auto id = ++sessionCounter_;
auto session = std::make_unique<Session>( *this, id, false );
auto result = session.get( );
sessions_.emplace( id, std::move( session ) );
return result;
}
}
实现 main(…)
函数现在变得微不足道。
int main( int argc, char* argv[] )
{
try
{
EngineOptions options;
if ( ParseOptions( argc, argv, options ) )
{
Engine engine( options, "TestInstance" );
auto session = engine.CreateSession( );
if ( options.RunTests )
{
RunTests( *session );
}
session->Close( );
}
}
catch ( std::exception& exc )
{
std::string message = exc.what( );
printf( "Exception: %s", message.c_str( ) );
}
return 0;
}
Session
Session
类提供了数据库特定代码与应用程序其余部分之间的接口,使应用程序的其余部分免受处理 ESE 数据库的复杂性的影响。
Session
类只有几个成员变量。
class Session
{
Engine& engine_;
Int64 id_;
Ese::Session eseSession_;
Database eseDatabase_;
public:
Session( Engine& engine, Int64 id, bool createDatabase );
...
}
其中 engine_
持有其所属 Engine
的引用,从而可以访问对 Session
实例也相关的 EngineOptions
。id_
是由 Engine
类生成的整数,唯一标识会话,这在设计客户端/服务器解决方案时非常有用,而 eseSession_
持有由 Ese::Instance::BeginSession()
创建的 Ese::Session
对象。
数据库
在此基础上,我们可以深入了解 Database
类的详细信息。
class Database : public Ese::Database
{
...
private:
CatalogItemTable catalogItemTable_;
SensorTable sensorTable_;
SensorValueTable sensorValueTable_;
public:
...
}
Database
类派生自 Ese::Database
,并有三个成员变量,分别对应于本示例将使用的三个表。
Session
的构造函数实现如下。
inline Session::Session( Engine& engine, Int64 id, bool createDatabase )
: engine_( engine ), id_( id )
{
auto& instance = engine_.Instance( );
auto& options = engine_.Options( );
eseSession_ = instance.BeginSession( );
if ( createDatabase )
{
Ese::CreateDatabaseFlags createDatabaseFlags = Ese::CreateDatabaseFlags::None;
if ( options.Replace )
{
createDatabaseFlags |= Ese::CreateDatabaseFlags::OverwriteExisting;
}
if ( options.Unsafe )
{
createDatabaseFlags |= Ese::CreateDatabaseFlags::RecoveryOff |
Ese::CreateDatabaseFlags::ShadowingOff;
}
eseDatabase_ = eseSession_.CreateDatabase<Database>( options.Database,
createDatabaseFlags );
}
else
{
eseSession_.AttachDatabase( options.Database );
eseDatabase_ = eseSession_.OpenDatabase<Database>( options.Database );
}
}
其中,传递给 Session
构造函数的第三个参数指示它创建新数据库或打开现有数据库。
像这样调用 Ese::Session::CreateDatabase(…)
。
eseDatabase_ = eseSession_.CreateDatabase<Database>( options.Database, createDatabaseFlags );
将导致 CreateDatabase<T>(…)
模板函数的实现调用我们对 Database::OnDatabaseCreated( )
的实现。
void OnDatabaseCreated( )
{
BeginTransaction( );
catalogItemTable_ = CreateTable<CatalogItemTable>( CatalogItemTable::TableName );
sensorTable_ = CreateTable<SensorTable>( SensorTable::TableName );
sensorValueTable_ = CreateTable<SensorValueTable>( SensorValueTable::TableName );
CommitTransaction( );
}
其中,每次调用 CreateTable<T>(...)
将导致模板函数在创建 T
的新实例后立即调用 T::OnTableCreated()
。
void OnTableCreated( )
{
Base::OnTableCreated( );
idColumnId_ = AddGuid( IdColumnName );
ownerColumnId_ = AddGuid( OwnerColumnName );
nameColumnId_ = AddText( NameColumnName );
CreateIndex( DerivedT::PrimaryIndexName, Ese::IndexFlags::Primary, "+Id\0", 5 );
CreateIndex( DerivedT::OwnerAndNameIndexName,
Ese::IndexFlags::Unique,
"+Owner\0+Name\0", 14 );
SetCurrentIndex( DerivedT::PrimaryIndexName );
}
允许我们如上所示添加列并创建表的索引。此处,第二次调用 CreateIndex(…)
函数确保 ESE 将处理以下要求:“Name 在共享相同所有者的对象中应始终唯一。”。
同样,
eseDatabase_ = eseSession_.OpenDatabase<Database>( options.Database );
将导致 OpenDatabase<T>(...)
模板函数的实现创建 T
的新实例后立即调用 T::OnDatabaseOpened()
函数。
void OnDatabaseOpened( )
{
catalogItemTable_ = OpenTable<CatalogItemTable>( CatalogItemTable::TableName,
Ese::OpenTableFlags::Updatable );
sensorTable_ = OpenTable<SensorTable>( SensorTable::TableName,
Ese::OpenTableFlags::Updatable );
sensorValueTable_ = OpenTable<SensorValueTable>( SensorValueTable::TableName,
Ese::OpenTableFlags::Updatable );
}
其中,每次调用 OpenTable<T>(…)
将导致模板函数在创建 T
的新实例后立即调用 T::OnTableOpened()
。
void OnTableOpened( )
{
Base::OnTableOpened( );
idColumnId_ = GetColumnId( IdColumnName );
ownerColumnId_ = GetColumnId( OwnerColumnName );
nameColumnId_ = GetColumnId( NameColumnName );
SetCurrentIndex( DerivedT::PrimaryIndexName );
}
允许我们初始化列 ID 并设置当前索引。
这种模式使得将我们自己的表和数据库类型与库集成变得容易。
会话和表设计
在处理 ESE 和大多数其他 NoSQL 数据库引擎时,我们必须在代码中处理许多“业务规则”。正如我们上面所做的,我们可以使用索引来处理“Name 在共享相同所有者的对象中应始终唯一。”,而“CatalogItem 的 Owner 字段必须为空 Guid 或标识 Catalog 的 Guid”必须在代码中处理。
Catalog CreateOrRetrieveCatalog( const Guid& owningCatalogId, const std::string& name )
{
if ( owningCatalogId.empty( ) == false && CatalogExists( owningCatalogId ) == false )
{
throw std::exception( "Invalid catalog id" );
}
auto& catalogItems = CatalogItems( );
return catalogItems.CreateOrRetrieveCatalog( owningCatalogId, name );
}
这并不特别难,但它增加了通过一个定义明确的接口进行工作的 PL,该接口负责确保这些规则得到正确处理。
该应用程序使用一组简单的 struct
来表示目录、资产和传感器的数据类型。
考虑到这种继承层次结构,探索我们的代码重用选项是值得的。
在此示例中,我们最终决定有一个表用于目录和资产对象,还有一个表用于传感器对象。现在,如果这两个表可以重用基类提供的通用实现,那将是非常好的,从而为我们提供了表的继承图。
OwnedTable
模板类 OwnedTable
实现支持 Id
、Owner
和 Name
列的功能,并且上面显示的 OnTableCreated( )
和 OnTableOpened( )
的实现属于此模板的成员。
OwnedTable
实现了一组函数,允许我们读取和写入列值。
Guid Id( ) const
{
Guid result;
Read( idColumnId_, result );
return result;
}
void SetId( const Guid& id ) const
{
SetColumn( idColumnId_, id );
}
其中,Owner
和 Name
列的实现是类似的。
此 MoveTo(…)
函数的实现允许我们按 Owner
和 Name
查找行。
bool MoveTo( const Guid& ownerId, const char* name ) const
{
SetCurrentIndex( DerivedT::OwnerAndNameIndexName );
MakeKey( ownerId, Ese::KeyFlags::NewKey );
MakeKey( name );
auto rc = Seek( Ese::SeekFlags::Equal );
return rc >= Ese::Result::Success;
}
虽然索引确保 Owner
和 Name
列的任何组合在数据库中唯一标识表中的一行,但我们仍然需要设置 DerivedT::OwnerAndNameIndexName
作为当前索引,因为这会影响 MakeKey(...)
如何将搜索键映射到表的列。
通过将 Ese::SeekFlags::Equal
传递给 Seek(…)
,我们告诉 ESE 我们希望精确匹配搜索条件。
该模板使用“好奇重复模板模式”从派生类检索索引名称,从而允许派生类指定合适的名称。
为了支持按表的主键进行搜索,我们实现
bool MoveTo( const Guid& id ) const
{
MakeKey( id, Ese::KeyFlags::NewKey );
auto rc = Seek( Ese::SeekFlags::Equal );
return rc >= Ese::Result::Success;
}
这与第一个 MoveTo(…)
重载类似,但在这里我们期望在构建搜索键和调用 Seek(…)
时当前索引为主索引。
我们也希望能够选择所有具有相同 Owner
的行。
bool FilterByOwner( const Guid& ownerId ) const
{
SetCurrentIndex( DerivedT::OwnerAndNameIndexName );
MakeKey( ownerId, Ese::KeyFlags::NewKey );
MakeKey( "" );
auto rc = Seek( Ese::SeekFlags::GreaterOrEqual );
if ( rc >= Ese::Result::Success && Owner() == ownerId )
{
MakeKey( ownerId, Ese::KeyFlags::NewKey | Ese::KeyFlags::FullColumnEndLimit );
SetIndexRange( Ese::IndexRengeFlags::Inclusive | Ese::IndexRengeFlags::UpperLimit );
return true;
}
return false;
}
对 MakeKey(…)
的最后一次调用设置了一个通配符过滤器。Ese::KeyFlags::FullColumnEndLimit
指定搜索键的创建方式,使得当前键列之后的任何键列都将作为通配符处理。这意味着搜索键可以匹配具有以下内容的索引条目:
- 为此键列和索引的所有先前键列提供的确切值。
- 后续索引列的任何值。
此选项应用于创建通配符搜索键,用于查找最接近索引末尾的索引条目。索引末尾是移动到该索引的最后一行时找到的索引条目。
这是 SensorTable
的实现用来实现
void GetSensors(const Guid& assetId, std::vector<Sensor>& sensors ) const
{
sensors.clear( );
if ( FilterByOwner( assetId ) )
{
do
{
Read( sensors.emplace_back( ) );
} while ( MoveNext( ) );
}
}
GetSensor(…)
函数检索附加到资产的所有传感器,现在实现起来非常简单。
删除目录和资产
在使用 ESE 等数据库引擎时,有一件事往往会变得复杂,那就是删除数据。当我们删除一个目录时,我们还必须确保所有其他内容都得到清理。
- 必须从
SensorValueTable
中删除值。 - 必须从
SensorTable
中删除传感器。 - 直接或间接属于该目录的资产必须被删除。
- 必须删除子目录。
虽然实现起来并不特别困难,但诀窍在于记住这不会通过级联删除自动发生,而必须在代码中处理。
bool DeleteCatalogItem( const Guid& itemId ) const
{
auto& catalogItems = CatalogItems( );
auto& sensors = Sensors( );
auto& values = Values( );
if ( catalogItems.MoveTo( itemId ) )
{
auto itemType = catalogItems.ItemType( );
switch ( itemType )
{
case CatalogItemType::Catalog:
{
while ( catalogItems.MoveToFirstChild( itemId ) )
{
auto childId = catalogItems.Id( );
DeleteCatalogItem( childId );
}
catalogItems.SetCurrentIndex( CatalogItemTable::PrimaryIndexName );
if ( catalogItems.MoveTo( itemId ) )
{
catalogItems.Delete( );
}
return true;
}
break;
case CatalogItemType::Asset:
{
if ( sensors.FilterByOwner( itemId ) )
{
do
{
auto sensorId = sensors.Id( );
values.Delete( sensorId );
sensors.Delete( );
} while ( sensors.MoveNext() );
}
catalogItems.SetCurrentIndex( CatalogItemTable::PrimaryIndexName );
catalogItems.Delete( );
return true;
}
break;
}
}
return false;
}
切换索引会带来性能上的代价,因此应注意限制执行此操作的次数。数据层尝试在完成依赖于辅助索引的操作后,将索引切换回主索引。这样,依赖于主索引的操作就不必设置索引了。
查找传感器值
SensorValueTable
有四列,与 SensorValue struct
的四个字段匹配。
struct SensorValue
{
Guid Sensor;
DateTime Timestamp;
Int64 Flags = 0;
Double Value = 0.0;
};
当搜索特定时间点的当前值时,我们希望找到与该时间点完全匹配的行,或者我们希望找到时间戳小于我们请求行的最大时间戳的行。这比 SQL 数据库更容易用 ESE 实现。
bool MoveTo( const Guid& sensorId,
const DateTime& timestamp,
bool exactMatchRequired = true ) const
{
MakeKey( sensorId, Ese::KeyFlags::NewKey );
MakeKey( timestamp );
auto rc = Seek( exactMatchRequired? Ese::SeekFlags::Equal : Ese::SeekFlags::LessOrEqual );
return rc >= Ese::Result::Success;
}
当传递 false
作为第三个参数时,我们会得到上述功能,而使用默认值允许我们实现
bool Write(const Guid& sensorId, const SensorPoint& point ) const
{
if ( MoveTo( sensorId, point.Timestamp ) )
{
ReplaceValue( point );
return false;
}
else
{
InsertValue( sensorId, point );
return true;
}
}
这样可以确保现有 timestamp
的精确匹配将导致 replace
,否则该函数将向表中插入新行。
还有一个 Filter
函数,允许我们查找间隔内的值。
bool Filter( const Guid& sensorId,
const DateTime& startTimestamp,
const DateTime& endTimestamp ) const
{
MakeKey( sensorId, Ese::KeyFlags::NewKey );
MakeKey( startTimestamp );
auto rc = Seek( Ese::SeekFlags::LessOrEqual );
if ( rc >= Ese::Result::Success )
{
MakeKey( sensorId, Ese::KeyFlags::NewKey );
MakeKey( endTimestamp );
SetIndexRange( Ese::IndexRengeFlags::UpperLimit );
return true;
}
else
{
return false;
}
}
有了这个,就可以轻松实现检索时序数据所需的相应函数了。
void GetSensorPoints( const Guid& sensorId, std::vector<SensorPoint>& sensorPoints ) const
{
sensorPoints.clear( );
if ( Filter( sensorId ) )
{
do
{
Read( sensorPoints.emplace_back() );
} while ( MoveNext( ) );
}
}
void GetSensorPoints( const Guid& sensorId,
const DateTime& startTimestamp,
const DateTime& endTimestamp,
std::vector<SensorPoint>& sensorPoints ) const
{
sensorPoints.clear( );
if ( Filter( sensorId, startTimestamp, endTimestamp ) )
{
do
{
Read( sensorPoints.emplace_back( ) );
} while ( MoveNext( ) );
}
}
void GetSensorPoints( const Guid& sensorId,
const DateTime& startTimestamp,
std::vector<SensorPoint>& sensorPoints ) const
{
GetSensorPoints( sensorId, startTimestamp, DateTime::MaxValue( ), sensorPoints );
}
实现插入或更新一组值的函数也同样简单。
void Write( const Guid& sensorId, const std::vector<SensorPoint>& sensorPoints ) const
{
for ( const auto& sensorPoint : sensorPoints )
{
Write( sensorId, sensorPoint );
}
}
使用数据层
使用 Session
类执行操作非常简单,下面的 BuildStructure(…)
函数创建了目录、资产和传感器的简单层次结构。
size_t BuildStructure( Session& session,
size_t numberOfCatalogs,
size_t numberOfAssetsPerCatalog,
size_t numberOfSensorsPerAsset )
{
Guid empty;
size_t result = 0;
auto transaction = session.StartTransaction( );
for ( size_t i = 0; i < numberOfCatalogs; i++ )
{
auto catalogName = GetCatalogName( i + 1 );
auto catalog = session.CreateOrRetrieveCatalog( empty, catalogName );
result++;
for ( size_t j = 0; j < numberOfAssetsPerCatalog; j++ )
{
auto assetName = GetAssetName( j + 1 );
auto asset = session.CreateOrRetrieveAsset( catalog.Id, assetName );
result++;
for ( size_t k = 0; k < numberOfSensorsPerAsset; k++ )
{
auto sensorName = GetSensorName( k + 1 );
Sensor sensor = session.CreateOrRetrieveSensor( asset.Id, sensorName );
result++;
}
}
}
transaction.Commit( );
return result;
}
除非我们在事务对象上调用 Commit()
,否则它将回滚自调用 StartTransaction()
以来对数据库所做的任何更改。这样,任何异常都会导致整个事务被回滚。
检索 BuildStructure(...)
创建的目录、资产和传感器数据。
size_t ReadStructure( Session& session, std::vector<CatalogData>& result )
{
size_t count = 0;
Guid root;
std::vector<CatalogItem> catalogItems;
session.GetCatalogItems( root, catalogItems );
count += catalogItems.size( );
for ( auto& catalogItem : catalogItems )
{
auto& catalog = std::get<Catalog>( catalogItem );
CatalogData& catalogData = result.emplace_back();
catalogData.Assign(catalog);
std::vector<CatalogItem> assets;
session.GetCatalogItems( catalogData.Id, assets );
count += assets.size( );
for ( auto& assetItem : assets )
{
auto& asset = std::get<Asset>( assetItem );
AssetData& assetData = catalogData.Assets.emplace_back();
assetData.Assign( asset );
session.GetSensors( assetData.Id, assetData.Sensors );
count += assetData.Sensors.size( );
}
}
return count;
}
检索时间间隔内的传感器值。
size_t ReadSensorDataForInterval( Session& session, std::vector<Sensor>& sensors )
{
DateTime start( 2020, 1, 10 );
DateTime end( 2020, 1, 25 );
size_t result = 0;
std::vector<SensorPoint> points;
points.reserve( 25000 );
for ( auto& sensor : sensors )
{
session.GetSensorPoints( sensor.Id, start, end, points );
result += points.size( );
}
return result;
}
测试数据层
为了总结本文,这里简要介绍 RunTests(…)
的实现如何使用数据库层。对于调试版本,RunTests(…)
生成的数据集与为发布版本创建的数据集相比有限,并且发布版本不测试删除功能,因为它需要太长时间才能完成大型数据集。
RunTests(…)
的发布版本调用以下函数:
BuildStructure
:创建 10 个目录、每个目录 10 个资产和每个资产 10 个传感器。ReadStructure
:将整个结构读入内存,方法是检索所有目录,然后检索每个目录的资产,最后检索每个资产的传感器。ReadSensors
:将所有传感器读入内存。GenerateSensorData
:遍历所有传感器,为发布版本写入一个月的日数据,分辨率为一分钟;为调试版本写入一天的数据到数据库。GetCatalogItemCount
:遍历CatalogItemTable
中的所有行并返回计数。GetSensorCount
:遍历SensorTable
中的所有行并返回计数。GetSensorValueCount
:遍历SensorValueTable
中的所有行并返回计数。ReadAllSensorData
:遍历所有传感器并将一个传感器的所有值读入内存。ReadSensorDataForInterval
:遍历所有传感器,并为某个传感器读取从当月 10 日到 25 日的时间间隔内的值到内存。ReadSensorDataForIntervalWithCheck
:与上一个相同,并验证数据是否在请求的间隔内且有序。
对于调试版本,它还调用:
DeleteCatalog
:删除一个目录,该目录还应删除属于该目录的资产、属于每个资产的传感器以及所有传感器的所有传感器值。- 然后,为了让我们看到
DeleteCatalog
执行了预期的操作,RunTests(…)
调用:GetCatalogItemCount
GetSensorCount
GetSensorValueCount
这演示了如何使用该库为“真实”应用程序实现数据库层,创建、搜索、检索、更新和删除数据库中的行。
历史
- 2020 年 9 月 2 日 – 首次发布。
- 2020 年 10 月 6 日 – 修复错误,清理了大部分单元测试。
- 2020 年 10 月 7 日 –
Harlinn.Common.Core
库的更多单元测试。