65.9K
CodeProject 正在变化。 阅读更多。
Home

Win32 线程池和 C++11:一个快速封装

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.79/5 (12投票s)

2015年7月26日

CPOL

2分钟阅读

viewsIcon

23003

通过单个 C++ 11 类使用 Windows 新的 ThreadPool

我的多线程库的一部分:https://github.com/WindowsNT/mt

引言

Windows 有一个新的 Threadpool API,其接口有些混乱(参见示例代码)。这是一个简化其用法的类。

术语

  • 一个 Work 项是一个可以在后台运行的工作线程。
  • 一个 IO 项是一个在 Handle 中发生 I/O 时得到通知的工作线程。
  • 一个 Timer 项是一个在设置的时间之后触发的工作线程。
  • 一个 Wait 项是一个在某个对象发出信号时得到通知的工作线程。
  • 一个 Cleanup Group 是 API 用于在等待所有句柄之后释放它们的手段。

两种模式

    template <bool AutoDestruct = true>
    class tpool

该类有两种模式可用,一种的销毁由 API 处理,另一种的销毁通过类的智能指针处理。

  • 当你想使用 Cleanup Group 时,使用 <true>。这让 API 可以跟踪你的对象,并且通过一个简单的 Join(),你可以等待所有对象完成。
  • 当你想自己管理这些项时,使用 <false>。这允许你使用你想等待的项来调用 Join(),取消特定的项,等等。

类 Handle

    template <typename T,typename Destruction = destruction_policy<T>>
    class handle 

这个智能指针管理这些项。如果你用 true 实例化 tpool,这些句柄会自动销毁它们的项,借助每个类型的模板特化。

    template<>    class destruction_policy<PTP_POOL> 
    { public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
    template<>    class destruction_policy<PTP_WORK> 
    { public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
    template<>    class destruction_policy<PTP_WAIT> 
    { public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
    template<>    class destruction_policy<PTP_TIMER> 
    { public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
    template<>    class destruction_policy<PTP_IO> 
    { public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
    template<>    class destruction_policy<PTP_CLEANUP_GROUP> 
    { public: static void destruct(PTP_CLEANUP_GROUP h) { CloseThreadpoolCleanupGroup(h); } }; 

该句柄实现了移动/复制语义等,借助 std::shared_ptr

tpool::Create

bool Create(unsigned long nmin = 1,unsigned long nmax = 1) 

创建所需的接口。你可以传递你需要的最小和最大线程数。

项创建

有四种特化来创建你的项

template <> handle<PTP_WORK> 
  CreateItem<PTP_WORK,PTP_WORK_CALLBACK>(PTP_WORK_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_WAIT> 
  CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>(PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_TIMER> 
  CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>(PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE);
template <> handle<PTP_IO> 
  CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>(PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY);

每个调用都接受要执行的回调,以及传递给回调的参数。CreateItem<PTP_IO> 还接受用于 I/O 操作的 HANDLE 值。

项运行

有三种特化来运行你的项(Wait 对象不 "运行")。借助 std::tuple,我们可以使用具有不同参数的相同函数签名

template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>);
template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,std::tuple<FILETIME*,DWORD,DWORD>t);
template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t);

RunItem<PTP_TIMER> 还接受计时器参数(参见SetThreadpoolTimer),而 RunItem<PTP_IO> 接受一个 booltrue 用于取消 I/O,或 false 用于启动它。

项等待

有四种特化来等待你的项

template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel);
template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel);
template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel);
template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel); 

Cancel 中传递 true 强制该函数在对象尚未启动的情况下将其杀死。

Joining (汇合)

借助类型特征,我们以两种方式定义 join 函数,具体取决于 tpool 是否使用 truefalse 实例化。

template <bool Q = AutoDestruct>
typename std::enable_if<Q,void>::type
Join(bool Cancel = false); 

对于自动销毁的情况,Join 等待你当前所有项完成。如果 Canceltrue,则所有尚未启动的项都将被取消。

            template <bool Q = AutoDestruct>
            typename std::enable_if<!Q,void>::type
                Join(bool Cancel = false,
                std::initializer_list<handle<PTP_WORK>> h1 = std::initializer_list<handle<PTP_WORK>>({}),
                std::initializer_list<handle<PTP_TIMER>> h2 = std::initializer_list<handle<PTP_TIMER>>({}),
                std::initializer_list<handle<PTP_WAIT>> h3 = std::initializer_list<handle<PTP_WAIT>>({}),
                std::initializer_list<handle<PTP_IO>> h4 = std::initializer_list<handle<PTP_IO>>({})
                ); 

对于手动销毁的情况,Join 等待你指定的项完成。如果 Canceltrue,则所有尚未启动的项都将被取消。

最终代码

#include <windows.h>
#include <functional>
#include <memory>
#include <type_traits>

// -------------------------
namespace tpoollib
    {
    // Handles template
    
    // Destruction Policy    
    template<typename T>
    class destruction_policy
        {
        public:
            static void destruct(T h)
                {
                static_assert(false,"Must define destructor");
                }
        };

    // Policies Specialization
    template<>    class destruction_policy<PTP_POOL> 
        { public: static void destruct(PTP_POOL h) { CloseThreadpool(h); } };
    template<>    class destruction_policy<PTP_WORK> 
        { public: static void destruct(PTP_WORK h) { CloseThreadpoolWork(h); } };
    template<>    class destruction_policy<PTP_WAIT> 
        { public: static void destruct(PTP_WAIT h) { CloseThreadpoolWait(h); } };
    template<>    class destruction_policy<PTP_TIMER> 
        { public: static void destruct(PTP_TIMER h) { CloseThreadpoolTimer(h); } };
    template<>    class destruction_policy<PTP_IO> 
        { public: static void destruct(PTP_IO h) { CloseThreadpoolIo(h); } };
    template<>    class destruction_policy<PTP_CLEANUP_GROUP> 
        { public: static void destruct(PTP_CLEANUP_GROUP h) 
        { CloseThreadpoolCleanupGroup(h); } };
    
    // Template for Handles
    template <typename T,typename Destruction = destruction_policy<T>>
    class handle
        {
        private:
            T hX = 0;
            bool NoDestruct = true;
            std::shared_ptr<size_t> ptr = std::make_shared<size_t>();

        public:

            // Closing items
            void Close()
                {
                if (!ptr || !ptr.unique())
                    {
                    ptr.reset();
                    return;
                    }
                ptr.reset();
                if (hX != 0 && !NoDestruct)
                    Destruction::destruct(hX);
                hX = 0;
                }

            handle()
                {
                hX = 0;
                }
            ~handle()
                {
                Close();
                }
            handle(const handle& h)
                {
                Dup(h);
                }
            handle(handle&& h)
                {
                Move(std::forward<handle>(h));
                }
            handle(T hY,bool NoDestructOnClose)
                {
                hX = hY;
                NoDestruct = NoDestructOnClose;
                }

            handle& operator =(const handle& h)
                {
                Dup(h);
                return *this;
                }
            handle& operator =(handle&& h)
                {
                Move(std::forward<handle>(h));
                return *this;
                }

            void Dup(const handle& h)
                {
                Close();
                NoDestruct = h.NoDestruct;
                hX = h.hX;
                ptr = h.ptr;
                }
            void Move(handle&& h)
                {
                Close();
                hX = h.hX;
                ptr = h.ptr;
                NoDestruct = h.NoDestruct;
                h.ptr.reset();
                h.hX = 0;
                h.NoDestruct = false;
                }
            operator T() const
                {
                return hX;
                }

        };

    template <bool AutoDestruct = true>
    class tpool
        {
        private:
            handle<PTP_POOL> p;
            handle<PTP_CLEANUP_GROUP> pcg;
            TP_CALLBACK_ENVIRON env;

            tpool(const tpool&) = delete;
            tpool(tpool&&) = delete;
            void operator=(const tpool&) = delete;
            void operator=(tpool&&) = delete;

        public:

            tpool()
                {
                }

            ~tpool()
                {
                End();
                }

            void End()
                {
                Join();
                DestroyThreadpoolEnvironment(&env);
                p.Close();
                }

            // Creates the interfaces
            bool Create(unsigned long nmin = 1,unsigned long nmax = 1)
                { 
                bool jauto = AutoDestruct;

                // Env
                InitializeThreadpoolEnvironment(&env);

                // Pool and Min/Max
                handle<PTP_POOL> cx(CreateThreadpool(0),false);
                p = cx;
                if (!p)
                    {
                    End();
                    return false;
                    }
                if (!SetThreadpoolThreadMinimum(p,nmin))
                    {
                    End();
                    return false;
                    }
                SetThreadpoolThreadMaximum(p,nmax);

                // Cleanup Group
                if (jauto)
                    {
                    handle<PTP_CLEANUP_GROUP> cx(CreateThreadpoolCleanupGroup(),false);
                    pcg = cx;
                    if (!pcg)
                        {
                        End();
                        return false;
                        }
                    }

                // Sets
                SetThreadpoolCallbackPool(&env,p);
                SetThreadpoolCallbackCleanupGroup(&env,pcg,0);

                return true;
                }

            // Templates for each of the items, to be specialized later
            template <typename J>
            void Wait(handle<J> h,bool Cancel = false)
                {
                static_assert(false,"No Wait function is defined");
                }
            template <typename J,typename CB_J>
            handle<J> CreateItem(CB_J cb,PVOID opt = 0,HANDLE hX = 0)
                {
                static_assert(false,"No Create function is defined");
                }
            template <typename J,typename ...A>
            void RunItem(handle<J> h,std::tuple<A...> = std::make_tuple<>())
                {
                static_assert(false,"No Run function is defined");
                }

            // Work Stuff
            template <> handle<PTP_WORK> CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
                        (PTP_WORK_CALLBACK cb,PVOID opt,HANDLE)
                {
                handle<PTP_WORK> a(CreateThreadpoolWork(cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void RunItem<PTP_WORK>(handle<PTP_WORK> h,std::tuple<>)
                {
                SubmitThreadpoolWork(h);
                }
            template <> void Wait<PTP_WORK>(handle<PTP_WORK> h,bool Cancel)
                {
                WaitForThreadpoolWorkCallbacks(h,Cancel);
                }

            // Wait  stuff
            template <> handle<PTP_WAIT> CreateItem<PTP_WAIT,PTP_WAIT_CALLBACK>
                        (PTP_WAIT_CALLBACK cb,PVOID opt,HANDLE)
                {
                handle<PTP_WAIT> a(CreateThreadpoolWait(cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void Wait<PTP_WAIT>(handle<PTP_WAIT> h,bool Cancel)
                {
                WaitForThreadpoolWaitCallbacks(h,Cancel);
                }

            // Timer stuff
            template <> handle<PTP_TIMER> CreateItem<PTP_TIMER,PTP_TIMER_CALLBACK>
                        (PTP_TIMER_CALLBACK cb,PVOID opt,HANDLE)
                {
                handle<PTP_TIMER> a(CreateThreadpoolTimer(cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void RunItem<PTP_TIMER>(handle<PTP_TIMER> h,
                             std::tuple<FILETIME*,DWORD,DWORD>t)
                {
                SetThreadpoolTimer(h,std::get<0>(t),std::get<1>(t),std::get<2>(t));
                }
            template <> void Wait<PTP_TIMER>(handle<PTP_TIMER> h,bool Cancel)
                {
                WaitForThreadpoolTimerCallbacks(h,Cancel);
                }

            // IO Stuff
            template <> handle<PTP_IO> CreateItem<PTP_IO,PTP_WIN32_IO_CALLBACK>
                        (PTP_WIN32_IO_CALLBACK cb,PVOID opt,HANDLE hY)
                {
                handle<PTP_IO> a(CreateThreadpoolIo(hY,cb,opt,&env),AutoDestruct);
                return a;
                }
            template <> void RunItem<PTP_IO>(handle<PTP_IO> h,std::tuple<bool> t)
                {
                bool Cancel = std::get<0>(t);
                if (Cancel)
                    CancelThreadpoolIo(h);
                else
                    StartThreadpoolIo(h);
                }
            template <> void Wait<PTP_IO>(handle<PTP_IO> h,bool Cancel)
                {
                WaitForThreadpoolIoCallbacks(h,Cancel);
                }

            // Join functions, one for each option (AutoDestruct or not)
            template <bool Q = AutoDestruct>
            typename std::enable_if<Q,void>::type
            Join(bool Cancel = false)
                {
                if (pcg)
                    {
                    CloseThreadpoolCleanupGroupMembers(pcg,Cancel,0);
                    pcg.Close();
                    }
                }

            template <bool Q = AutoDestruct>
            typename std::enable_if<!Q,void>::type
                Join(bool Cancel = false,
                std::initializer_list<handle<PTP_WORK>> h1 = 
                                 std::initializer_list<handle<PTP_WORK>>({}),
                std::initializer_list<handle<PTP_TIMER>> h2 = 
                                 std::initializer_list<handle<PTP_TIMER>>({}),
                std::initializer_list<handle<PTP_WAIT>> h3 = 
                                 std::initializer_list<handle<PTP_WAIT>>({}),
                std::initializer_list<handle<PTP_IO>> h4 = 
                                 std::initializer_list<handle<PTP_IO>>({})
                )
                {
                for (auto a : h1)
                    Wait<PTP_WORK>(a,Cancel);
                for (auto a : h2)
                    Wait<PTP_TIMER>(a,Cancel);
                for (auto a : h3)
                    Wait<PTP_WAIT>(a,Cancel);
                for (auto a : h4)
                    Wait<PTP_IO>(a,Cancel);
                }
        };
    } 

示例用法

为了简单起见,已删除错误检查。

using namespace tpoollib;
int __stdcall WinMain(HINSTANCE, HINSTANCE, LPSTR, int)
    {
    CoInitializeEx(0, COINIT_APARTMENTTHREADED);
    

    // Auto-Destruction of items
        {
        tpool<true> t;
        t.Create();
        auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
                      ([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
            {
            Sleep(((rand() % 5) + 1) * 1000);
            return;
            },0);
        for (int i = 0; i < 3; i++)
            t.RunItem(workit);
        t.Join();
        }

    // Manual Destruction of items
        {
        tpool<false> t;
        t.Create();
        auto workit = t.CreateItem<PTP_WORK,PTP_WORK_CALLBACK>
                      ([] (PTP_CALLBACK_INSTANCE,PVOID,PTP_WORK)
            {
            Sleep(((rand() % 5) + 1) * 1000);
            return;
            },0);
        for (int i = 0; i < 3; i++)
            t.RunItem(workit);
        t.Join(true,{workit});
        }

    return 0;
}

历史

  • 2015年7月26日 - 首次发布
© . All rights reserved.