Visual C++ 和 Windows API 的同步





5.00/5 (37投票s)
在处理线程、线程池和 I/O 完成端口时,选择正确的同步机制来创建高性能的 C++ 异步服务器
引言
本文介绍了使用 Visual C++ 编译器和 Windows API 在 Windows 平台上可用的同步机制。为了简化某些同步机制的使用,我创建了一组类,主要封装了 Windows API 提供的并发机制。
WaitableHandle
:可等待的 Windows 内核对象的基类EventWaitHandle
:用于处理 Windows 内核事件对象的类Mutex
:用于处理 Windows 内核互斥体对象的类Semaphore
:用于处理 Windows 内核信号量对象的类WaitableTimer
:用于处理 Windows 内核可等待定时器对象的类Thread
:线程类,精神上类似于std::thread
,但派生自WaitableHandle
,因为线程也是可等待的 Windows 内核对象Process
:用于执行其他进程的类。与线程一样,进程也是可等待的 Windows 内核对象
ThreadGroup
:允许您从组级别处理线程的类TimerQueue
:提供 Windows 定时器队列接口的类TimerQueueTimer
:表示 Windows 定时器队列定时器的类CriticalSection
:用于处理 Windows 临界区对象的类,满足 C++ 11BasicLockable
和Lockable
要求SynchronizationBarrier
:用于处理 Windows 同步屏障的类SlimReaderWriterLock
:用于处理 Windows 精简读/写对象的类,满足 C++ 11BasicLockable
、Lockable
和SharedMutex
要求ConditionVariable
:用于处理 Windows 条件变量的类InterlockedLinkedList
:用于处理 Windows 原子操作链表的类ConcurrentQueue
:高效的有界多生产者/多消费者队列Spinlock
:基于std::atomic
的自旋锁IO::Context
:由线程池服务的 Windows IO 完成端口
这些类是一个更大努力的一部分,旨在让您使用 C++ 轻松创建 Windows 上的原生服务器软件,以及 C# 和 C++ 的客户端。单元测试使用 boost test 实现,到目前为止,我已经实现了超过 630 个测试用例。目前还有 23 个示例程序。
该解决方案包含两个主要库:Harlinn.Common.Core
和 Harlinn.Windows
,其中 Harlinn.Common.Core
包含对服务器和客户端开发都有用的类,而 Harlinn.Windows
库包含对 UI 开发有用的类。为了便于与为 .NET 平台开发的软件集成,Harlinn.Common.Core
库提供了与其 .NET 对等项兼容的类,例如:
Guid
日期时间
TimeSpan
template<IO::StreamReader StreamT> BinaryReader
template<IO::StreamWriter StreamT> BinaryWriter
使用 C++ 实现的 BinaryReader
和 BinaryWriter
序列化的数据,可以使用 Harlinn.Common.Net
库 IO 命名空间中的 .NET 实现的 BinaryReader
和 BinaryWriter
进行序列化和反序列化。如果您需要处理 XML,那么 HCCXml.h 中的类可以轻松完成此操作。Currency
类实现了一个与 COM CY 类型 兼容的定点类型,而 Variant
类使处理 COM VARIANT 类型 变得容易。我已经实现了对两个 NoSQL 数据库引擎的支持:Microsoft 的 Extensible Storage Engine 和 Lightning Memory-Mapped Database。
Harlinn.Windows
库将实现 UI 开发的功能。它支持 GDI 和 DirectX2D 进行渲染,并且事件处理基于 boost.signals2。
构建说明
在 $(SolutionDir)Readme 文件夹中的 Build.md 文件中提供了构建代码的说明。
相关文章
本文属于一系列关于 Windows 平台服务器开发的文章。
- 使用 C++ 简化 Extensible Storage Engine (ESE) API - 第一部分,介绍 Extensible storage Engine
- 使用 C++ 简化 Extensible Storage Engine (ESE) API - 第二部分,使用 Extensible storage Engine 实现数据层
- 你好 C#! C++ 为您服务,在 C# 中实现 .NET Core 的客户端库,以及在 C++ 中编写的服务器
这是一个持续进行的项目,代码提供了演示使用 C++ 编写高性能服务器并不困难的类。
背景
在深入细节之前,我想提供一个场景,其中选择正确的同步机制将产生巨大影响。
我在 Windows 上开发服务器软件超过 20 年,并对该平台提供的功能有了真正的认识。我的经验是,I/O 完成端口机制是 Windows API 中一项非常有用的功能,可用于实现高效可扩展的解决方案。
I/O 完成端口可以被认为是高效的多生产者、多消费者队列,它与 Windows I/O 子系统紧密集成。它们可用作线程池,执行异步函数调用
IO::Context context( 8 );
context.Start( );
context.Async( []( )
{
printf( "Hello from thread pool\n" );
} );
context.Stop( );
顾名思义,它们可用于 I/O。
发送和接收 5GB/s
I/O 完成端口是我们创建高性能、可扩展服务器的关键技术。要充分认识到这一点,我们应该记住,我正在笔记本电脑上进行测试,该笔记本电脑的内存写入速度约为 27 GB/s。
IO::Context
是一个使创建多线程服务器相对容易的类。它负责 I/O 完成端口并管理用于处理服务器端活动的线程池。
为了演示,我们首先需要一个简单的客户端
void Client( ThreadData& threadData, size_t count )
{
Socket clientSocket( AddressFamily::InterNetworkV6,
SocketType::Stream,
ProtocolType::Tcp );
Address address( 42000 );
clientSocket.Connect( address );
clientSocket.Write( &count, sizeof( count ) );
Int32 index = static_cast<Int32>( threadData.index );
clientSocket.Write( &index, sizeof( index ) );
std::vector<Record> records;
records.resize( 2000, Record{Guid(),
DateTime( static_cast<Int64>( index + 1 ) ),index + 1, 1.0 } );
size_t sent = 0;
while ( sent < count )
{
clientSocket.Write( records.data( ), records.size( ) * sizeof( Record ) );
sent += records.size( );
}
}
客户端首先发送它打算发送的记录数,然后是 threadData
结构的索引,然后发送请求的记录数。
然后是服务器端接收数据的内容
class Protocol
{
std::vector<ThreadData>& threadData_;
public:
Protocol( std::vector<ThreadData>& threadData )
: threadData_(threadData )
{ }
template<IO::StreamReader StreamReaderT, IO::StreamWriter StreamWriterT>
bool Process( IO::BinaryReader<StreamReaderT>& requestReader,
IO::BinaryWriter<StreamWriterT>& replyWriter )
{
std::vector<Record> records;
records.resize( 200000 );
size_t count = requestReader.ReadUInt64( );
Int32 index = requestReader.ReadInt32( );
size_t received = 0;
while ( received < count )
{
requestReader.Read( records.data( ), records.size( ) * sizeof( Record ) );
received += records.size( );
}
auto& entry = threadData_[index];
entry.stopwatch.Stop( );
entry.serverDoneEvent.Signal( );
return false;
}
};
在这里,我们首先从客户端接收记录数,然后接收 threadData_
向量的索引,最后接收记录。完成后,我们向等待服务器从所有客户端接收数据的函数发出信号。
要创建基于 TCP/IP 的服务器,我们只需要创建一个 Server::TcpSimpleListener<>
模板实例,该实例将针对我们的 Protocol
类实现进行实例化,该类将附加到上下文
constexpr size_t ClientCount = 100;
constexpr size_t ThreadPoolSize = 12;
size_t PerClientRecordCount = 100'000'000;
std::vector<ThreadData> threadData;
for ( size_t i = 0; i < ClientCount; i++ )
{
threadData.emplace_back( ).index = static_cast<Int32>( i );
}
IO::Context context( ThreadPoolSize );
Address address( 42000 );
Server::TcpSimpleListener<Protocol>
listener( context, address, ClientCount, threadData );
Server::TcpSimpleListener<Protocol>
构造函数的最后一个参数以及任何其他参数都将传递给 Protocol
构造函数。将为 Server::TcpSimpleListener<>
管理的每个连接处理程序创建一个 Protocol
类实例,每个实例对应一个套接字连接,这些连接可以根据 ClientCount
指定的并发服务。
此时,我们已经实现了服务器,并通过启动上下文来运行它
context.Start( );
服务器启动并运行时,就可以启动客户端了
stopwatch.Start( );
for ( size_t i = 0; i < ClientCount; i++ )
{
Thread thread( [&threadData, i, PerClientRecordCount]( )
{
Client( threadData[i], PerClientRecordCount );
} );
}
现在,我们只需等待服务器从所有客户端接收数据
for ( size_t i = 0; i < ClientCount; i++ )
{
auto& threadDataEntry = threadData[i];
threadDataEntry.serverDoneEvent.Wait( );
}
stopwatch.Stop( );
程序最后告诉我们所有客户端和服务器之间传输了多少数据
Sent 10000000000 records in 71.646624 seconds using 100 concurrent client(s),
5.199534 GB per second.
服务器和客户端都由同一个可执行文件实现,可以通过构建和执行进行测试
HExContextPerf.exe
使用 I/O 完成端口进行同步
将程序 structuring 成这样一种方式,以尽量减少基于可能导致当前线程阻塞的机制的同步需求,是一个好主意。使用 I/O 完成端口的基于 TCP/IP 的服务器可以设计为确保线程池线程之间不会为处理每个连接的完成通知的对象发生争用。
ExampleSocketServer01
演示了这一点,而客户端 ExampleSocketClient01
会向服务器发送预期的数据。
服务器主要由两个类实现
Listener
:管理监听传入连接请求的套接字。ConnectionHandler
:管理每个并发连接使用的套接字、连接的当前状态以及从客户端接收数据。
这是一个非常简单的服务器,客户端只发送两部分信息
- 包含要发送的传感器值数量的头
- 一系列传感器值
所有 I/O 操作都是异步执行的;服务器使用一对低级 C++ 模板来实现使用 IO::Context
和 I/O 完成端口的基于 TCP/IP 的服务器,这些模板除了将完成结果路由到处理程序对象的实现之外,功能非常有限。
该设计通过确保服务器一次只为每个 ConnectionHandler
实例执行一个异步请求,从而为 ConnectionHandler
对象提供了线程安全。
main 函数很简单
int main()
{
try
{
WSA wsa;
constexpr size_t ThreadPoolSize = 12;
IO::Context context( ThreadPoolSize );
Address address( 42000 );
Listener listener( context, address, 2 );
context.Start( );
puts( "Press enter to exit" );
while ( getc( stdin ) != '\n' );
context.Stop( );
}
catch ( std::exception& exc )
{
auto* message = exc.what( );
printf( "Exception: %s\n", message );
}
}
Listener
类的实现
class Listener : public Server::TcpListenerBase<Listener>
{
template<typename DerivedT>
friend class SocketHandler;
public:
using Base = Server::TcpListenerBase<Listener>;
private:
CriticalSection criticalSection_;
Address address_;
boost::container::flat_map<SOCKET,
std::unique_ptr<ConnectionHandler>> connectionHandlers_;
public:
Listener( IO::Context& context, const Address& address,
size_t numberOfConnections )
: Base( context ), address_( address )
{
for ( size_t i = 0; i < numberOfConnections; ++i )
{
auto handler = std::make_unique<ConnectionHandler>( context, this );
connectionHandlers_.emplace( handler->Client( ).Handle( ),
std::move( handler ) );
}
}
Listener
为每个它能够并发处理的连接创建一个 ConnectionHandler
处理程序类的实例。
void Start( ) override
{
printf( "Starting listener\n" );
Server( ).Bind( address_ );
Server( ).Listen( );
for ( auto& entry : connectionHandlers_ )
{
Accept( entry.second.get( ) );
}
}
调用 IO::Context
类的 Start()
函数时,它会调用已连接处理程序的 Start()
函数。Server()
函数返回一个指向用于监听传入连接请求的 Socket
对象的引用,而 Accept(...)
函数启动一个异步 Accept
操作
void Accept( ConnectionHandler* connectionHandler )
{
auto* request = connectionHandler->GetAcceptRequest( Server( ) );
BeginAsyncAccept( request );
}
当异步 Accept
操作完成时,框架会将传递给 BeginAsyncAccept(...)
函数的 AcceptRequest
指针传递给 HandleAcceptCompleted(...)
函数
bool HandleAcceptCompleted( AcceptRequest* request )
{
if ( request->IoResult( ) != ERROR_OPERATION_ABORTED &&
request->IoResult( ) != STATUS_CANCELLED &&
request->IoResult( ) != STATUS_LOCAL_DISCONNECT &&
request->IoResult( ) != STATUS_REMOTE_DISCONNECT )
{
auto acceptSocket = request->AcceptSocket( );
auto it = connectionHandlers_.find( acceptSocket );
if ( it != connectionHandlers_.end( ) )
{
it->second->HandleAcceptCompleted( request );
}
}
return true;
}
IoResult()
返回请求的状态,如果请求未被取消,我们只需将调用转发给 ConnectionHandler
类实现的 HandleAcceptCompleted
。ConnectionHandler
类执行的所有 I/O 都是异步的,ConnectionHandlerState
枚举中的值有助于跟踪处理程序的当前状态。
enum class ConnectionHandlerState
{
Unknown,
Accepting,
ReceivingHeader,
ReceivingValues,
Disconnecting
};
Listener
跟踪每个 ConnectionHandler
对象,而 ConnectionHandler
跟踪它所需的所有资源。
class ConnectionHandler
: public Server::TcpConnectionHandlerBase<ConnectionHandler>
{
template<typename DerivedT>
friend class SocketHandler;
public:
using Base = Server::TcpConnectionHandlerBase<ConnectionHandler>;
private:
Listener* listener_;
ConnectionHandlerState state_;
size_t received_;
Stopwatch stopwatch_;
AcceptRequestBuffer header_;
WSABUF wsabuf_;
std::vector<SensorValue> records_;
std::unique_ptr<AcceptRequest> acceptRequest_;
std::unique_ptr<ReceiveRequest> receiveRequest_;
std::unique_ptr<DisconnectRequest> disconnectRequest_;
public:
ConnectionHandler( IO::Context& context, Listener* listener )
: Base( context ),
listener_( listener ),
state_( ConnectionHandlerState::Accepting ),
received_(0),
wsabuf_{}
{
}
好处是框架添加的开销微不足道,但您也负责几乎所有正在发生的事情。处理套接字有趣的部分在于,您永远不知道在调用之后会收到多少数据。
bool HandleAcceptCompleted( AcceptRequest* request )
{
if ( request->IoResult( ) == NO_ERROR )
{
stopwatch_.Start( );
received_ = 0;
auto nextRequest = GetReceiveRequest( );
if ( request->NumberOfBytesTransferred( ) < request->NumberOfBytesToRead( ) )
{
state_ = ConnectionHandlerState::ReceivingHeader;
wsabuf_.buf = ((char*)request->Buffer( )) + request->NumberOfBytesTransferred( );
wsabuf_.len = request->NumberOfBytesToRead( )
- static_cast<UInt32>( request->NumberOfBytesTransferred( ) );
}
else
{
state_ = ConnectionHandlerState::ReceivingValues;
wsabuf_.buf = (char*)records_.data( );
wsabuf_.len = static_cast<int>( records_.size( ) * sizeof( SensorValue ) );
}
BeginAsyncReceive( nextRequest );
}
else
{
Disconnect( );
}
return true;
}
在这种情况下,我们通过将 header_
对象的地址传递给 AcceptRequest
的构造函数,为头提供了足够的空间。
AcceptRequest* GetAcceptRequest( const Socket& server )
{
if ( !acceptRequest_ )
{
acceptRequest_ = std::make_unique<AcceptRequest>( server,
Client(), &header_, static_cast<UInt32>(sizeof(Header)) );
}
else
{
acceptRequest_->Clear( );
}
return acceptRequest_.get( );
}
header_
实际上是一个 AcceptRequestBuffer
对象,它派生自 Header
并增加了本地和远程地址信息所需的空间。
struct AcceptRequestBuffer : public Header
{
using Base = Header;
constexpr static size_t AddressInfoSize = AcceptRequest::CalculateBufferSizeFor( 0 );
Byte AddressInfo[AddressInfoSize];
AcceptRequestBuffer()
: Base(), AddressInfo{}
{}
};
如果传递给 HandleAcceptCompleted(...)
的请求接收的字节数少于 Header
所需的字节数,它会将 state_
设置为 ConnectionHandlerState::ReceivingHeader
并启动一个异步读取操作来读取头剩余的字节,或者将 state_
设置为 ConnectionHandlerState::ReceivingValues
来开始从客户端接收值。
读取操作完成后,框架会调用 HandleReceiveCompleted(...)
。
bool HandleReceiveCompleted( ReceiveRequest* request )
{
if ( request->IoResult( ) == NO_ERROR )
{
if ( request->NumberOfBytesTransferred( ) < static_cast<size_t>(wsabuf_.len) )
{
wsabuf_.buf = wsabuf_.buf + request->NumberOfBytesTransferred( );
wsabuf_.len = wsabuf_.len -
static_cast<UInt32>( request->NumberOfBytesTransferred( ) );
request->Clear( );
BeginAsyncReceive( request );
}
else
{
switch ( state_ )
{
case ConnectionHandlerState::ReceivingHeader:
{
state_ = ConnectionHandlerState::ReceivingValues;
wsabuf_.buf = (CHAR*)records_.data( );
wsabuf_.len = static_cast<int>( records_.size( ) * sizeof( SensorValue ) );
request->Clear( );
BeginAsyncReceive( request );
}
break;
case ConnectionHandlerState::ReceivingValues:
{
if ( received_ + BatchSize < header_.RecordCount )
{
received_ += BatchSize;
wsabuf_.buf = (CHAR*)records_.data( );
wsabuf_.len = static_cast<int>( records_.size( ) *
sizeof( SensorValue ) );
request->Clear( );
BeginAsyncReceive( request );
}
else
{
received_ += BatchSize;
stopwatch_.Stop( );
auto duration = stopwatch_.Elapsed( ).TotalSeconds( );
auto pointsPerSecond = ( received_ ) / duration;
auto gbPerSecond = ( pointsPerSecond * sizeof( SensorValue ) ) /
( 1024ll * 1024 * 1024 );
wprintf( L"Received %llu records in %f seconds, "
"%f records and %f GB per second.\n",
received_, duration, pointsPerSecond, gbPerSecond );
Disconnect( );
}
}
break;
}
}
}
else
{
Disconnect( );
}
return true;
}
只要不发生错误,它就会确保已从客户端接收到预期数量的字节。当头完全接收后,它开始以每次 20,000 个批次接收值,当所有值都从客户端接收完毕后,它开始异步断开连接。
void Disconnect( )
{
state_ = ConnectionHandlerState::Disconnecting;
auto nextRequest = GetDisconnectRequest( );
BeginAsyncDisconnect( nextRequest );
}
异步断开连接完成后,框架会调用
bool HandleDisconnectCompleted( DisconnectRequest* request )
{
if ( request->IoResult( ) == NO_ERROR )
{
// Accept( );
Destroy( );
}
return true;
}
理想情况下,框架现在会重用连接,而 Accept()
的调用就是这样做的;但令人惊讶的是,这实际上会显着降低服务器的性能。Destroy()
调用 Listener
实现,导致它关闭套接字并创建一个新的 ConnectionHandler
。
void DestroyAndAddNewHandler( ConnectionHandler* connectionHandler )
{
std::unique_lock lock( criticalSection_ );
auto acceptSocket = connectionHandler->Client( ).Handle( );
auto it = connectionHandlers_.find( acceptSocket );
if ( it != connectionHandlers_.end( ) )
{
auto handlerPtr = std::move( it->second );
connectionHandlers_.erase( acceptSocket );
auto& context = Context( );
auto handler = std::make_unique<ConnectionHandler>( context, this );
auto newHandlerPtr = handler.get( );
connectionHandlers_.emplace( handler->Client( ).Handle( ), std::move( handler ) );
Accept( newHandlerPtr );
}
}
有效地使用 I/O 完成端口需要对并发有一定的理解,了解一些陷阱,以及对某些事物(如内存分配)的工作方式有一些基本了解。
无论我们是用 C#、Java 还是 C++ 开发软件,我们的程序都将使用动态分配的内存。在分配和释放内存时,软件正在访问必须视为共享资源的内容。内存可以在一个线程中分配,在另一个线程中释放;这两个操作在某个时候都会操作全局结构,如果我们的软件在每个线程上执行大量分配和释放,执行将围绕内存管理函数进行串行化。
我估计一个线程对动态内存的请求至少有 95% 是针对寿命很短的小缓冲区。一个好的设计会为每个线程或会话预留一些内存,以避免不必要的内存子系统垃圾回收。预分配一些将用于 I/O 的固定大小缓冲区也是一个好主意。原子单链表是管理这些缓冲区的绝佳机制,提供了一种高效的机制,可用于根据为 I/O 完成端口服务的线程池的需要分配和释放缓冲区。
基于此基础设施,创建可扩展性良好的服务器,能够服务数千并发用户,是相对容易的。
为什么我们需要同步
一个普遍的观察是,随着时间的推移,用户通常会对可用数据的一个常见子集感兴趣,在这种情况下,添加某种缓存对服务器性能非常有益。拥有每个线程的缓存是有意义的,但在某个时候,一个线程所做的更新必须对其他线程可见。
创建某种会话来表示服务器内的事务上下文也很常见。在使用 I/O 完成端口时,一个会话通常由多个线程服务,有时也为会话让多个线程并发操作是有意义的。对会话的读写访问显然是必须仔细管理的内容,对缓存也是如此——这就是同步的全部内容:安全、高效地管理多个线程之间共享资源的访问。
有多种可用的同步机制,选择适合特定任务的机制需要了解其基本行为和性能特征。
在性能方面,我主要对探索原子操作和 CRITICAL_SECTION
对象感兴趣。我知道 CRITICAL_SECTION
性能非常好。它在内部部分使用应用程序地址空间中执行的原子操作来实现,并且易于使用。我也知道原子操作存在许多陷阱,很容易设计出浪费 CPU 周期的方案——因此识别一些这些陷阱很有用。
在继续之前,我们需要选择一个不干扰主题的简单任务,增量一个值将是本文其余部分的重点。
constexpr size_t MaxIterations = 4'000'000'000;
size_t result = 0;
auto stopwatch = Stopwatch::StartNew( );
for ( size_t i = 0; i < MaxIterations; i++ )
{
++result;
}
stopwatch.Stop( );
auto duration = stopwatch.TotalSeconds( );
printf( "Result = %zu, duration = %f seconds\n", result, duration );
输出
Result = 4000000000, duration = 0.000000 seconds
快速查看反汇编视图,揭示了刚才发生的事情
mov edx, 0EE6B2800h
lea rcx, [string "Result = %zu, duration = %f sec@"... ( 07FF7474FE158h )]
call printf( 07FF747457000h )
0x0EE6B2800
等于 4,000,000,000。因此,编译器自然地优化掉了整个循环,创建了一个瞬时执行的解决方案。正如编译器确定的那样,这是“count
”到 4,000,000,000
的最高效方法。
我们将首先展示交错“普通”代码和原子操作(被认为是轻量级同步机制)如何产生显著成本。
为了防止编译器优化循环,我们稍微改变它
constexpr size_t MaxIterations = 4'000'000'000;
size_t result = 0;
auto stopwatch = Stopwatch::StartNew( );
for ( size_t i = 0; i < MaxIterations; i++ )
{
_mm_mfence( );
++result;
}
stopwatch.Stop( );
auto duration = stopwatch.TotalSeconds( );
printf( "Result = %zu, duration = %f seconds\n", result, duration );
输出
Result = 4000000000, duration = 33.696560 seconds
_mm_mfence( )
是一个内在函数,它
对在该指令之前发出的所有内存加载和内存存储指令执行一个串行化操作。确保在程序顺序上位于内存围栏指令之前的每个内存访问,在全球可见,然后再执行程序顺序上位于其后的任何内存指令。
现在我们既阻止了编译器,也阻止了 CPU 做任何聪明的事情,并且每个核心将访问 L3 缓存或多 CPU 系统的内存,以确保它具有内存的更新视图。这对于每次迭代都会发生——这是残酷的,并且是通过一行代码完成的。
切换到 std::atomic<size_t>
作为计数器,性能有所提升。
constexpr size_t MaxIterations = 4'000'000'000;
std::atomic<size_t> counter = 0;
auto stopwatch = Stopwatch::StartNew( );
for ( size_t i = 0; i < MaxIterations; i++ )
{
++counter;
}
stopwatch.Stop( );
auto duration = stopwatch.TotalSeconds( );
size_t result = counter;
printf( "Result = %zu, duration = %f seconds\n", result, duration );
输出
Result = 4000000000, duration = 17.257034 seconds
上述性能与使用 InterlockedIncrement
相当,后者由 std::atomic<size_t>
的实现内部使用;它仍然非常昂贵。
我认为这很重要,因为使用 InterlockedIncrement
和 InterlockedDecrement
进行引用的类数量巨大,而在循环中执行递增和递减引用计数器的代码可能会严重降低程序的性能。这通常可以通过按引用传递对象而不是按值传递来轻松避免,即使对象的大小仅为 64 位。我的观点是,如果不小心使用原子操作,可能会严重降低程序的性能。
分区 第 1 部分
为了尝试提高计数器的性能,我们将工作分配给四个线程。
constexpr size_t ThreadCount = 4;
constexpr size_t MaxIterations = 4'000'000'000;
constexpr size_t PerThreadIterations = MaxIterations / ThreadCount;
std::atomic<size_t> counter;
ThreadGroup threadGroup;
auto stopwatch = Stopwatch::StartNew( );
for ( int i = 0; i < ThreadCount; ++i )
{
threadGroup.Add( [&counter]( )
{
for ( size_t j = 0; j < PerThreadIterations; j++ )
{
++counter;
}
} );
}
threadGroup.Wait( );
stopwatch.Stop( );
size_t result = counter;
auto duration = stopwatch.TotalSeconds( );
printf( "Result = %zu, duration = %f seconds\n", result, duration );
输出
Result = 4000000000, duration = 71.819745 seconds
这比单线程情况下的 17.25 秒慢得多。如果说有什么的话,这表明对原子变量的频繁并发更新会严重降低我们软件的性能。
分区 第 2 部分
为了真正提高计数器的性能,我们仍然将工作分配给四个线程。
constexpr size_t ThreadCount = 4;
constexpr size_t MaxIterations = 4'000'000'000;
constexpr size_t PerThreadIterations = MaxIterations / ThreadCount;
struct ThreadData
{
Int64 counter = 0;
};
ThreadData threadData[ThreadCount];
ThreadGroup threadGroup;
auto stopwatch = Stopwatch::StartNew( );
for ( int i = 0; i < ThreadCount; ++i )
{
auto& perThreadData = threadData[i];
threadGroup.Add( [&perThreadData]( )
{
size_t counter = 0;
for ( size_t j = 0; j < PerThreadIterations; j++ )
{
counter = Performance::Increment( counter );
}
perThreadData.counter = counter;
} );
}
threadGroup.Wait( );
stopwatch.Stop( );
size_t result = 0;
for ( int i = 0; i < ThreadCount; ++i )
{
result += threadData[i].counter;
}
auto duration = stopwatch.TotalSeconds( );
printf( "Result = %zu, duration = %f seconds\n", result, duration );
其中 Performance::Increment(…)
实现位于单独的 DLL 中,以确保它在每次迭代时都会被调用,而不会被优化掉。
Int64 Increment( Int64 value )
{
return ++value;
}
输出
Result = 4000000000, duration = 1.472855 seconds
每个线程完成一部分工作,将结果放在 perThreadData.counter
中。threadGroup.Wait( )
等待所有线程完成;由于这是使用内核模式等待完成的,我们可以确信主线程具有数据的更新视图。然后,主线程聚合每个线程的结果。
为了高效地并行化一项工作,以下是一个好主意:
- 将工作划分为独立的任务。
- 在执行任务之前分配所需的资源。
- 让每个任务运行,而不执行需要同步的操作。
- 在所有任务完成后聚合结果。
这可能并不总是可能的,但它将提供最佳性能。
替换
counter = Performance::Increment( counter );
用
++counter;
这使我们能够实现完全优化的代码的并行执行。
Result = 4000000000, duration = 0.001265 seconds
显然,编译器现在可以在编译时评估循环,因此这四个线程只返回了结果值。
这样计数当然是牵强的,但它保持了简单性,并让我们专注于主题。
正常程序运行
正常程序执行对并行执行的其他线程是无意识的,并且在现代 Intel x86/x64 处理器上,一个核心有时每周期可以处理多达四个指令。
这是由于处理器在运行时可以分析缓存的指令和缓存数据而实现的并行性,而且大多数时候我们对这种功能都满意。
分层缓存设计使处理器能够非常快速地读写内存地址,而不管主内存是否已更新,只要缓存向核心提供了内存中存储内容的统一视图即可。根据线程访问内存信息的方式,吞吐量收益可能非常可观。
- L1 缓存访问延迟:4 个周期
- L2 缓存访问延迟:11 个周期
- L3 缓存访问延迟:39 个周期
- 主内存访问延迟:107 个周期
一个最优设计的并行程序将使每个核心的所有执行单元都忙于有用的工作。只要线程能够在其依赖它的操作执行之前将相关数据移入内部寄存器文件,并且流水线能够将结果写回缓存而不等待操作完成,线程就能让核心全速运行。
但有时,线程会到达一个点,它必须访问可能已被另一个线程修改的信息,或者更新一个可能引起另一个线程兴趣的地址。
为了可靠地完成此操作,我们必须使用某种协调机制,这总是以性能为代价的。
经验法则:我们应该始终努力将所需的同步保持在最低限度。
如果我们确信数据在被多个线程访问时不会被修改,那么就不需要同步——这在设计并行代码时可以充分利用。
原子操作
原子操作的目的是促进对可能已被多个核心修改的内存的协调访问,而无需刷新整个缓存并强制核心从内存中重新读取所有内容。原子操作由处理器执行,而不是操作系统服务。
标准模板库和 Visual C++ 编译器都提供了对变量执行原子操作所需的功能;并且当不需要复杂操作时,使用此功能通常是并发读写线程之间共享数据的最简单、最好的方法。使用 InterlockedIncrement
和 InterlockedDecrement
实现引用计数机制属于此类。
该库提供了一个简单的 Spinlock
类,实现在HCCSync.h 中。
class Spinlock
{
private:
enum LockState
{
Locked, Unlocked
};
std::atomic<LockState> state_;
public:
constexpr Spinlock( ) noexcept
: state_( Unlocked )
{}
void lock( ) noexcept
{
while ( state_.exchange( Locked, std::memory_order_acquire ) == Locked )
{
}
}
bool try_lock( ) noexcept
{
return state_.exchange( Locked, std::memory_order_acquire ) == Unlocked;
}
void unlock( ) noexcept
{
state_.store( Unlocked, std::memory_order_release );
}
};
使用 Spinlock
类很简单,这里有一个由 Spinlock
保护的简单计数器,以及一组每线程计数器,它们演示了这一点按预期工作。
constexpr size_t ThreadCount = 4;
constexpr size_t ThreadIterationCount = 4'000'000;
struct Data
{
Spinlock spinlock;
size_t counter = 0;
};
Data data;
std::array<size_t, ThreadCount> threadCounters = {};
ThreadGroup threadGroup;
for ( size_t i = 0; i < ThreadCount; ++i )
{
threadGroup.Add( [i, &data,&threadCounters]( )
{
for ( size_t j = 0; j < ThreadIterationCount; ++j )
{
std::unique_lock lock( data.spinlock );
data.counter++;
threadCounters[i]++;
}
} );
}
puts( "Main thread waiting on background threads" );
threadGroup.join( );
printf( "Data counter: %zu\n", data.counter );
for ( size_t i = 0; i < ThreadCount; ++i )
{
printf( "Thread %zu counter: %zu\n",i, threadCounters[i] );
}
输出
Main thread waiting on background threads
Data counter : 16000000
Thread 0 counter : 4000000
Thread 1 counter : 4000000
Thread 2 counter : 4000000
Thread 3 counter : 4000000
它被称为自旋锁,因为 lock()
函数会忙等待锁,消耗 CPU 周期直到获得锁。这种自旋锁永远不应作为 CriticalSection
或 Mutex
的即插即用替代品,它适用于我们知道争用概率非常低且锁是短暂的、不超过几百个周期的情况。原子操作的全部意义在于提高性能,而忙等待并不是系统资源的有效利用。
我见过一些自旋锁实现,它们在 lock()
函数的 while
循环中调用 Sleep(0)
,在我看来,这违背了自旋锁的目的,因为它将导致线程放弃其当前时间片的剩余部分。即使 Sleep
的文档说明
值为零会导致线程将剩余的时间片让给任何优先级相等且已准备好运行的其他线程。如果没有其他优先级相等且已准备好运行的线程,则函数立即返回,线程继续执行。
重要的是要意识到 Sleep(0)
需要往返内核模式,这里有一个片段可以让我们了解调用 Sleep(0)
的成本。
constexpr size_t Count = 10'000'000;
Stopwatch stopwatch;
stopwatch.Start( );
auto cyclesStart = CurrentThread::QueryCycleTime( );
for ( size_t i = 0; i < Count; ++i )
{
Sleep( 0 );
}
auto cyclesEnd = CurrentThread::QueryCycleTime( );
stopwatch.Stop( );
auto cycles = cyclesEnd - cyclesStart;
auto cyclesPerIteration = cycles / Count;
auto duration = stopwatch.TotalSeconds( );
printf( "Cycles per iteration %zu - loop done in %f", cyclesPerIteration, duration );
输出
Cycles per iteration 1398 - loop done in 5.170509
在空闲系统上执行时,上述片段表明调用 Sleep(0)
的成本太高,无法用作自旋锁的一部分——使用 CriticalSection
的效果会好得多,因为它性能更好。
原子操作可以提高程序的性能,为了让我们了解其程度,这里有两种实现,它们使用两个线程来递增计数器。第一个使用 std::atomic<size_t>
作为计数器,第二个使用由临界区保护的 64 位整数作为计数器。
constexpr size_t Count = 1'000'000'000;
std::atomic<size_t> counter = 0;
size_t cyclesThread = 0;
Stopwatch stopwatch;
stopwatch.Start( );
SynchronizationBarrier barrier(2);
Thread thread( [&barrier, &counter, &cyclesThread]( )
{
barrier.Enter( );
auto cyclesStart = CurrentThread::QueryCycleTime( );
for ( size_t i = 0; i < Count; i++ )
{
++counter;
}
auto cyclesEnd = CurrentThread::QueryCycleTime( );
cyclesThread = cyclesEnd - cyclesStart;
} );
barrier.Enter( );
auto cyclesStart = CurrentThread::QueryCycleTime( );
for ( size_t i = 0; i < Count; i++ )
{
++counter;
}
auto cyclesEnd = CurrentThread::QueryCycleTime( );
size_t cyclesMainThread = cyclesEnd - cyclesStart;
thread.join( );
stopwatch.Stop( );
auto duration = stopwatch.TotalSeconds( );
auto cyclesPerIteration = (cyclesMainThread + cyclesThread ) / counter;
printf( "Cycles per atomic increment using two threads %zu in %f seconds\n",
cyclesPerIteration, duration);
输出
Cycles per atomic increment using two threads 83 in 31.029148 seconds
将上述代码更改为使用 CriticalSection
来保护计数器变量。
constexpr size_t Count = 1'000'000'000;
size_t counter = 0;
size_t cyclesThread = 0;
CriticalSection criticalSection;
Stopwatch stopwatch;
stopwatch.Start( );
SynchronizationBarrier barrier( 2 );
Thread thread( [&barrier,&criticalSection,&counter, &cyclesThread]( )
{
barrier.Enter( );
auto cyclesStart = CurrentThread::QueryCycleTime( );
for ( size_t i = 0; i < Count; i++ )
{
std::unique_lock lock( criticalSection );
++counter;
}
auto cyclesEnd = CurrentThread::QueryCycleTime( );
cyclesThread = cyclesEnd - cyclesStart;
} );
barrier.Enter( );
auto cyclesStart = CurrentThread::QueryCycleTime( );
for ( size_t i = 0; i < Count; i++ )
{
std::unique_lock lock( criticalSection );
++counter;
}
auto cyclesEnd = CurrentThread::QueryCycleTime( );
size_t cyclesMainThread = cyclesEnd - cyclesStart;
thread.join( );
stopwatch.Stop( );
auto duration = stopwatch.TotalSeconds( );
auto cyclesPerIteration = ( cyclesMainThread + cyclesThread ) / counter;
printf( "Cycles per iteration using two threads with CriticalSection: %zu in %f seconds\n",
cyclesPerIteration, duration );
输出
Cycles per iteration using two threads with CriticalSection : 164 in 67.675265 seconds
使用 std::atomic
的实现几乎是使用 CriticalSection
的实现的兩倍效率,但如果我将实现更改为使用四个原子变量,它们共享同一个缓存行,与四个由单个临界区保护的 64 位整数相比,后者比前者快 32%。
虽然这些示例微不足道,但它们说明了当我们尝试使用原子操作来提高并发操作的性能时,我们的余地有多小。它们是实现高效并发软件的绝佳方式,但这有点像脑外科手术——我们应该只在别无选择且我们知道自己在做什么的时候才这样做。
缓存行
如果我们仍然想尝试原子操作,我们需要记住缓存行是缓存和内存之间数据传输的单位,缓存行通常是 64 字节。处理器在读取或写入缓存行内的任何地址时,都会读取或写入整个缓存行。处理器还会通过分析每个核心的内存访问模式来尝试预取缓存行。
我们应该始终确保需要使用原子操作进行线程间同步的独立数据不共享同一缓存行。
内核对象同步
同步可以使用几种由 Windows 内核对象实现支持的同步机制来完成。这些是 Win32/64 API 提供的最重量级、功能最丰富的同步机制。
- 它们可以命名,将它们放入内核对象命名空间。
- 它们可用于进程间同步。
- 它们可以被保护。
- 子进程可以继承创建它们或打开它们的进程的句柄。
- 在等待内核对象进入其信号状态时,我们可以指定一个超时时间(以毫秒为单位)。
- 它们的生存期可以超过创建它们的进程。
Win32/64 API 允许我们使用以下内核对象类型进行同步:
- 线程
- 进程
- 文件和控制台标准输入、输出和错误流
- 作业
- 事件
- 互斥体
- 信号量
- 可等待定时器
WaitableHandle
WaitableHandle
是为提供对程序可以等待的内核对象的访问的类提供的基类。它有一个数据成员。
class WaitableHandle
{
private:
HANDLE handle_;
public:
...
};
WaitableHandle
是可移动赋值、可移动构造的,但不可复制赋值和不可复制构造。实现确保句柄的生命周期得到妥善管理。该类如其名,实现了允许我们等待内核对象的函数。
Wait(…)
函数是 WaitForSingleObject(…)
函数的轻量级包装。
bool Wait( UInt32 timeoutInMillis = INFINITE ) const
{
auto rc = WaitForSingleObject( handle_, timeoutInMillis );
if ( rc == WAIT_FAILED )
{
ThrowLastOSError( );
}
return rc == WAIT_TIMEOUT ? false : true;
}
如果等待成功且内核对象处于信号状态,Wait(…)
函数将返回 true
;如果由于超时而导致等待过期,则返回 false
。如果 WaitForSingleObject(…)
函数返回 WAIT_FAILED
表示错误,Wait(…)
将抛出包含 GetLastError()
返回的错误代码和操作系统提供的附加错误消息的异常。
当 WaitForSingleObject(…)
返回时,缓存将与主内存同步,并且线程无需进一步操作即可确保对句柄保护的对象进行安全访问。
为了与 std::lock_guard
和标准模板库的其他模板兼容,WaitableHandle
还实现了
void lock( ) const
{
Wait( );
}
bool try_lock( ) const
{
return Wait( );
}
由派生类负责实现缺失的 unlock( )
成员函数。
EventWaitHandle
EventWaitHandle
类提供了一种通知等待线程事件发生情况的机制。此类封装了 Event 内核对象并管理句柄的生命周期。
有两种类型的事件对象:手动重置和自动重置。当事件进入信号状态时,手动重置事件将释放所有等待的线程以执行,而自动重置事件只会释放一个等待的线程。
事件通常用于一个线程执行一些工作,然后通知另一个线程执行依赖于其刚刚完成的工作。事件以非信号状态创建,然后在线程完成其工作后,将其设置为信号状态。此时,等待的线程被释放,可以继续其操作。
EventWaitHandle
可如此使用:
EventWaitHandle event1( true );
EventWaitHandle event2( true );
Thread thread( [&event1, &event2]( )
{
puts( "Background thread signalling event1" );
event1.Signal( );
puts( "Background thread waiting for event2" );
event2.Wait( );
} );
puts( "Main thread waiting for event1" );
event1.Wait( );
puts( "Main thread signalling event2" );
event2.Signal( );
puts( "Main thread waiting for background thread to terminate" );
thread.Wait( );
输出
Main thread waiting for event1
Background thread signalling event1
Background thread waiting for event2
Main thread signalling event2
Main thread waiting for background thread to terminate
构造函数
EventWaitHandle
对象可以通过多种方式构造,默认构造函数创建一个空的、可移动赋值给另一个 EventWaitHandle
的对象。
constexpr EventWaitHandle( ) noexcept;
explicit EventWaitHandle( bool manualReset, bool initialState = false,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit EventWaitHandle( LPCWSTR name, bool manualReset = true, bool initialState = false,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit EventWaitHandle( LPCSTR name, bool manualReset = true, bool initialState = false,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit EventWaitHandle( const std::wstring& name, bool manualReset = true,
bool initialState = false,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit EventWaitHandle( const std::string& name, bool manualReset = true,
bool initialState = false,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
如果 manualReset
设置为 true
,构造函数将创建一个手动重置事件;否则,它将创建一个自动重置事件。
initialState
指定事件将以信号状态创建。
当手动重置事件被信号化时,它会保持信号状态,直到通过 ResetEvent()
函数将其重置为非信号状态。所有等待的线程,或稍后开始等待该事件的线程,将在对象的状态被信号化时被释放。
当自动重置事件被信号化时,它会保持信号状态,直到一个等待的线程被释放,然后系统会自动将其状态重置为非信号状态。如果没有线程在等待,则自动重置事件将保持信号状态。
name 参数指定事件的名称,长度不能超过 MAX_PATH
。如果名称与现有事件对象匹配,则会创建对该事件的句柄,而 manualReset
和 initialState
参数将被忽略。
securityAttributes
指定事件的 SECURITY_ATTRIBUTES
。当 securityAttributes
为 nullptr
时,事件将获得默认的安全描述符,其 ACL 来自创建线程的主令牌或模拟令牌。
desiredAccess
使用 EventWaitHandleRights
枚举中的值指定事件的访问掩码,这些值可以使用“|
”运算符组合。
None
:无权限Delete
:删除命名事件的权限ReadPermissions
:打开和复制命名事件的访问规则和审计规则的权限Synchronize
:等待命名事件的权限ChangePermissions
:更改与命名事件关联的安全和审计规则的权限TakeOwnership
:更改命名事件所有者的权限Modify
:设置或重置命名事件信号状态的权限FullControl
:对命名事件拥有完全控制权,并修改其访问规则和审计规则的权限
OpenExisting 和 TryOpenExisting
要访问现有的事件对象,我们使用 OpenExisting(…)
函数,它有以下重载。
static EventWaitHandle OpenExisting( LPCWSTR name,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::Synchronize |
EventWaitHandleRights::Modify,
bool inheritHandle = false );
static EventWaitHandle OpenExisting( LPCSTR name,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::Synchronize |
EventWaitHandleRights::Modify,
bool inheritHandle = false );
static EventWaitHandle OpenExisting( const std::wstring& name,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::Synchronize |
EventWaitHandleRights::Modify,
bool inheritHandle = false );
static EventWaitHandle OpenExisting( const std::string& name,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::Synchronize |
EventWaitHandleRights::Modify,
bool inheritHandle = false );
OpenExisting(…)
函数在无法打开事件时会抛出异常,而 TryOpenExisting(…)
函数将返回一个空的 EventWaitHandle
。
static EventWaitHandle TryOpenExisting( LPCWSTR name,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::Synchronize |
EventWaitHandleRights::Modify,
bool inheritHandle = false );
static EventWaitHandle TryOpenExisting( LPCSTR name,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::Synchronize |
EventWaitHandleRights::Modify,
bool inheritHandle = false );
static EventWaitHandle TryOpenExisting( const std::wstring& name,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::Synchronize |
EventWaitHandleRights::Modify,
bool inheritHandle = false );
static EventWaitHandle TryOpenExisting( const std::string& name,
EventWaitHandleRights desiredAccess =
EventWaitHandleRights::Synchronize |
EventWaitHandleRights::Modify,
bool inheritHandle = false );
PulseEvent
PulseEvent()
函数将事件设置为信号状态,然后根据事件类型释放等待的线程后将其重置为非信号状态。
void PulseEvent( ) const;
手动重置事件将立即释放所有可以释放的线程。然后函数将事件的状态重置为非信号状态并返回。
自动重置事件将释放一个等待的线程,然后将事件的状态重置为非信号状态。
根据 PulseEvent(…)
Windows API 函数的文档,应避免使用此函数。
等待同步对象的线程可能会被内核模式 APC 暂时移出等待状态,并在 APC 完成后返回到等待状态。如果在调用 PulseEvent 时线程已被移出等待状态,则该线程不会被释放,因为 PulseEvent 只释放调用时正在等待的线程。因此,PulseEvent 是不可靠的,不应在新应用程序中使用。
SetEvent 和 Signal
SetEvent
和 Signal
设置事件对象的信号状态。
void SetEvent( ) const;
void Signal( ) const;
Signal
只是调用 SetEvent
,使用它通常可以使调用代码的意图更清晰。
设置一个已经处于信号状态的事件没有效果。
手动重置事件在被调用 ResetEvent()
函数显式设置为非信号状态之前,将保持信号状态。在事件状态为信号状态时,等待线程和开始对事件执行等待操作的线程将被释放。
自动重置事件在释放单个等待线程之前将保持信号状态,然后会自动重置为非信号状态。
ResetEvent, Reset 和 unlock
ResetEvent
、Reset
和 unlock
将事件设置为非信号状态。
void ResetEvent( ) const;
void Reset( ) const;
void unlock( ) const;
Reset
和 unlock
只是调用 ResetEvent
。通过实现 unlock,该类满足 BasicLockable
要求,允许我们使用 std::lock_guard
等模板等待事件并在锁超出作用域时自动重置事件。根据我们的设计,这可能是合理的。
互斥体
Mutex
用于确保线程对对象的互斥访问。
拥有互斥体的线程可以对 Mutex
执行多个等待操作,而不会阻塞其执行。这可以防止线程在等待它拥有的互斥体时死锁。要释放互斥体的所有权,线程必须为每次成功的等待操作调用一次 ReleaseMutex
。
Mutex
是一个同步对象,当它不被线程拥有时,它被设置为信号状态,当它被拥有时,它被设置为非信号状态。
Mutex
类封装了互斥体内核对象并管理句柄的生命周期。该类满足 BasicLockable
要求,允许我们使用 std::unique_lock
等模板来获取和释放互斥体内核对象的所有权。
Mutex
类可如此使用:
size_t counter = 0;
Mutex mutex( true );
ThreadGroup threadGroup;
for ( int i = 0; i < 100; ++i )
{
threadGroup.Add( [i, &mutex, &counter]( )
{
auto id = i + 1;
for ( int i = 0; i < 10; ++i )
{
printf( "T%d: waiting\n", id );
std::unique_lock lock( mutex );
printf( "T%d: acquired mutex\n", id );
++counter;
printf( "T%d: value %zu\n", id, counter );
}
} );
}
mutex.unlock( );
puts( "Main thread waiting on background threads" );
threadGroup.join();
printf( "Final value %zu\n", counter );
该示例创建了 100 个线程,每个线程锁定互斥体并将其计数器值增加 10 次。互斥体最初由程序的 main 线程持有,在所有线程创建之前,任何“counter
”线程都无法继续。
输出
Main thread waiting on background threads
T1: waiting
T13: waiting
T2: waiting
T3: waiting
T4: waiting
T5: waiting
T6: waiting
T7: waiting
T20: waiting
...
...
T100: waiting
Main thread waiting on background threads
T83: acquired mutex
T83: value 2
T83: waiting
T76: acquired mutex
T76: value 3
T76: waiting
...
...
T3: value 998
T1: acquired mutex
T1: value 999
T2: acquired mutex
T2: value 1000
Final value 1000
构造函数
Mutex
对象可以通过多种方式构造,默认构造函数创建一个空的、可移动赋值给另一个 Mutex
的对象。
constexpr Mutex( ) noexcept;
explicit Mutex( bool initiallyOwned,
MutexRights desiredAccess = MutexRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit Mutex( LPCWSTR name, bool initiallyOwned = true,
MutexRights desiredAccess = MutexRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit Mutex( LPCSTR name, bool initiallyOwned = true,
MutexRights desiredAccess = MutexRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit Mutex( const std::wstring& name, bool initiallyOwned = true,
MutexRights desiredAccess = MutexRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit Mutex( const std::string& name, bool initiallyOwned = true,
MutexRights desiredAccess = MutexRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
如果调用者创建了 mutex
对象,并且 initiallyOwned
参数为 true
,则调用线程将获得新创建的 mutex
对象的所有权。多个进程可以调用 CreateMutex
或 CreateMutexEx
(由 Mutex
构造函数调用)来创建同一个命名 mutex
。第一个进程将创建 mutex
,其他进程将仅打开现有 mutex
的句柄。这允许多个进程获取同一个 mutex
的句柄,而无需强制用户确保先启动创建进程。如果这样做,我们应该将 initiallyOwned
设置为 false
,以避免对哪个进程拥有初始所有权产生不确定性。
name 参数指定 mutex
对象的名称,长度不能超过 MAX_PATH
。如果名称与现有 mutex
对象匹配,则创建对该 mutex
的句柄,并忽略 initiallyOwned
参数。
securityAttributes
指定 mutex
的 SECURITY_ATTRIBUTES
。当 securityAttributes
为 nullptr
时,mutex
将获得默认的安全描述符,其 ACL 来自创建线程的主令牌或模拟令牌。
desiredAccess
使用 MutexRights
枚举中的值指定 mutex
的访问掩码,这些值可以使用“|
”运算符组合。
None
:无权限Delete
:删除命名互斥体的权限ReadPermissions
:打开和复制命名互斥体的访问规则和审计规则的权限Synchronize
:等待命名互斥体的权限ChangePermissions
:更改与命名互斥体关联的安全和审计规则的权限TakeOwnership
:更改命名互斥体所有者的权限Modify
:设置或重置命名互斥体信号状态的权限FullControl
:对命名互斥体拥有完全控制权,并修改其访问规则和审计规则的权限
OpenExisting 和 TryOpenExisting
要访问现有的 mutex
对象,我们使用 OpenExisting(…)
函数,它有以下重载。
static Mutex OpenExisting( LPCWSTR name,
MutexRights desiredAccess = MutexRights::Synchronize |
MutexRights::Modify,
bool inheritHandle = false );
static Mutex OpenExisting( LPCSTR name,
MutexRights desiredAccess = MutexRights::Synchronize |
MutexRights::Modify,
bool inheritHandle = false );
static Mutex OpenExisting( const std::wstring& name,
MutexRights desiredAccess = MutexRights::Synchronize |
MutexRights::Modify,
bool inheritHandle = false );
static Mutex OpenExisting( const std::string& name,
MutexRights desiredAccess = MutexRights::Synchronize |
MutexRights::Modify,
bool inheritHandle = false );
OpenExisting(…)
函数在无法打开 mutex
时会抛出异常,而 TryOpenExisting(…)
函数将返回一个空的 Mutex
。
static Mutex TryOpenExisting( LPCWSTR name,
MutexRights desiredAccess = MutexRights::Synchronize |
MutexRights::Modify,
bool inheritHandle = false );
static Mutex TryOpenExisting( LPCSTR name,
MutexRights desiredAccess = MutexRights::Synchronize |
MutexRights::Modify,
bool inheritHandle = false );
static Mutex TryOpenExisting( const std::wstring& name,
MutexRights desiredAccess = MutexRights::Synchronize |
MutexRights::Modify,
bool inheritHandle = false );
static Mutex TryOpenExisting( const std::string& name,
MutexRights desiredAccess = MutexRights::Synchronize |
MutexRights::Modify,
bool inheritHandle = false );
ReleaseMutex, Release 和 unlock
ReleaseMutex()
释放 mutex
的所有权,而 Release()
和 unlock()
函数只调用 ReleaseMutex()
函数。
信号量
Semaphore
对象可用于资源计数。semaphore
有一个最大计数和一个当前计数。使用最大计数来保存信号量保护的最大资源数,使用当前计数来表示当前可用资源的数量。
当信号量的计数大于零时,它被设置为信号状态,当计数为零时,它被设置为非信号状态。
每次成功等待 Semaphore
会导致计数减 1,我们必须调用 Release(…)
函数来增加信号量的计数(指定的量)。计数永远不能小于零或大于最大值。
该类满足 BasicLockable
要求,允许我们使用 std::unique_lock
等模板来等待 Semaphore
,并在锁超出作用域时调用 ReleaseSemaphore(1)
。
这是一个演示 Semaphore
对象典型用法的“玩具”资源管理器。
namespace ResourceManager
{
struct Resource
{
long long counter_ = 0;
};
class Resources
{
constexpr static size_t ResourceCount = 5;
Semaphore semaphore_;
Mutex mutex_;
std::array<Resource, ResourceCount> resources_;
std::list< Resource* > freeList_;
public:
Resources( )
: semaphore_( ResourceCount, ResourceCount ), mutex_( false )
{
for ( auto& r : resources_ )
{
freeList_.push_back( &r );
}
}
Resource* GetResource( )
{
if ( semaphore_.Wait( ) )
{
std::unique_lock lock( mutex_ );
auto* result = freeList_.back( );
freeList_.pop_back( );
return result;
}
return nullptr;
}
void Release( Resource* r )
{
std::unique_lock lock( mutex_ );
freeList_.push_back( r );
semaphore_.Release( );
}
size_t Sum( ) const
{
size_t result = 0;
for ( auto& r : resources_ )
{
result += r.counter_;
}
return result;
}
};
}
Resources
类管理对五个 Resource
对象的访问。
Semaphore
用于向等待的线程提供可用资源分配的通知,而 Mutex
用于保护空闲资源列表。
要尝试一下,我们让 100 个线程共享一个 Resources
对象,并根据资源可用情况获取对 Resource
对象的访问。
using namespace ResourceManager;
Resources resources;
ThreadGroup threadGroup;
for ( int i = 0; i < 100; ++i )
{
threadGroup.Add( [i, &resources]( )
{
auto id = i + 1;
for ( int i = 0; i < 10; ++i )
{
printf( "T%d: waiting\n", id );
auto* r = resources.GetResource( );
printf( "T%d: acquired resource\n", id );
r->counter_++;
printf( "T%d: value %zu\n", id, r->counter_ );
resources.Release( r );
}
} );
}
puts( "Main thread waiting on background threads" );
threadGroup.join( );
auto sum = resources.Sum( );
printf( "Final value %zu\n", sum );
输出
T1: waiting
T2: waiting
T1: acquired resource
T1: value 1
T3: waiting
T3: acquired resource
T3: value 2
T5: waiting
T5: acquired resource
T5: value 3
T9: waiting
T18: waiting
...
...
T100: acquired resource
T100: value 190
T92: acquired resource
T92: value 200
T98: acquired resource
T98: value 207
T74: acquired resource
T74: value 178
T81: acquired resource
T81: value 225
Final value 1000
构造函数
Semaphore
对象可以通过多种方式构造,默认构造函数创建一个空的、可移动赋值给另一个 Semaphore
的对象。
constexpr Semaphore( ) noexcept;
explicit Semaphore( long initialCount, long maximumCount,
SemaphoreRights desiredAccess = SemaphoreRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit Semaphore( LPCWSTR name, long initialCount, long maximumCount,
SemaphoreRights desiredAccess = SemaphoreRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit Semaphore( LPCSTR name, long initialCount, long maximumCount,
SemaphoreRights desiredAccess = SemaphoreRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit Semaphore( const std::wstring& name, long initialCount, long maximumCount,
SemaphoreRights desiredAccess = SemaphoreRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit Semaphore( const std::string& name, long initialCount, long maximumCount,
SemaphoreRights desiredAccess = SemaphoreRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
initialCount
参数指定 Semaphore
对象的初始计数,不得小于零或大于 maximumCount
参数。当计数为零时,信号量是非信号的;当计数大于零时,它是信号的。每次成功等待信号量的线程,计数都会减一。通过调用 ReleaseSemaphore(…)
函数并将其参数设置为要增加的量来增加计数。
maximumCount
参数指定 Semaphore
对象的最大计数,必须为一或更大。
name
参数指定信号量对象的名称,长度不能超过 MAX_PATH
。如果名称与现有信号量对象匹配,则会创建对该信号量的句柄,而 initialCount
和 maximumCount
参数将被忽略。
securityAttributes
指定信号量的 SECURITY_ATTRIBUTES
。当 securityAttributes
为 nullptr
时,信号量将获得默认的安全描述符,其 ACL 来自创建线程的主令牌或模拟令牌。
desiredAccess
使用 SemaphoreRights
枚举中的值指定信号量的访问掩码,这些值可以使用“|
”运算符组合。
None
:无权限Delete
:删除命名信号量的权限ReadPermissions
:打开和复制命名信号量的访问规则和审计规则的权限Synchronize
:等待命名信号量的权限ChangePermissions
:更改与命名信号量关联的安全和审计规则的权限TakeOwnership
:更改命名信号量所有者的权限Modify
:设置或重置命名信号量信号状态的权限FullControl
:对命名信号量拥有完全控制权,并修改其访问规则和审计规则的权限
OpenExisting 和 TryOpenExisting
要访问现有的信号量对象,我们使用 OpenExisting(…)
函数,它有以下重载。
static Semaphore OpenExisting( LPCWSTR name,
SemaphoreRights desiredAccess = SemaphoreRights::Synchronize |
SemaphoreRights::Modify,
bool inheritHandle = false );
static Semaphore OpenExisting( LPCSTR name,
SemaphoreRights desiredAccess = SemaphoreRights::Synchronize |
SemaphoreRights::Modify,
bool inheritHandle = false );
static Semaphore OpenExisting( const std::wstring& name,
SemaphoreRights desiredAccess = SemaphoreRights::Synchronize |
SemaphoreRights::Modify,
bool inheritHandle = false );
static Semaphore OpenExisting( const std::string& name,
SemaphoreRights desiredAccess = SemaphoreRights::Synchronize |
SemaphoreRights::Modify,
bool inheritHandle = false );
OpenExisting(…)
函数在无法打开信号量时会抛出异常,而 TryOpenExisting(…)
函数将返回一个空的 Semaphore
。
static Semaphore TryOpenExisting( LPCWSTR name,
SemaphoreRights desiredAccess = SemaphoreRights::Synchronize |
SemaphoreRights::Modify,
bool inheritHandle = false );
static Semaphore TryOpenExisting( LPCSTR name,
SemaphoreRights desiredAccess = SemaphoreRights::Synchronize |
SemaphoreRights::Modify,
bool inheritHandle = false );
static Semaphore TryOpenExisting( const std::wstring& name,
SemaphoreRights desiredAccess = SemaphoreRights::Synchronize |
SemaphoreRights::Modify,
bool inheritHandle = false );
static Semaphore TryOpenExisting( const std::string& name,
SemaphoreRights desiredAccess = SemaphoreRights::Synchronize |
SemaphoreRights::Modify,
bool inheritHandle = false );
ReleaseSemaphore, Release 和 unlock
ReleaseSemaphore
增加信号量的计数(指定的量)。
long ReleaseSemaphore( long releaseCount = 1 ) const;
long Release( long releaseCount = 1 ) const;
void unlock( ) const;
Release
仅调用 ReleaseSemaphore
,unlock()
也是如此,其中 releaseCount
设置为一。
WaitableTimer
可等待定时器是一个同步对象,当到达指定的截止时间时,其状态被设置为信号。Windows API 提供两种可等待定时器类型:手动重置和同步;两者都可以是周期性的。
手动重置的可等待定时器在调用 SetWaitableTimer
函数设置新的截止时间之前一直处于信号状态;而同步定时器在其信号状态下,直到释放一个等待该可等待定时器的线程后才退出。
以下片段创建一个将在定时器上等待五秒的线程。
std::cout << "Start: " << DateTime::Now( ) << std::endl;
EventWaitHandle event( true );
WaitableTimer timer(true, TimeSpan::FromSeconds( 5 ) );
Thread thread( [&timer, &event]( )
{
std::cout << "Background thread waiting on timer" << std::endl;
timer.Wait( );
std::cout << "Background thread continued: " << DateTime::Now( ) << std::endl;
std::cout << "Background thread signalling event" << std::endl;
event.Signal( );
} );
std::cout << "Main thread waiting for event" << std::endl;
event.Wait( );
std::cout << "Main thread waiting for background thread to terminate" << std::endl;
thread.Wait( );
输出
Start: 03.09.2020 22 : 02 : 59
Main thread waiting for event
Background thread waiting on timer
Background thread continued : 03.09.2020 22 : 03 : 04
Background thread signalling event
Main thread waiting for background thread to terminate
构造函数
WaitableTimer
对象可以通过多种方式构造,默认构造函数创建一个空的、可移动赋值给另一个 WaitableTimer
的对象。
constexpr WaitableTimer( ) noexcept;
explicit WaitableTimer( bool manualReset,
WaitableTimerRights desiredAccess = WaitableTimerRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit WaitableTimer( bool manualReset, const DateTime& dueTime,
const TimeSpan& interval = TimeSpan( ),
WaitableTimerRights desiredAccess = WaitableTimerRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit WaitableTimer( bool manualReset, const TimeSpan& dueTime,
const TimeSpan& interval = TimeSpan( ),
WaitableTimerRights desiredAccess = WaitableTimerRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit WaitableTimer( LPCWSTR name, bool manualReset = true,
WaitableTimerRights desiredAccess = WaitableTimerRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit WaitableTimer( LPCSTR name, bool manualReset = true,
WaitableTimerRights desiredAccess = WaitableTimerRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit WaitableTimer( const std::wstring& name, bool manualReset = true,
WaitableTimerRights desiredAccess = WaitableTimerRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
explicit WaitableTimer( const std::string& name, bool manualReset = true,
WaitableTimerRights desiredAccess = WaitableTimerRights::FullControl,
LPSECURITY_ATTRIBUTES securityAttributes = nullptr );
如果 manualReset
参数为 true
,构造函数将创建一个手动重置的可等待定时器;如果为 false
,则创建一个同步的可等待定时器,该定时器在释放单个等待线程后会自动重置。
dueTime
参数指定可等待定时器何时进入信号状态。如果 dueTime
是 DateTime
,则 dueTime
是绝对的;当 dueTime
是 TimeSpan
时,dueTime
是相对的。
name
参数指定可等待定时器对象的名称,长度不能超过 MAX_PATH
。如果名称与现有可等待定时器对象匹配,则会创建对该可等待定时器的句柄,而 manualReset
参数将被忽略。
securityAttributes
指定可等待定时器的 SECURITY_ATTRIBUTES
。当 securityAttributes
为 nullptr
时,可等待定时器将获得默认的安全描述符,其 ACL 来自创建线程的主令牌或模拟令牌。
desiredAccess
使用 WaitableTimerRights
枚举中的值指定可等待定时器的访问掩码,这些值可以使用“|
”运算符组合。
None
:无权限Delete
:删除命名可等待定时器的权限ReadPermissions
:打开和复制命名可等待定时器的访问规则和审计规则的权限Synchronize
:等待命名可等待定时器的权限ChangePermissions
:更改与命名可等待定时器关联的安全和审计规则的权限TakeOwnership
:更改命名可等待定时器所有者的权限Modify
:设置或重置命名可等待定时器信号状态的权限FullControl
:对命名可等待定时器拥有完全控制权,并修改其访问规则和审计规则的权限
OpenExisting 和 TryOpenExisting
要访问现有的可等待定时器对象,我们使用 OpenExisting(…)
函数,它有以下重载。
static WaitableTimer OpenExisting( LPCWSTR name,
WaitableTimerRights desiredAccess =
WaitableTimerRights::Synchronize |
WaitableTimerRights::Modify,
bool inheritHandle = false );
static WaitableTimer OpenExisting( LPCSTR name,
WaitableTimerRights desiredAccess =
WaitableTimerRights::Synchronize |
WaitableTimerRights::Modify,
bool inheritHandle = false );
static WaitableTimer OpenExisting( const std::wstring& name,
WaitableTimerRights desiredAccess =
WaitableTimerRights::Synchronize |
WaitableTimerRights::Modify,
bool inheritHandle = false );
static WaitableTimer OpenExisting( const std::string& name,
WaitableTimerRights desiredAccess =
WaitableTimerRights::Synchronize |
WaitableTimerRights::Modify,
bool inheritHandle = false );
OpenExisting(…)
函数在无法打开可等待定时器时会抛出异常,而 TryOpenExisting(…)
函数将返回一个空的 WaitableTimer
。
static WaitableTimer TryOpenExisting( LPCWSTR name,
WaitableTimerRights desiredAccess =
WaitableTimerRights::Synchronize |
WaitableTimerRights::Modify,
bool inheritHandle = false );
static WaitableTimer TryOpenExisting( LPCSTR name,
WaitableTimerRights desiredAccess =
WaitableTimerRights::Synchronize |
WaitableTimerRights::Modify,
bool inheritHandle = false );
static WaitableTimer TryOpenExisting( const std::wstring& name,
WaitableTimerRights desiredAccess =
WaitableTimerRights::Synchronize |
WaitableTimerRights::Modify,
bool inheritHandle = false );
static WaitableTimer TryOpenExisting( const std::string& name,
WaitableTimerRights desiredAccess =
WaitableTimerRights::Synchronize |
WaitableTimerRights::Modify,
bool inheritHandle = false );
SetTimer
SetTimer
函数激活可等待定时器,当到达截止时间时,可等待定时器将被信号化。
void SetTimer( LARGE_INTEGER dueTime, DWORD interval,
PTIMERAPCROUTINE completionRoutine,
void* argToCompletionRoutine,
bool resumeSystemIfSuspended ) const;
void SetTimer( LARGE_INTEGER dueTime, DWORD interval,
bool resumeSystemIfSuspended = false ) const;
void SetTimer( const DateTime& dueTime, const TimeSpan& interval,
PTIMERAPCROUTINE completionRoutine,
void* argToCompletionRoutine,
bool resumeSystemIfSuspended ) const;
void SetTimer( const TimeSpan& dueTime, const TimeSpan& interval,
PTIMERAPCROUTINE completionRoutine,
void* argToCompletionRoutine,
bool resumeSystemIfSuspended ) const;
void SetTimer( const DateTime& dueTime, const TimeSpan& interval = TimeSpan( ),
bool resumeSystemIfSuspended = false ) const;
void SetTimer( const TimeSpan& dueTime, const TimeSpan& interval = TimeSpan( ),
bool resumeSystemIfSuspended = false ) const;
dueTime
指定定时器变为信号状态的时间。使用正值以 UTC 时间设置绝对时间作为 FILETIME
,或使用负值以 100 纳秒分辨率设置相对时间。当 dueTime
以 DateTime
形式给出时,该参数指定一个绝对 dueTime
;当以 TimeSpan
形式给出时,dueTime
为相对时间。
interval
参数给出定时器的周期(以毫秒为单位)。当 interval 为零时,可等待定时器将仅被信号化一次;当 interval 大于零时,可等待定时器将是周期性的,并在每次周期结束时自动重新激活,直到使用 Cancel()
函数取消定时器或使用 SetTimer(…)
函数重置。当 interval
以 TimeSpan
形式给出时,该参数将被转换为毫秒。
completionRoutine
参数指定指向可选完成例程的指针。
argToCompletionRoutine
参数指定要传递给可选完成例程的参数。
如果 resumeSystemIfSuspended
参数为 true
,则当可等待定时器变为信号状态时,系统将从挂起的电源保护模式恢复。
线程
线程也是一个同步对象,我们可以对其进行等待。Thread
对象在完成执行时进入其信号状态。
Thread
类可以像 std::thread
类一样使用,并提供额外的 Windows 特定功能。
注意:我们不应该使用 Win x86/x64 的 ExitThread(…)
或 c 运行时 _endthread(…)
和 _endthreadex(…)
函数来终止线程,因为此实现和 Visual C++ 当前提供的 std::thread
的实现都使用 std::unique_ptr<>
来保存指向包含线程参数的 tuple<…>
的指针,因此需要正确展开堆栈以允许 std::unique_ptr<>
的析构函数删除此元组。
Thread
类提供了一个替代方案,该方案是通过抛出不派生自 std::exception
的异常来实现的。这不是一个万能的“catch(…)
”就能捕获的万无一失的机制。
退出线程的最佳方法是直接从 thread
函数返回。
Thread thread( []( ) { return 5; } );
thread.join( );
auto exitCode = thread.ExitCode( );
printf( "Thread exited with exit code %d\n", exitCode );
输出
Thread exited with exit code 5
Visual C++ 提供的 std::thread
的实现会在其 join( )
的实现中关闭线程句柄,这使得无法对该对象执行进一步的操作。Thread
类将保留句柄,直到我们调用 Close()
,或该对象超出作用域。
构造函数
Thread
对象可以通过多种方式构造,默认构造函数创建一个空的、可移动赋值给另一个 Thread
的对象。
constexpr Thread( ) noexcept;
constexpr Thread( HANDLE handle, UInt32 threadId ) noexcept;
template <class Function, class... Args>
requires ( std::is_same_v<std::remove_cvref_t<Function>, Thread> == false )
explicit Thread( Function&& function, Args&&... args );
template <class Function, class... Args>
requires ( std::is_same_v<std::remove_cvref_t<Function>, Thread> == false )
explicit Thread( LPSECURITY_ATTRIBUTES securityAttributes,
Function&& function, Args&&... args );
第二个构造函数使用 handle
和 threadId
参数初始化 Thread
对象,假定它们是有效的。
最后两个构造函数创建一个新线程,使用传递给新线程的函数及其参数的衰减副本调用 std::invoke
。
securityAttributes
指定线程的 SECURITY_ATTRIBUTES
。当 securityAttributes
为 nullptr
时,线程将获得默认的安全描述符,其 ACL 来自创建线程的主令牌或模拟令牌。
用户模式同步
Windows API 提供了一套比我们到目前为止介绍的基于内核的机制更高效的同步机制。
当对象未锁定或锁在几千个 CPU 周期内释放时(大多数情况都是如此),用户模式同步机制可以避免昂贵的内核模式往返。线程开始真正的等待操作并放弃其剩余的时间片,仍然必须进入内核模式,因为只有在这里系统才能调度线程执行。
CriticalSection
CriticalSection
类封装了一个 CRITICAL_SECTION
结构,没有添加额外的数据成员。该类不可复制构造、不可复制赋值、不可移动构造或不可移动赋值。
当一个线程尝试获取一个已锁定临界区的锁时,该线程会忙等待,尝试获取临界区的锁而不放弃线程的当前时间片。如果在循环完成之前无法获取锁,线程将休眠以等待临界区被释放。
CriticalSection
实现了一种与互斥体对象类似的同步机制,但它只能用于同步单个进程中的线程。
size_t counter = 0;
CriticalSection criticalSection;
criticalSection.Enter( );
ThreadGroup threadGroup;
for ( int i = 0; i < 100; ++i )
{
threadGroup.Add( [i, &criticalSection, &counter]( )
{
auto id = i + 1;
for ( int i = 0; i < 10; ++i )
{
printf( "T%d: waiting\n", id );
std::unique_lock lock( criticalSection );
printf( "T%d: acquired mutex\n", id );
++counter;
printf( "T%d: value %zu\n", id, counter );
}
} );
}
criticalSection.Leave( );
puts( "Main thread waiting on background threads" );
threadGroup.join( );
printf( "Final value %zu\n", counter );
构造函数
CriticalSection
有一个构造函数。
explicit CriticalSection( UInt32 spinCount = DefaultSpinCount, bool noDebugInfo = true );
spinCount
参数指定临界区的自旋计数。
noDebugInfo
参数指定操作系统不应为临界区创建调试信息。
构造函数使用 InitializeCriticalSectionEx
来初始化 CRITICAL_SECTION
结构,如果 noDebugInfo
参数为 true
,则构造函数会将 CRITICAL_SECTION_NO_DEBUG_INFO
传递给 InitializeCriticalSectionEx
函数。
从 Vista、Windows Server 2008 开始,Microsoft 改变了 InitializeCriticalSection
的工作方式。
据我所理解,InitializeCriticalSection
、InitializeCriticalSectionAndSpinCount
和 InitializeCriticalSectionEx
(不带 CRITICAL_SECTION_NO_DEBUG_INFO
)现在会在进程地址空间中分配一些用于调试信息内存,而这些内存不会被 DeleteCriticalSection
释放。这会导致进程为每个删除的临界区泄漏少量内存。
TryEnter 和 try_lock
TryEnter
函数尝试在不阻塞的情况下获取临界区的锁。如果成功,调用线程将获得临界区的拥有权,并且必须调用 Leave()
或 unlock()
来释放锁。try_lock()
函数只调用 TryEnter()
。
Enter 和 lock
Enter()
函数在线程获得临界区锁的所有权时返回。lock()
函数只调用 Enter()
。
Leave 和 unlock
Leave()
函数释放临界区锁的所有权。Leave()
必须为每次成功调用 TryEnter()
或 Enter()
调用。
SlimReaderWriterLock
SlimReaderWriterLock
类封装了一个 SRWLOCK struct
,没有添加额外的数据成员。该类不可复制构造、不可复制赋值、不可移动构造或不可移动赋值。默认构造函数初始化 SRWLOCK
。
SlimReaderWriterLock
用于允许多个线程并发读取访问,同时确保当一个线程写入受保护的资源时,它将拥有独占访问权,阻塞其他写入者和读取者。
构造函数
SlimReaderWriterLock
有一个构造函数。
SlimReaderWriterLock( ) noexcept
构造函数调用 InitializeSRWLock(…)
来初始化对象的 SRWLOCK
结构。
AcquireExclusive 和 lock
以独占模式获取精简读/写锁。
AcquireShared 和 lock_shared
以共享模式获取精简读/写锁。
TryAcquireExclusive 和 try_lock
尝试以独占模式获取精简读/写锁。
TryAcquireShared 和 try_lock_shared
尝试以共享模式获取精简读/写锁。
ReleaseExclusive 和 unlock
释放以独占模式获取的锁。
ReleaseShared 和 unlock_shared
释放以共享模式获取的锁。
ConditionVariable
ConditionVariable
类封装了一个 CONDITION_VARIABLE struct
,没有添加额外的数据成员。该类不可复制构造、不可复制赋值、不可移动构造或不可移动赋值。默认构造函数初始化 CONDITION_VARIABLE
。
Condition
变量旨在让我们等待受 CriticalSection
或 SlimReaderWriterLock
保护的资源上的更改通知。
为了演示,这里有一个简单的多生产者、多消费者队列。
class SimpleQueue
{
ConditionVariable queueEmpty_;
ConditionVariable queueFull_;
CriticalSection criticalSection_;
size_t lastItemProduced_ = 0;
size_t queueSize_ = 0;
size_t startOffset_ = 0;
bool closed_ = false;
public:
static constexpr size_t MaxQueueSize = 50;
using Container = std::array<size_t, MaxQueueSize>;
private:
Container conatainer_;
public:
SimpleQueue( )
{
}
void Close( )
{
{
std::unique_lock lock( criticalSection_ );
closed_ = true;
}
queueEmpty_.WakeAll( );
queueFull_.WakeAll( );
}
bool Push( size_t item )
{
{
std::unique_lock lock( criticalSection_ );
while ( queueSize_ == MaxQueueSize && closed_ == false )
{
queueFull_.Sleep( criticalSection_ );
}
if ( closed_ )
{
return false;
}
auto containerOffset = ( startOffset_ + queueSize_ ) % MaxQueueSize;
conatainer_[containerOffset] = item;
queueSize_++;
}
queueEmpty_.Wake( );
return true;
}
private:
size_t PopValue( )
{
auto result = conatainer_[startOffset_];
queueSize_--;
startOffset_++;
if ( startOffset_ == MaxQueueSize )
{
startOffset_ = 0;
}
return result;
}
public:
bool Pop( size_t& item )
{
bool result = false;
{
std::unique_lock lock( criticalSection_ );
if ( closed_ == false )
{
while ( queueSize_ == 0 && closed_ == false )
{
queueEmpty_.Sleep( criticalSection_ );
}
}
if( queueSize_ )
{
item = PopValue( );
result = true;
}
}
if ( result && closed_ == false )
{
queueFull_.Wake( );
}
return result;
}
};
如果队列未关闭,则可以向队列推送项目;只要队列未关闭或队列中有项目,就可以弹出项目。
Push(…)
函数对队列获取独占锁,如果队列已满,它会通过调用 Sleep(criticalSection_)
在 ConditionVariable
queueFull_
上等待。Sleep(…)
原子地释放指定的临界区并初始化等待。在调用 Sleep(…)
完成之前,会重新获取临界区。这允许其他线程获取临界区并从队列中移除元素。在返回之前,Push(…)
函数调用 queueEmpty_.Wake( )
来释放一个等待队列中出现项目的线程。Pop(…)
的逻辑相同,只是该函数将在队列为空时等待,并在从队列中移除项目后通知等待的生产者有空间。
这个队列非常容易使用。
SimpleQueue queue;
std::atomic<size_t> generated;
std::atomic<size_t> consumed;
ThreadGroup producerThreadGroup;
ThreadGroup consumerThreadGroup;
for ( int i = 0; i < 4; ++i )
{
producerThreadGroup.Add( [i , &queue,&generated]( )
{
while ( queue.Push( 1 ) )
{
++generated;
}
printf( "Producer %d done.\n", i + 1 );
} );
}
for ( int i = 0; i < 4; ++i )
{
consumerThreadGroup.Add( [i, &queue, &consumed]( )
{
size_t value = 0;
while ( queue.Pop( value ) )
{
++consumed;
}
printf( "Consumer %d done.\n", i + 1 );
} );
}
puts( "Main thread going to sleep" );
CurrentThread::Sleep( TimeSpan::FromSeconds( 2 ) );
puts( "Main thread closing the queue" );
queue.Close( );
puts( "Main thread waiting on producer threads" );
producerThreadGroup.join( );
puts( "Main thread waiting on consumer threads" );
consumerThreadGroup.join( );
size_t sent = generated;
size_t received = consumed;
printf( "Result: produced %zu values and consumed %zu values\n", sent, received );
构造函数
ConditionVariable
有一个构造函数。
ConditionVariable( ) noexcept;
构造函数调用 InitializeConditionVariable(…)
来初始化对象的 CONDITION_VARIABLE
结构。
Wake
Wake()
函数唤醒一个正在等待条件变量的线程。
WakeAll
WakeAll()
函数唤醒所有正在等待条件变量的线程。
Sleep
Sleep
函数有以下重载。
bool Sleep( const CriticalSection& criticalSection,
DWORD timeoutInMillis = INFINITE ) const;
bool Sleep( const CriticalSection& criticalSection,
const TimeSpan& timeout ) const;
bool Sleep( const SlimReaderWriterLock& slimReaderWriterLock,
DWORD timeoutInMillis = INFINITE, bool sharedMode = false ) const;
bool Sleep( const SlimReaderWriterLock& slimReaderWriterLock,
const TimeSpan& timeout, bool sharedMode = false ) const;
第一个和第二个重载在指定的条件变量上休眠,并以原子操作释放指定的临界区。
第三个和第四个重载在指定的条件变量上休眠,并以原子操作释放指定的 SlimReaderWriterLock
。如果 sharedMode
为 true
,则锁以共享模式持有;否则,调用函数时锁必须以独占模式持有。
timeout
参数指定函数返回的时间间隔,无论 Sleep
的结果如何。如果发生超时,函数返回 false
;否则返回 true
。
SynchronizationBarrier
同步屏障允许多个线程等待,直到所有线程都到达一个执行阶段,然后它们等待最后一个线程到达,然后它们都继续执行。
SynchronizationBarrier
类封装了一个 SYNCHRONIZATION_BARRIER struct
,没有添加额外的数据成员。该类不可复制构造、不可复制赋值、不可移动构造或不可移动赋值。默认构造函数初始化 SYNCHRONIZATION_BARRIER
。
构造函数
SynchronizationBarrier
有一个构造函数。
explicit SynchronizationBarrier( UInt32 totalThreads, Int32 spinCount = -1 );
totalThreads
参数指定必须进入屏障的线程数量,然后才能允许所有线程继续。
spinCount
参数指定线程在等待其他线程到达屏障时自旋的次数。如果此参数为 -1
,则线程自旋 2000 次。当线程超过 spinCount
时,线程会阻塞,除非它使用 Enter(…)
并带 SynchronizationBarrierFlags::SpinOnly
调用。
Enter
Enter(…)
函数导致调用线程等待,直到所需数量的线程进入屏障。
TimerQueue
TimerQueue
是 Windows 定时器队列的包装器。该实现使得使用任何可调用对象指定回调变得容易,并且可以像向 std::thread
传递参数一样,将任意数量的参数传递给可调用实现。
size_t counter = 0;
EventWaitHandle event( true );
TimerQueue timerQueue;
auto timer = timerQueue.CreateTimer( 100, 100, TimerQueueTimerFlags::Default,
[&counter,&event]( )
{
counter++;
if ( counter == 5 )
{
event.Signal( );
}
} );
event.Wait( );
timer.Close( );
timerQueue.Close( );
结束
希望这对您有所帮助,并且我能够演示如何轻松使用 Windows 平台上使用 C++ 开发软件时可用的许多同步机制。所以,下次再见:祝您编码愉快。😊
历史
- 2020年9月8日
- 初次发布
- 2020年9月12日
- 添加了
ExampleSocketServer01
和ExampleSocketClient01
。
- 添加了
- 2020年10月6日
- 修复了 bug,清理了大部分单元测试。
- 2020年10月7日
- 为
Harlinn.Common.Core
库添加更多单元测试
- 为
- 2020年10月11日
- 为
Harlinn.Common.Core
库添加更多单元测试
- 为
- 2020年10月13日
- 为
Harlinn.Common.Core
库添加更多单元测试,并为Harlinn.Windows
库添加两个新示例。
- 为
- 2020年10月17日
- 修复
TimerQueue
和TimerQueueTimer
,为Harlinn.Common.Core
库添加更多单元测试
- 修复
- 2020年12月18日
- 修复
IO::FileStream
的错误 - 支持初始 HTTP 服务器开发
- 同步服务器:$(SolutionDir)Examples\Core\HTTP\Server\HttpServerEx01
- 异步服务器:$(SolutionDir)Examples\Core\HTTP\Server\HttpServerEx02
- 使用 Windows 线程池 API 简化了 Windows 可等待内核对象的异步 I/O、计时器、工作和事件:$(SolutionDir)Examples\Core\ThreadPools\HExTpEx01
- 修复
- 2021年1月1日
- 改进了对异步服务器开发的支持
- 用于处理套接字的新设计
- 基于概念的流实现
- 2021年2月11日
- Bug 修复
- 初始 C++ ODBC 支持
- 2021年2月25日
- 更新 LMDB
- 更新 xxHash
- 添加了使用 LMDB 对大型复杂键的超快速基于哈希的索引的初始实现
- 快速异步日志记录 - 接近完成 :-)
- 2021年3月3日
- 新的授权相关类
- SecurityId:SID 及相关操作的包装器
- ExplicitAccess:EXCPLICIT_ACCESS 的包装器
- Trustee:TRUSTEE 的包装器
- SecurityIdAndDomain:保存 LookupAccountName 的结果
- LocalUniqueId:LUID 的包装器
- AccessMask:轻松检查分配给 ACCESS_MASK 的权限
- AccessMaskT<>
- EventWaitHandleAccessMask:检查和操作 EventWaitHandle 的权限。
- MutexAccessMask:检查和操作 Mutex 的权限。
- SemaphoreAccessMask:检查和操作 Semaphore 的权限。
- WaitableTimerAccessMask:检查和操作 WaitableTimer 的权限。
- FileAccessMask:检查和操作与文件相关的权限。
- DirectoryAccessMask:检查和操作目录相关的权限。
- PipeAccessMask:检查和操作管道相关的权限。
- ThreadAccessMask:检查和操作线程相关的权限。
- ProcessAccessMask:检查和操作进程相关的权限。
- AccessMaskT<>
- GenericMapping:GENERIC_MAPPING 的包装器
- AccessControlEntry:这是一组包装 ACE 结构的小型类
- AccessControlEntryBase<,>
- AccessAllowedAccessControlEntry
- AccessDeniedAccessControlEntry
- SystemAuditAccessControlEntry
- SystemAlarmAccessControlEntry
- SystemResourceAttributeAccessControlEntry
- SystemScopedPolicyIdAccessControlEntry
- SystemMandatoryLabelAccessControlEntry
- SystemProcessTrustLabelAccessControlEntry
- SystemAccessFilterAccessControlEntry
- AccessDeniedCallbackAccessControlEntry
- SystemAuditCallbackAccessControlEntry
- SystemAlarmCallbackAccessControlEntry
- ObjectAccessControlEntryBase<,>
- AccessAllowedObjectAccessControlEntry
- AccessDeniedObjectAccessControlEntry
- SystemAuditObjectAccessControlEntry
- SystemAlarmObjectAccessControlEntry
- AccessAllowedCallbackObjectAccessControlEntry
- AccessDeniedCallbackObjectAccessControlEntry
- SystemAuditCallbackObjectAccessControlEntry
- SystemAlarmCallbackObjectAccessControlEntry
- AccessControlEntryBase<,>
- AccessControlList:ACL 的包装器
- PrivilegeSet:PRIVILEGE_SET 的包装器
- SecurityDescriptor:SECURITY_DESCRIPTOR 包装器的早期实现
- SecurityAttributes:SECURITY_ATTRIBUTES 包装器的非常早期的实现
- Token:访问令牌包装器的早期实现
- DomainObject
- User:本地、工作组或域用户信息
- Computer:本地、工作组或域计算机信息
- Group:本地、工作组或域组
- Users:User 对象向量
- Groups:Group 对象向量
- 新的授权相关类
- 2021年3月14日 - 更多安全相关工作的进展:
- Token:Windows 访问令牌的包装器,包含许多支持类,例如
- TokenAccessMask:Windows 访问令牌访问权限的访问掩码实现。
- TokenGroups:Windows TOKEN_GROUPS 类型的包装器/二进制兼容替代品,具有 C++ 容器风格的接口。
- TokenPrivileges:TOKEN_PRIVILEGES 类型的包装器/二进制兼容替代品,具有 C++ 容器风格的接口。
- TokenStatistics:Windows TOKEN_STATISTICS 类型的二进制兼容替代品,使用库实现的类型,如 LocalUniqueId、TokenType 和 ImpersonationLevel。
- TokenGroupsAndPrivileges:Windows TOKEN_GROUPS_AND_PRIVILEGES 类型的包装器/二进制兼容替代品。
- TokenAccessInformation:Windows TOKEN_ACCESS_INFORMATION 类型的包装器/二进制兼容替代品。
- TokenMandatoryLabel:Windows TOKEN_MANDATORY_LABEL 类型的包装器。
- SecurityPackage:提供对 Windows 安全包信息的访问。
- SecurityPackages:一个 std::unordered_map,包含有关系统上安装的安全包的信息。
- CredentialsHandle:Windows CredHandle 类型的包装器。
- SecurityContext:Windows CtxtHandle 类型的包装器
- Crypto::Blob and Crypto::BlobT:C++ 风格的 _CRYPTOAPI_BLOB 替代品
- CertificateContext:Windows PCCERT_CONTEXT 类型的包装器,提供对 X.509 证书的访问。
- CertificateChain:Windows PCCERT_CHAIN_CONTEXT 类型的包装器,包含一个简单的证书链数组和一个信任状态结构,指示所有连接的简单链的汇总有效性数据。
- ServerOcspResponseContext:包含一个编码的 OCSP 响应。
- ServerOcspResponse:表示与服务器证书链关联的 OCSP 响应的句柄。
- CertificateChainEngine:代表应用程序的证书链引擎。
- CertificateTrustList:Windows PCCTL_CONTEXT 类型的包装器,包含 CTL 的编码和解码表示。它还包含一个打开的 HCRYPTMSG 句柄,用于已解码的、加密签名的消息,该消息将 CTL_INFO 作为其内部内容。
- CertificateRevocationList:包含证书吊销列表 (CRL) 的编码和解码表示
- CertificateStore:证书、证书吊销列表 (CRL) 和证书信任列表 (CTL) 的存储。
- Token:Windows 访问令牌的包装器,包含许多支持类,例如
- 2021年3月23日
- 更新至 Visual Studio 16.9.2
- 构建修复
SecurityDescriptor:
实现了安全描述符的序列化,实现了授权数据的持久化。