Win32 线程池和 C++11:一个快速封装






4.79/5 (12投票s)
通过单个 C++ 11 类使用 Windows 新的 ThreadPool
我的多线程库的一部分:https://github.com/WindowsNT/mt
引言
Windows 有一个新的 Threadpool
API,其接口有些混乱(参见示例代码)。这是一个简化其用法的类。
术语
- 一个 Work 项是一个可以在后台运行的工作线程。
- 一个 IO 项是一个在 Handle 中发生 I/O 时得到通知的工作线程。
- 一个 Timer 项是一个在设置的时间之后触发的工作线程。
- 一个 Wait 项是一个在某个对象发出信号时得到通知的工作线程。
- 一个 Cleanup Group 是 API 用于在等待所有句柄之后释放它们的手段。
两种模式
template <bool AutoDestruct = true>
class tpool
该类有两种模式可用,一种的销毁由 API 处理,另一种的销毁通过类的智能指针处理。
- 当你想使用 Cleanup Group 时,使用
<true>
。这让 API 可以跟踪你的对象,并且通过一个简单的Join()
,你可以等待所有对象完成。 - 当你想自己管理这些项时,使用
<false>
。这允许你使用你想等待的项来调用Join()
,取消特定的项,等等。
类 Handle
template <typename T,typename Destruction = destruction_policy<T>>
class handle
这个智能指针管理这些项。如果你用 true
实例化 tpool
,这些句柄会自动销毁它们的项,借助每个类型的模板特化。
template<> class destruction_policy<PTP_POOL>
{ public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
template<> class destruction_policy<PTP_WORK>
{ public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
template<> class destruction_policy<PTP_WAIT>
{ public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
template<> class destruction_policy<PTP_TIMER>
{ public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
template<> class destruction_policy<PTP_IO>
{ public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
template<> class destruction_policy<PTP_CLEANUP_GROUP>
{ public: static void destruct(PTP_CLEANUP_GROUP h) { CloseThreadpoolCleanupGroup(h); } };
该句柄实现了移动/复制语义等,借助 std::shared_ptr
。
tpool::Create
bool Create(unsigned long nmin = 1,unsigned long nmax = 1)
创建所需的接口。你可以传递你需要的最小和最大线程数。
项创建
有四种特化来创建你的项
template <> handle<PTP_WORK>
CreateItem<PTP_WORK,PTP_WORK_CALLBACK>(PTP_WORK_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_WAIT>
CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>(PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_TIMER>
CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>(PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_IO>
CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>(PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY);
每个调用都接受要执行的回调,以及传递给回调的参数。CreateItem<PTP_IO>
还接受用于 I/O 操作的 HANDLE
值。
项运行
有三种特化来运行你的项(Wait
对象不 "运行")。借助 std::tuple
,我们可以使用具有不同参数的相同函数签名
template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>);
template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,std::tuple<FILETIME*,DWORD,DWORD>t);
template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t);
RunItem<PTP_TIMER>
还接受计时器参数(参见SetThreadpoolTimer),而 RunItem<PTP_IO>
接受一个 bool
,true
用于取消 I/O,或 false
用于启动它。
项等待
有四种特化来等待你的项
template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel);
template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel);
template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel);
template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel);
在 Cancel
中传递 true
强制该函数在对象尚未启动的情况下将其杀死。
Joining (汇合)
借助类型特征,我们以两种方式定义 join
函数,具体取决于 tpool
是否使用 true
或 false
实例化。
template <bool Q = AutoDestruct>
typename std::enable_if<Q,void>::type
Join(bool Cancel = false);
对于自动销毁的情况,Join
等待你当前所有项完成。如果 Cancel
为 true
,则所有尚未启动的项都将被取消。
template <bool Q = AutoDestruct>
typename std::enable_if<!Q,void>::type
Join(bool Cancel = false,
std::initializer_list<handle<PTP_WORK>> h1 = std::initializer_list<handle<PTP_WORK>>({}),
std::initializer_list<handle<PTP_TIMER>> h2 = std::initializer_list<handle<PTP_TIMER>>({}),
std::initializer_list<handle<PTP_WAIT>> h3 = std::initializer_list<handle<PTP_WAIT>>({}),
std::initializer_list<handle<PTP_IO>> h4 = std::initializer_list<handle<PTP_IO>>({})
);
对于手动销毁的情况,Join
等待你指定的项完成。如果 Cancel
为 true
,则所有尚未启动的项都将被取消。
最终代码
#include <windows.h>
#include <functional>
#include <memory>
#include <type_traits>
// -------------------------
namespace tpoollib
{
// Handles template
// Destruction Policy
template<typename T>
class destruction_policy
{
public:
static void destruct(T h)
{
static_assert(false,"Must define destructor");
}
};
// Policies Specialization
template<> class destruction_policy<PTP_POOL>
{ public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
template<> class destruction_policy<PTP_WORK>
{ public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
template<> class destruction_policy<PTP_WAIT>
{ public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
template<> class destruction_policy<PTP_TIMER>
{ public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
template<> class destruction_policy<PTP_IO>
{ public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
template<> class destruction_policy<PTP_CLEANUP_GROUP>
{ public: static void destruct(PTP_CLEANUP_GROUP h)
{ CloseThreadpoolCleanupGroup(h); } };
// Template for Handles
template <typename T,typename Destruction = destruction_policy<T>>
class handle
{
private:
T hX = 0;
bool NoDestruct = true;
std::shared_ptr<size_t> ptr = std::make_shared<size_t>();
public:
// Closing items
void Close()
{
if (!ptr || !ptr.unique())
{
ptr.reset();
return;
}
ptr.reset();
if (hX != 0 && !NoDestruct)
Destruction::destruct(hX);
hX = 0;
}
handle()
{
hX = 0;
}
~handle()
{
Close();
}
handle(const handle& h)
{
Dup(h);
}
handle(handle&& h)
{
Move(std::forward<handle>(h));
}
handle(T hY,bool NoDestructOnClose)
{
hX = hY;
NoDestruct = NoDestructOnClose;
}
handle& operator =(const handle& h)
{
Dup(h);
return *this;
}
handle& operator =(handle&& h)
{
Move(std::forward<handle>(h));
return *this;
}
void Dup(const handle& h)
{
Close();
NoDestruct = h.NoDestruct;
hX = h.hX;
ptr = h.ptr;
}
void Move(handle&& h)
{
Close();
hX = h.hX;
ptr = h.ptr;
NoDestruct = h.NoDestruct;
h.ptr.reset();
h.hX = 0;
h.NoDestruct = false;
}
operator T() const
{
return hX;
}
};
template <bool AutoDestruct = true>
class tpool
{
private:
handle<PTP_POOL> p;
handle<PTP_CLEANUP_GROUP> pcg;
TP_CALLBACK_ENVIRON env;
tpool(const tpool&) = delete;
tpool(tpool&&) = delete;
void operator=(const tpool&) = delete;
void operator=(tpool&&) = delete;
public:
tpool()
{
}
~tpool()
{
End();
}
void End()
{
Join();
DestroyThreadpoolEnvironment(&env);
p.Close();
}
// Creates the interfaces
bool Create(unsigned long nmin = 1,unsigned long nmax = 1)
{
bool jauto = AutoDestruct;
// Env
InitializeThreadpoolEnvironment(&env);
// Pool and Min/Max
handle<PTP_POOL> cx(CreateThreadpool(0),false);
p = cx;
if (!p)
{
End();
return false;
}
if (!SetThreadpoolThreadMinimum(p,nmin))
{
End();
return false;
}
SetThreadpoolThreadMaximum(p,nmax);
// Cleanup Group
if (jauto)
{
handle<PTP_CLEANUP_GROUP> cx(CreateThreadpoolCleanupGroup(),false);
pcg = cx;
if (!pcg)
{
End();
return false;
}
}
// Sets
SetThreadpoolCallbackPool(&env,p);
SetThreadpoolCallbackCleanupGroup(&env,pcg,0);
return true;
}
// Templates for each of the items, to be specialized later
template <typename J>
void Wait(handle<J> h,bool Cancel = false)
{
static_assert(false,"No Wait function is defined");
}
template <typename J,typename CB_J>
handle<J> CreateItem(CB_J cb,PVOID opt = 0,HANDLE hX = 0)
{
static_assert(false,"No Create function is defined");
}
template <typename J,typename ...A>
void RunItem(handle<J> h,std::tuple<A...> = std::make_tuple<>())
{
static_assert(false,"No Run function is defined");
}
// Work Stuff
template <> handle<PTP_WORK> CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
(PTP_WORK_CALLBACK cb,PVOID opt,HANDLE)
{
handle<PTP_WORK> a(CreateThreadpoolWork(cb,opt,&env),AutoDestruct);
return a;
}
template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>)
{
SubmitThreadpoolWork(h);
}
template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel)
{
WaitForThreadpoolWorkCallbacks(h,Cancel);
}
// Wait stuff
template <> handle<PTP_WAIT> CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>
(PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE)
{
handle<PTP_WAIT> a(CreateThreadpoolWait(cb,opt,&env),AutoDestruct);
return a;
}
template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel)
{
WaitForThreadpoolWaitCallbacks(h,Cancel);
}
// Timer stuff
template <> handle<PTP_TIMER> CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>
(PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE)
{
handle<PTP_TIMER> a(CreateThreadpoolTimer(cb,opt,&env),AutoDestruct);
return a;
}
template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,
std::tuple<FILETIME*,DWORD,DWORD>t)
{
SetThreadpoolTimer(h,std::get<0>(t),std::get<1>(t),std::get<2>(t));
}
template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel)
{
WaitForThreadpoolTimerCallbacks(h,Cancel);
}
// IO Stuff
template <> handle<PTP_IO> CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>
(PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY)
{
handle<PTP_IO> a(CreateThreadpoolIo(hY,cb,opt,&env),AutoDestruct);
return a;
}
template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t)
{
bool Cancel = std::get<0>(t);
if (Cancel)
CancelThreadpoolIo(h);
else
StartThreadpoolIo(h);
}
template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel)
{
WaitForThreadpoolIoCallbacks(h,Cancel);
}
// Join functions, one for each option (AutoDestruct or not)
template <bool Q = AutoDestruct>
typename std::enable_if<Q,void>::type
Join(bool Cancel = false)
{
if (pcg)
{
CloseThreadpoolCleanupGroupMembers(pcg,Cancel,0);
pcg.Close();
}
}
template <bool Q = AutoDestruct>
typename std::enable_if<!Q,void>::type
Join(bool Cancel = false,
std::initializer_list<handle<PTP_WORK>> h1 =
std::initializer_list<handle<PTP_WORK>>({}),
std::initializer_list<handle<PTP_TIMER>> h2 =
std::initializer_list<handle<PTP_TIMER>>({}),
std::initializer_list<handle<PTP_WAIT>> h3 =
std::initializer_list<handle<PTP_WAIT>>({}),
std::initializer_list<handle<PTP_IO>> h4 =
std::initializer_list<handle<PTP_IO>>({})
)
{
for (auto a : h1)
Wait<PTP_WORK>(a,Cancel);
for (auto a : h2)
Wait<PTP_TIMER>(a,Cancel);
for (auto a : h3)
Wait<PTP_WAIT>(a,Cancel);
for (auto a : h4)
Wait<PTP_IO>(a,Cancel);
}
};
}
示例用法
为了简单起见,已删除错误检查。
using namespace tpoollib;
int __stdcall WinMain(HINSTANCE, HINSTANCE, LPSTR, int)
{
CoInitializeEx(0, COINIT_APARTMENTTHREADED);
// Auto-Destruction of items
{
tpool<true> t;
t.Create();
auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
{
Sleep(((rand() % 5) + 1) * 1000);
return;
},0);
for (int i = 0; i < 3; i++)
t.RunItem(workit);
t.Join();
}
// Manual Destruction of items
{
tpool<false> t;
t.Create();
auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
{
Sleep(((rand() % 5) + 1) * 1000);
return;
},0);
for (int i = 0; i < 3; i++)
t.RunItem(workit);
t.Join(true,{workit});
}
return 0;
}
历史
- 2015年7月26日 - 首次发布