65.9K
CodeProject 正在变化。 阅读更多。
Home

具有暂停/恢复功能的 数据处理线程

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.96/5 (12投票s)

2013 年 4 月 1 日

CPOL

7分钟阅读

viewsIcon

41122

downloadIcon

796

暂停/恢复线程功能有助于避免因无效内存访问和竞态条件引起的问题。

动机

许多通信应用程序,例如代理服务器,都有一个主控制线程和几个数据处理线程。控制线程负责数据线程的配置和管理。应用程序可以维护一个数据线程的集合(池);集合的大小通常适合 CPU 的数量。数据线程执行时间关键的任务。例如,数据线程从可配置的套接字列表中读取消息,使用字典解析它们,处理它们,然后将它们传递给另一个线程进行发送。在描述的场景中,数据线程几乎独占地访问其可配置数据(例如套接字列表或解析字典):大部分时间只有它会触及可配置数据。只有在罕见的重新配置/关闭事件中,主控制线程才会修改线程的可配置数据。始终用互斥锁保护线程数据将非常低效:事实上,大部分时间都不需要保护,但每次数据线程从套接字列表中读取消息或使用字典处理它时,都会执行昂贵的互斥锁获取操作。

“利益冲突”可以通过多种方式解决。暂停/恢复功能是其中一种可重用的解决方案。

接口描述

working_thread 是暂停/恢复功能的跨平台(Windows/Linux)实现。该类(与 BOOST/C++11 线程不同)利用了线程类实现的经典方法:派生类必须实现一个将在线程上下文中调用的纯虚函数。

class working_thread
{
public:
 
    enum state_type
    {
        init, //! thread did not start yet
        paused, //! thread is paused 
        running, //! thread is running 
        completed //! thread function is completed 
    };
 
public:
    working_thread
    (
        priority_type p=priority_type(), 
        size_t stack_size=DEFAULT_STACK_VALUE
    )
  void stop(bool force_interrupt=false);
    bool start();
    bool pause();
    bool resume();
    void join();
    handle_type handle();
    id_type id() const;
    state_type state() const;
protected:
    virtual bool action()=0;
    virtual void on_start(){}
    virtual void on_exit(){}
 
    virtual void on_interrupt(){}
    bool is_interrupted() const; 
 
};

构造函数创建一个具有给定优先级和堆栈大小的非活动线程对象。start() 方法启动一个新的系统线程;stop() 方法关闭线程;pause() 方法使线程休眠;resume() 方法唤醒它。join() 方法等待直到线程的主函数完成。派生类必须实现纯虚方法 action(),该方法将在运行的系统线程上下文中反复调用。该方法必须返回 true 才能再次调用。如果返回 false,线程的主函数将退出。派生类还可以实现另外两个虚方法:on_start()on_exit(),分别在线程启动和停止时调用。

所有 start()stop()pause()resume() 方法都是阻塞的。它们将等待直到相应操作完成:void stop() 在系统线程的主函数结束后返回。bool start() 方法在线程对象处于运行状态后返回,或者在发现线程无法启动(因为线程的主函数已结束)后返回。如果线程无法启动,bool start() 方法将返回 false。对于 pause()resume() 方法也是如此:pause() 方法使线程主函数休眠并等待操作完成;如果线程无法休眠,该方法返回 falseresume() 方法唤醒线程主函数并等待操作完成;如果操作无法完成,该方法返回 false。返回 false 的唯一原因是线程的主函数已结束且线程对象处于已完成状态。

该接口有几个优点

  • 控制线程可以在修改可配置数据之前“暂停”数据线程;在数据线程休眠时修改线程的可配置数据是安全的,并且不需要互斥锁。
  • 所有公开的方法都是阻塞的。这使我们能够避免许多因无效内存访问(例如,线程对象已被销毁但线程的主函数仍在运行)和竞态条件引起的意外情况。

暂停/恢复功能实现需要派生类进行一定的协作:它对 action() 实现提出了以下要求

  • action() 方法通过返回 truefalse 来控制线程执行。如果函数返回 true,它将被再次调用。如果函数返回 false,线程的主函数将退出。
  • action() 方法必须在其执行中中断,如果 is_interrupted() 标志设置为 true。这并不意味着线程的主函数将退出。

action() 函数可以通过多种方式满足第二个要求。action() 函数可以周期性地检查 is_interrupted() 标志。检查周期决定了 stop()pause() 方法的等待时间。例如(伪代码)

virtual bool action()
{
   while(!is_interrupted())
   {
     select(socket_list, time_out_value);
     if(data_arrived)
     {
       ...
     } 
   }
 
   return true;
}

另一种方法是实现 on_interrupt() 虚方法。每次调用 stop()pause() 方法时都会调用此回调。回调实现应用于强制 action() 方法检查 is_interrupted() 标志。例如(伪代码)

virtual bool on_interrupt()
{
   signal(special_handle);
}
 
virtual bool action()
{
   while(!is_interrupted())
   {
     select(socket_list, special_handle)//!
     if(data_arrived)
     {
 
       ...
     } 
   }
 
   return true;
}

此外,stop()(但不包括 pause())方法可以使用 force_interrupt 标志:stop(true)。此标志将中断 action() 代码在 BOOST 线程中断点的等待。

  • boost::thread::join()
  • boost::thread::timed_join()
  • boost::thread::try_join_for()
  • boost::thread::try_join_until()
  • boost::condition_variable::wait()
  • boost::condition_variable::timed_wait()
  • boost::condition_variable::wait_for()
  • boost::condition_variable::wait_until()
  • boost::condition_variable_any::wait()
  • boost::condition_variable_any::timed_wait()
  • boost::condition_variable_any::wait_for()
  • boost::condition_variable_any::wait_until()
  • boost::thread::sleep()
  • boost::this_thread::sleep_for()
  • boost::this_thread::sleep_until()
  • boost::this_thread::interruption_point()

强烈建议派生类在其析构函数中调用 stop() 方法——这将确保线程的主函数在类虚表被销毁之前完成。这一点至关重要,因为线程的主函数会调用类的虚方法(action()on_start()on_exit());这就是为什么 stop() 函数应该在派生类中调用,而不是在基类 working_thread 中调用。

批量操作。与线程池协同工作

当应用程序使用线程池时,阻塞接口并非总是高效。假设 action() 方法每秒检查一次 is_interrrupted() 标志。这将导致 stop() 方法的等待时间超过 1 秒,这是可以接受的。假设应用程序有 10 个数据线程。连续调用所有 10 个线程的 stop() 方法将花费 10 秒。这太长了。

总等待时间最多可以减少到 1 秒,前提是应用程序首先向每个线程发出退出信号,然后等待所有线程完成。然而,这种优化需要我们不想暴露的非阻塞接口。

为了在不暴露非阻塞接口的情况下允许批量操作,我们可以引入辅助类 bulk。该类作为“友元”访问 working_thread 的非阻塞接口,但它本身只暴露阻塞接口。

struct bulk
{
     bulk(…);
 
     void start();
     void stop(bool force_interrupt);
     void pause();
     void resume();
     size_t count(state);
…
};

构造函数接受 working_thread 对象集合。start()stop()pause()resume() 函数使用隐藏的非阻塞 working_thread 接口来要求每个线程对象开始启动/停止/暂停/恢复操作,然后等待所有线程完成操作。

实现

实现很简单。主要思想是在线程的主函数中安排等待状态,当 action() 方法不被调用时。

此外,阻塞函数 start()/stop()/pause()/resume() 在条件变量上等待,并且每次线程对象更改其状态时都会发出信号。

实现使用了 BOOST 的线程类,但这并非必需。

线程的主函数实现如下:

void working_thread::main()
{
    
    // signal that the thread is running
    signal_state(running);
    // perform on-start custom action
    on_start();
 
    // can throw const boost::thread_interrupted
    // if interrupt() was call in any interrupt
    // point
    try
    {
        while(rq_stop!=m_request)
        {
            while(rq_none==m_request)
            {
                if(!action()) break;
            }
 
            if(rq_pause!=m_request) break;
 
            idle();
        }
    }
    catch(const boost::thread_interrupted&)
    {
    }
        
    // update state
    signal_state(completed);
    // perform on-exit custom action
    // after the state was updated
    on_exit();
}

其中 idle() 方法是主循环的一部分,当 action() 函数未被调用时。

void working_thread::idle()
{
    // signal paused state
    signal_state(paused);
    
    // wait in the paused state
    {    
        boost::unique_lock<boost::mutex> lock(m_guard);
 
        while(rq_pause==m_request)
        {
            m_pause.wait(lock);
        }
    }
 
    // signal running state
    signal_state(running);
}

signal_state() 方法更改 m_request 标志并发出条件变量信号。

void working_thread::signal_state(state_type state)
{
    // update the state
    // and signal that the thread is 
    // in new state 
    boost::unique_lock<boost::mutex> lock(m_guard);
    m_state=state;
    
    m_signal.notify_one();
}

所有接口函数都将更改 m_request 标志,然后等待条件变量发出信号。

例如,pause() 发起“暂停”操作并等待其完成。

bool working_thread::pause()
{
    event_status rc=pause_event();
 
    if(rc.wait) 
        rc.success&=wait_till_paused();
    
    return rc.success;
}

我们需要这两个部分:“发起操作”和“等待其完成”来进一步实现“批量”接口。

第一个“发起暂停操作”部分是:

working_thread::event_status working_thread::pause_event()
{
    // already paused
    if(paused==m_state) 
        return event_status(true, false);
    
    // cannot pause detached state
    if(detached()) 
        return event_status(false);
    
    request(rq_pause);
 
    // callback is called
    // after the flag is changed
    on_interrupt();
 
    return event_status(true);
}

第二个“等待暂停”部分是:

bool working_thread::wait_till_paused()
{
    // wait till the thread is paused
    {    
        boost::unique_lock<boost::mutex> lock(m_guard);
        while(paused!=m_state && completed!=m_state)
        {
            m_signal.wait(lock);
        }
    }
 
    return paused==m_state;
}

批量 pause() 操作实现很简单。

template<typename It>
static void pause(It begin, It end)
{
 std::list<working_thread*> waiting_list;
 
 // pause an object and add to the waiting list
 // if we need to wait till the object is paused
 for(; begin != end; ++begin) 
  pause_item(get_ptr(*begin), waiting_list);
 // wait till all objects in the waiting list are done
 std::for_each(waiting_list.begin(), waiting_list.end(), 
   boost::mem_fn(&working_thread::wait_till_paused));
}

析构函数必须在线程主函数仍在运行时断言。

virtual ~working_thread()
{
        // stop() must be called before
        // the derived class' dtor is completed.
        // stop() cannot be called here
        // because the thread's main function calls to
        // class's virtual functions including the pure
        // virtual action()
        BOOST_ASSERT_MSG(detached(), 
            "The thread function must be completed at this point");
}

BOOST 线程技巧

类实现使用了 BOOST 的线程类,它是跨平台的,并隐藏了操作系统特定的接口。BOOST 的线程类并未实现系统线程的所有功能。特别是,它没有线程优先级的接口。

幸运的是,该类提供了对线程“原生句柄”的访问,这使我们能够实现任何额外的操作系统特定功能。

处理线程优先级

// windows version
// the priority must be set after the thread is started

boost::thread_attributes attr;
   
// set stack size - crossplatform
attr.set_stack_size(m_stack_size);

boost::thread th=boost::thread(attr, ...);


#if defined(BOOST_THREAD_PLATFORM_WIN32)
SetThreadPriority(th.native_handle(), priority_value);
#endif
 
// pthread version
// the priority must be set before the thread is started

boost::thread_attributes attr;

// set stack size - crossplatform
attr.set_stack_size(m_stack_size);
 
#if defined(BOOST_THREAD_PLATFORM_PTHREAD)
 
pthread_attr_setschedpolicy(attr.native_handle(), priority_class);
struct sched_param params;
memset(&params, 0, sizeof(params));
params.sched_priority=priority_value;
pthread_attr_setschedparam(attr.native_handle(), &params);

boost::thread th=boost::thread(attr, ...);

#endif

平台

  • 编译器/IDE:MS Visual Studio 2010,KDevelop 4.4.1 使用 gcc 4.7.2
  • Boost 库:1.53.0
  • 操作系统:Windows 7 SP1,Fedora 18

参考文献

历史

  • 初始版本:2013/01/04。
© . All rights reserved.