几乎像 Java 线程
一个类似 Java 的线程框架。
引言
在我之前的文章《Sound recording and encoding in MP3 format》中,我承诺将继续描述 MP3 流媒体的一些方面。问题是,它们太多了,无法全部写在一篇文章中。虽然可以,但我担心会失去焦点。最终,我们将通过收集分散在各篇文章中的代码,构建一个 MP3 流媒体服务器应用程序。
在本文中,我将描述(或者说提供)一个“类似 Java 的线程”框架。显然,一个好的服务器应用程序需要一个经过深思熟虑的线程引擎。为什么是类似 Java 的?我之所以这样实现(受到 Java 的启发),是因为 Java 中的线程实现简单而优雅(我认为这是一个非常有力的论据)。不幸的是,这个框架不支持 Java 的所有线程功能,因此它们只是“几乎像 Java 线程”。
我们对 Java 线程了解多少?
让我们回顾一下我们对 Java 线程的一些了解。我们知道 Java 线程基于 Thread
类和 Runnable
接口。
- 当继承
Thread
类时,实现run()
方法很重要,该方法将由Thread
执行。 - 实现
Runnable
时,实现run()
方法并将Runnable
传递给Thread
很重要。
class MyThread extends Thread {
public void run() {
...
}
}
...
MyThread th = new MyThread();
th.start(); // will execute run() within the thread
class MyRunnable implements Runnable {
public void run() {
...
}
}
...
Thread th = new Thread(new MyRunnable());
th.start(); // will execute run() method of the passed instance of the MyRunnable
但这还不是全部,Thread
类的 stop()
方法已被声明为已弃用(参见 JDK,例如 Java 1.5),并且提供了另一种停止 Java 线程的机制。在不深入细节的情况下(您可以查看 JDK 以获取更多详细信息),提供了两种方法:
interrupt()
用于设置线程的中断状态。isInterrupted()
用于验证当前线程是否已被中断。
因此,之前的代码片段看起来会像
class MyThread extends Thread {
public void run() {
while (!isInterrupted()) {
...
}
}
}
...
MyThread th = new MyThread();
th.start();
...
th.interrupt();
class MyRunnable implements Runnable {
public void run() {
while (!Thread.currentThread().isInterrupted()) {
...
}
}
}
...
Thread th = new Thread(new MyRunnable());
th.start();
...
th.interrupt();
我们对 Win32 SDK 线程了解多少
只有几个函数,例如 CreateThread
、ResumeThread
、SuspendThread
和 TerminateThread
。我们也知道 CreateThread
函数接受一些参数,其中最重要的是:
- 一个指向将在线程中执行的函数的指针。
- 一个指向要传递给线程的变量的指针,该变量实际上将传递给线程的函数(请参阅前面的参数)。
几乎像 Java 线程一样,实现 1
我们只需要从 Win32 SDK 中使用的唯一函数(实际上是几个,但我们稍后会看到它们)是 CreateThread
。我们不关心 ResumeThread
和 SuspendThread
函数,因为它们在 Java 中的等效项也已声明为已弃用(如果线程拥有关键锁并被挂起,它们可能会导致应用程序死锁)。我们也不关心 TerminateThread
,它被声明为不安全(参见 Win32 SDK),并且 Java 的 stop()
等效项也已弃用。我们需要一个看起来像这样的通用接口:
class IRunnable {
protected:
virtual void run() = 0;
};
并且,与 Java 线程类似,它将在这个框架中扮演核心的角色。与 Java 相比,有人可能会注意到我将 run()
声明为 protected
(在 Java 中是 public
)。这没什么大不了的,除了在线程之外调用 run()
方法(几乎所有时候)都是无用的。
我们还将创建一个类似(仅框架,不包含实现,如果对实现感兴趣,请查看源代码)的类:
class CThread: public IRunnable {
private:
...
volatile bool m_bIsInterrupted;
IRunnable *m_RunObj;
...
// See ::CreateThread(...) within the start() method. This is
// the thread's API function to be executed. Method executes
// run() method of the CThread instance passed as parameter.
static DWORD WINAPI StartThreadST(LPVOID PARAM) {...};
protected:
// It is not possible to instantiate CThread objects directly.
// Possible only by specifying a IRunnable object to execute
// its run() method.
CThread(int nPriority = THREAD_PRIORITY_NORMAL): {...};
// this implementation of the run() will execute the passed IRunnable
// object (if not null). Inheriting class is responsible for
// using this method or overriding it with a different one.
virtual void run() {
if (this->m_RunObj != NULL) this->m_RunObj->run();
};
public:
CThread(IRunnable *RunTask, int nPriority = THREAD_PRIORITY_NORMAL) {
...
if (this != RunTask) this->m_RunObj = RunTask;
else throw "Self referencing not allowed.";
};
virtual ~CThread() {
this->interrupt();
// wait until thread ends
this->join();
};
// Method returns the instance of a CThread responsible
// for the context of the current thread.
static CThread& currentThread() {...}
// Method signals thread to stop execution.
void interrupt() { this->m_bIsInterrupted = true; }
// Check if thread was signaled to stop.
bool isInterrupted() { return this->m_bIsInterrupted; }
// Method will wait for thread's termination.
void join() {...};
// Method starts the Thread. If thread is already started/running, method
// will simply return.
void start() {...};
};
一切看起来都很简单,除了两件事(也许)。
- Q1.
run()
是如何在线程中实际执行的? - A1. 关键在于这个方法:
static DWORD WINAPI StartThreadST(LPVOID PARAM)
它是静态的,并且不指向 CThread
的任何实例。我们可以实现 start()
方法,如下所示:
void start() {
HANDLE hThread;
LPTHREAD_START_ROUTINE pStartRoutine = &CThread::StartThreadST;
...
hThread = ::CreateThread(NULL, 0, pStartRoutine, (PVOID) this, 0, NULL);
if (hThread == NULL) {
...
throw "Failed to call CreateThread(). Thread not started.";
}
...
}
因此,pStartRoutine
指向 StartThreadST
方法(它是静态的!),我们将其作为指针传递给 CreateThread
函数。我们还将 this
,即 CThread
对象当前实例的指针,传递给 CreateThread
。
从另一个角度看,StartThreadST
应该看起来像:
static DWORD WINAPI StartThreadST(LPVOID PARAM) {
CThread *_this = (CThread *) PARAM;
if (_this != NULL) {
...
_this->run();
...
}
return 0;
}
这就是诀窍,没什么复杂的,说实话,是一个非常著名的技巧。
run()
方法(继承类的)。在这种情况下,我们没有问题。void run() {
while (!isInterrupted()) {
...
}
}
但是 CThread
本身也可以被实例化,因为它有一个公共构造函数,接受一个指向 IRunnable
类型对象的指针。但是,这是一个单独的对象,有自己的 run()
,那么我们如何处理这种情况呢?
class MyRunnable: public IRunnable {
...
void run() {
while (!CThread::currentThread().isInterrupted()) {
...
}
}
};
...
MyRunnable *rn = new MyRunnable();
CThread *th = new Thread(rn);
th->start();
...
th->interrupt();
TlsAlloc()
、TlsFree()
、TlsSetValue()
和 TlsGetValue()
函数。回到 StartThreadST
方法,我们对其进行调整:static DWORD WINAPI StartThreadST(LPVOID PARAM) {
CThread *_this = (CThread *) PARAM;
if (_this != NULL) {
...
TlsSetValue(CThread::m_TLSDesc.descriptor, (LPVOID) _this);
_this->run();
...
}
return 0;
}
而 currentThread()
方法将看起来像:
static CThread& currentThread() {
CThread *thr = (CThread *) TlsGetValue(CThread::m_TLSDesc.descriptor);
if (thr == NULL) throw "Call is not within a CThread context.";
return *thr;
}
这样,任何“在 CThread
实例(仅此而已!)内运行”的东西都可以访问该实例,否则(如果“不在内”)——将引发异常。
坏处
不幸的是,CThread
的这种实现有一个问题;最终,这是 C++,不是 Java。请看析构函数的实现:
virtual ~CThread() {
this->interrupt();
// wait until thread ends
this->join();
};
是的,当 CThread
类型(包括派生类)的对象被销毁时,会发送一个中断请求(以防有人忘记这样做)并调用 join()
,它会等待直到线程停止执行(如果正在运行)。所以,像这样的事情:
while (!isInterrupted()) {
...
}
在 run()
方法中是必不可少的,除非您实现了自己的停止机制(这与此实现冗余,在 Java 的实现中也是如此)。这也不是太糟糕。归根结底,在开发代码时,您必须了解自己的代码(以及代码内部发生的事情)。此外,如果至少有一个非守护线程在运行(即使 main()
方法执行完成),Java 应用程序也不会退出,并且 Java 不保证 interrupt()
会停止线程(interrupt()
根本不是为此目的而设计的,它只是一个停止的信号)。因此,在销毁线程时,请准备好您的代码可能会卡住一段时间(或更长时间)。
另外,请考虑以下代码:
class MyThread: public CThread{
private:
Object o;
void use(Object &obj) {...}
public:
MyThread(): o(), CThread() {...}
protected:
void run() {
while(!isSuspended()) {
use(o); // intensive use of the "o"
}
}
};
乍一看,这段代码看起来不错。但其中存在一个问题。让我们看看执行情况:
MyThread *mth = new MyThread();
mth->start();
...
delete mth;
因此,当创建 mth
时,会调用 CThread()
构造函数,创建一个对象 o
,然后调用 MyThread()
构造函数(C++ 标准是这样说的)。由 mth
指向的线程启动,并在一段时间后被销毁。销毁的方式与构造相反;调用 ~MyThread()
(即使不存在),销毁对象 o
,最后调用 ~CThread()
。请注意,在调用 ~MyThread()
和 ~CThread()
之间,对象 o
被销毁,但线程仍在运行并执行 run()
,而 run()
使用对象 o
(如前所述,非常频繁地)。糟糕,而且我认为这是一个非常大的问题。那么,我们如何处理这个问题呢?
解决方案 1. 简单的方法,但每次继承 CThread
时都需要记住它:
~MyThread() {
interrupt();
join();
}
解决方案 2. 与其使用静态初始化的成员,甚至动态初始化的成员,不如使用带有引用计数的智能指针,例如 shared_ptr
(参见此链接)。代码看起来会像这样(仅伪代码):
class MyThread: public CThread {
private:
shared_ptr<Object> o;
void use(shared_ptr<Object> &obj) {...}
public:
MyThread(): CThread() {
this->o = shared_ptr<Object>(new Object());
...
}
protected:
void run() {
shared_ptr<Object> o1 = this->o;
while(!isSuspended()) {
use(o1);
}
}
};
这甚至更像 Java,因为 object
,shared_ptr<Object>
指向的对象,当引用计数等于零时会自动销毁。
解决方案 3. 使用模板(将 CThread
设为模板,而不是类),例如:
template <class T> class CThread: public T {
public:
CThread(): T() {...}
...
~CThread() {
suspend();
wait();
}
};
这里的一个问题是 currentThread()
方法仍然必须是静态的。另一个问题是,T
必须有一个不带参数的构造函数。那么,IRunnable
在哪里?
我找不到一个非常好的、100% 可行的解决方案。因此,我将继续考虑“解决方案 1”。但是,如果您有好主意,请随时与我联系!
就这些了吗?
不完全是,否则我不会问这个问题。随着 JDK 1.5 的发布,包含了一个新包:java.util.concurrent
。这个包提供了一系列非常好的功能(如果我能这样称呼它们的话),用于并发处理和多线程。其中一些是:
Executors
工厂类和ExecutorService
接口
Executors
(工厂)类有一些静态方法,每个方法都返回 ExecutorService
类型的对象。根据调用的是哪个方法,会返回具有相应线程池引擎实现的 ExecutorService
类型的对象。每个返回的 ExecutorService
都包含一个任务队列和一些重载的 submit()
方法,用于将任务提交到池以执行它们。猜猜怎么着?submit()
方法之一接受 Runnable
类型的对象作为任务。
几乎像 Java 线程一样,实现 2
让我们尝试做一些类似的事情,但不是非常通用,我将只关注 Executors.newFixedThreadPool(...)
。所以,我们将构建一个具有固定数量线程的线程池(仍然只是框架;有关更多详细信息,请参阅源代码):
class CSimpleThreadPool: public IRunnable {
private:
vector<CThread*> m_arrThreadTasks;
mpriority_queue<CPriorityTask> m_PQueue;
// Method will return a task from the queue,
// if there are no tasks in the queue, method will return NULL.
IRunnable *get() {...}
public:
// How many threads are in the collection.
int threads() const { m_arrThreadTasks.size(); };
// Method starts pool's threads.
void startAll() {...};
// Constructor creates the thread pool and sets capacity for the task queue.
CSimpleThreadPool(unsigned int nThreadsCount, unsigned int nQueueCapacity = 16) {
int i;
CThread *thTask = NULL;
...
// Set initial capacity of the tasks Queue.
m_PQueue.reserve(nQueueCapacity);
// Initialize thread pool.
for (i = 0; i < nThreadsCount; i++) {
thTask = new CThread(this);
if (thTask != NULL) m_arrThreadTasks.push_back(thTask);
}
};
// Submit a new task to the pool
void submit(IRunnable *pRunObj, int nPriority = 0) {...}
// Method will execute task's run() method within its CThread context.
virtual void run() {
IRunnable *task;
while (!CThread::currentThread().isInterrupted()) {
// Get a task from the queue.
task = get();
// Execute the task.
if (task != NULL) task->run();
::Sleep(2);
}
}
virtual ~CSimpleThreadPool() {...};
// Method signals threads to stop and waits for termination
void shutdown() {
for (int i = 0; i < m_arrThreadTasks.size(); i++) {
m_arrThreadTasks[i]->interrupt();
m_arrThreadTasks[i]->join();
}
}
};
与 Java 的 FixedThreadPool
唯一的区别是,CSimpleThreadPool
使用优先队列,并且只有一个 submit 方法。mpriority_queue
是同一个 STL priority_queue
,除了它允许预设容量以及获取容器的当前容量。
现在,我们可以简单地这样做:
#define NUMLIM 50
class A: public IRunnable {
private:
int m_num;
protected:
void run() {
Log::LogMessage(L"Executing %d thread\n", m_num);
}
public:
A(int num) { m_num = num; }
};
class B: public IRunnable {
protected:
void run() {
int i = 0;
while (!CThread::currentThread().isInterrupted()) {
Log::LogMessage(L"Executing B cycle %d\n", i);
::Sleep(100);
i++;
}
}
};
int main(int argc, char* argv[])
{
CSimpleThreadPool tp(2, NUMLIM + 1);
A *a[NUMLIM];
B *b = new B();
int i = 0;
for (i = 0; i < NUMLIM; i++) a[i] = new A(i);
tp.startAll();
tp.submit(b);
for (i = 0; i < 20*NUMLIM; i++) {
tp.submit(a[i % NUMLIM], i % NUMLIM);
::Sleep(1);
}
::Sleep(10000);
tp.shutdown();
delete b;
for (i = 0; i < NUMLIM; i++) delete a[i];
return 0;
}
“几乎”像 Java 一样。
我的下一篇文章将是关于 IO 完成端口(IO Completion Ports),我保证尽快准备好。