具有暂停/恢复功能的 数据处理线程
暂停/恢复线程功能有助于避免因无效内存访问和竞态条件引起的问题。
动机
许多通信应用程序,例如代理服务器,都有一个主控制线程和几个数据处理线程。控制线程负责数据线程的配置和管理。应用程序可以维护一个数据线程的集合(池);集合的大小通常适合 CPU 的数量。数据线程执行时间关键的任务。例如,数据线程从可配置的套接字列表中读取消息,使用字典解析它们,处理它们,然后将它们传递给另一个线程进行发送。在描述的场景中,数据线程几乎独占地访问其可配置数据(例如套接字列表或解析字典):大部分时间只有它会触及可配置数据。只有在罕见的重新配置/关闭事件中,主控制线程才会修改线程的可配置数据。始终用互斥锁保护线程数据将非常低效:事实上,大部分时间都不需要保护,但每次数据线程从套接字列表中读取消息或使用字典处理它时,都会执行昂贵的互斥锁获取操作。
“利益冲突”可以通过多种方式解决。暂停/恢复功能是其中一种可重用的解决方案。
接口描述
类 working_thread
是暂停/恢复功能的跨平台(Windows/Linux)实现。该类(与 BOOST/C++11 线程不同)利用了线程类实现的经典方法:派生类必须实现一个将在线程上下文中调用的纯虚函数。
class working_thread
{
public:
enum state_type
{
init, //! thread did not start yet
paused, //! thread is paused
running, //! thread is running
completed //! thread function is completed
};
public:
working_thread
(
priority_type p=priority_type(),
size_t stack_size=DEFAULT_STACK_VALUE
)
void stop(bool force_interrupt=false);
bool start();
bool pause();
bool resume();
void join();
handle_type handle();
id_type id() const;
state_type state() const;
protected:
virtual bool action()=0;
virtual void on_start(){}
virtual void on_exit(){}
virtual void on_interrupt(){}
bool is_interrupted() const;
};
构造函数创建一个具有给定优先级和堆栈大小的非活动线程对象。start()
方法启动一个新的系统线程;stop()
方法关闭线程;pause()
方法使线程休眠;resume()
方法唤醒它。join()
方法等待直到线程的主函数完成。派生类必须实现纯虚方法 action()
,该方法将在运行的系统线程上下文中反复调用。该方法必须返回 true
才能再次调用。如果返回 false
,线程的主函数将退出。派生类还可以实现另外两个虚方法:on_start()
和 on_exit()
,分别在线程启动和停止时调用。
所有 start()
、stop()
、pause()
和 resume()
方法都是阻塞的。它们将等待直到相应操作完成:void stop()
在系统线程的主函数结束后返回。bool start()
方法在线程对象处于运行状态后返回,或者在发现线程无法启动(因为线程的主函数已结束)后返回。如果线程无法启动,bool start()
方法将返回 false
。对于 pause()
和 resume()
方法也是如此:pause()
方法使线程主函数休眠并等待操作完成;如果线程无法休眠,该方法返回 false
。resume()
方法唤醒线程主函数并等待操作完成;如果操作无法完成,该方法返回 false
。返回 false
的唯一原因是线程的主函数已结束且线程对象处于已完成状态。
该接口有几个优点
- 控制线程可以在修改可配置数据之前“暂停”数据线程;在数据线程休眠时修改线程的可配置数据是安全的,并且不需要互斥锁。
- 所有公开的方法都是阻塞的。这使我们能够避免许多因无效内存访问(例如,线程对象已被销毁但线程的主函数仍在运行)和竞态条件引起的意外情况。
暂停/恢复功能实现需要派生类进行一定的协作:它对 action()
实现提出了以下要求
action()
方法通过返回true
或false
来控制线程执行。如果函数返回true
,它将被再次调用。如果函数返回false
,线程的主函数将退出。action()
方法必须在其执行中中断,如果is_interrupted()
标志设置为 true。这并不意味着线程的主函数将退出。
action()
函数可以通过多种方式满足第二个要求。action()
函数可以周期性地检查 is_interrupted()
标志。检查周期决定了 stop()
和 pause()
方法的等待时间。例如(伪代码)
virtual bool action()
{
while(!is_interrupted())
{
select(socket_list, time_out_value);
if(data_arrived)
{
...
}
}
return true;
}
另一种方法是实现 on_interrupt()
虚方法。每次调用 stop()
或 pause()
方法时都会调用此回调。回调实现应用于强制 action()
方法检查 is_interrupted()
标志。例如(伪代码)
virtual bool on_interrupt()
{
signal(special_handle);
}
virtual bool action()
{
while(!is_interrupted())
{
select(socket_list, special_handle)//!
if(data_arrived)
{
...
}
}
return true;
}
此外,stop()
(但不包括 pause()
)方法可以使用 force_interrupt
标志:stop(true)
。此标志将中断 action()
代码在 BOOST 线程中断点的等待。
boost::thread::join()
boost::thread::timed_join()
boost::thread::try_join_for()
boost::thread::try_join_until()
boost::condition_variable::wait()
boost::condition_variable::timed_wait()
boost::condition_variable::wait_for()
boost::condition_variable::wait_until()
boost::condition_variable_any::wait()
boost::condition_variable_any::timed_wait()
boost::condition_variable_any::wait_for()
boost::condition_variable_any::wait_until()
boost::thread::sleep()
boost::this_thread::sleep_for()
boost::this_thread::sleep_until()
boost::this_thread::interruption_point()
强烈建议派生类在其析构函数中调用 stop()
方法——这将确保线程的主函数在类虚表被销毁之前完成。这一点至关重要,因为线程的主函数会调用类的虚方法(action()
、on_start()
、on_exit()
);这就是为什么 stop()
函数应该在派生类中调用,而不是在基类 working_thread
中调用。
批量操作。与线程池协同工作
当应用程序使用线程池时,阻塞接口并非总是高效。假设 action()
方法每秒检查一次 is_interrrupted()
标志。这将导致 stop()
方法的等待时间超过 1 秒,这是可以接受的。假设应用程序有 10 个数据线程。连续调用所有 10 个线程的 stop()
方法将花费 10 秒。这太长了。
总等待时间最多可以减少到 1 秒,前提是应用程序首先向每个线程发出退出信号,然后等待所有线程完成。然而,这种优化需要我们不想暴露的非阻塞接口。
为了在不暴露非阻塞接口的情况下允许批量操作,我们可以引入辅助类 bulk
。该类作为“友元”访问 working_thread
的非阻塞接口,但它本身只暴露阻塞接口。
struct bulk
{
bulk(…);
void start();
void stop(bool force_interrupt);
void pause();
void resume();
size_t count(state);
…
};
构造函数接受 working_thread
对象集合。start()
、stop()
、pause()
、resume()
函数使用隐藏的非阻塞 working_thread
接口来要求每个线程对象开始启动/停止/暂停/恢复操作,然后等待所有线程完成操作。
实现
实现很简单。主要思想是在线程的主函数中安排等待状态,当 action()
方法不被调用时。
此外,阻塞函数 start()/stop()/pause()/resume()
在条件变量上等待,并且每次线程对象更改其状态时都会发出信号。
实现使用了 BOOST 的线程类,但这并非必需。
线程的主函数实现如下:
void working_thread::main()
{
// signal that the thread is running
signal_state(running);
// perform on-start custom action
on_start();
// can throw const boost::thread_interrupted
// if interrupt() was call in any interrupt
// point
try
{
while(rq_stop!=m_request)
{
while(rq_none==m_request)
{
if(!action()) break;
}
if(rq_pause!=m_request) break;
idle();
}
}
catch(const boost::thread_interrupted&)
{
}
// update state
signal_state(completed);
// perform on-exit custom action
// after the state was updated
on_exit();
}
其中 idle()
方法是主循环的一部分,当 action()
函数未被调用时。
void working_thread::idle()
{
// signal paused state
signal_state(paused);
// wait in the paused state
{
boost::unique_lock<boost::mutex> lock(m_guard);
while(rq_pause==m_request)
{
m_pause.wait(lock);
}
}
// signal running state
signal_state(running);
}
signal_state()
方法更改 m_request
标志并发出条件变量信号。
void working_thread::signal_state(state_type state)
{
// update the state
// and signal that the thread is
// in new state
boost::unique_lock<boost::mutex> lock(m_guard);
m_state=state;
m_signal.notify_one();
}
所有接口函数都将更改 m_request
标志,然后等待条件变量发出信号。
例如,pause()
发起“暂停”操作并等待其完成。
bool working_thread::pause()
{
event_status rc=pause_event();
if(rc.wait)
rc.success&=wait_till_paused();
return rc.success;
}
我们需要这两个部分:“发起操作”和“等待其完成”来进一步实现“批量”接口。
第一个“发起暂停操作”部分是:
working_thread::event_status working_thread::pause_event()
{
// already paused
if(paused==m_state)
return event_status(true, false);
// cannot pause detached state
if(detached())
return event_status(false);
request(rq_pause);
// callback is called
// after the flag is changed
on_interrupt();
return event_status(true);
}
第二个“等待暂停”部分是:
bool working_thread::wait_till_paused()
{
// wait till the thread is paused
{
boost::unique_lock<boost::mutex> lock(m_guard);
while(paused!=m_state && completed!=m_state)
{
m_signal.wait(lock);
}
}
return paused==m_state;
}
批量 pause()
操作实现很简单。
template<typename It>
static void pause(It begin, It end)
{
std::list<working_thread*> waiting_list;
// pause an object and add to the waiting list
// if we need to wait till the object is paused
for(; begin != end; ++begin)
pause_item(get_ptr(*begin), waiting_list);
// wait till all objects in the waiting list are done
std::for_each(waiting_list.begin(), waiting_list.end(),
boost::mem_fn(&working_thread::wait_till_paused));
}
析构函数必须在线程主函数仍在运行时断言。
virtual ~working_thread()
{
// stop() must be called before
// the derived class' dtor is completed.
// stop() cannot be called here
// because the thread's main function calls to
// class's virtual functions including the pure
// virtual action()
BOOST_ASSERT_MSG(detached(),
"The thread function must be completed at this point");
}
BOOST 线程技巧
类实现使用了 BOOST 的线程类,它是跨平台的,并隐藏了操作系统特定的接口。BOOST 的线程类并未实现系统线程的所有功能。特别是,它没有线程优先级的接口。
幸运的是,该类提供了对线程“原生句柄”的访问,这使我们能够实现任何额外的操作系统特定功能。
处理线程优先级
// windows version
// the priority must be set after the thread is started
boost::thread_attributes attr;
// set stack size - crossplatform
attr.set_stack_size(m_stack_size);
boost::thread th=boost::thread(attr, ...);
#if defined(BOOST_THREAD_PLATFORM_WIN32)
SetThreadPriority(th.native_handle(), priority_value);
#endif
// pthread version
// the priority must be set before the thread is started
boost::thread_attributes attr;
// set stack size - crossplatform
attr.set_stack_size(m_stack_size);
#if defined(BOOST_THREAD_PLATFORM_PTHREAD)
pthread_attr_setschedpolicy(attr.native_handle(), priority_class);
struct sched_param params;
memset(¶ms, 0, sizeof(params));
params.sched_priority=priority_value;
pthread_attr_setschedparam(attr.native_handle(), ¶ms);
boost::thread th=boost::thread(attr, ...);
#endif
平台
- 编译器/IDE:MS Visual Studio 2010,KDevelop 4.4.1 使用 gcc 4.7.2
- Boost 库:1.53.0
- 操作系统:Windows 7 SP1,Fedora 18
参考文献
- BOOST 1.53:https://boost.ac.cn/doc/libs/1_53_0/
历史
- 初始版本:2013/01/04。