65.9K
CodeProject 正在变化。 阅读更多。
Home

几乎像 Java 线程

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.44/5 (12投票s)

2007年1月2日

CPOL

7分钟阅读

viewsIcon

51339

downloadIcon

483

一个类似 Java 的线程框架。

引言

在我之前的文章《Sound recording and encoding in MP3 format》中,我承诺将继续描述 MP3 流媒体的一些方面。问题是,它们太多了,无法全部写在一篇文章中。虽然可以,但我担心会失去焦点。最终,我们将通过收集分散在各篇文章中的代码,构建一个 MP3 流媒体服务器应用程序。

在本文中,我将描述(或者说提供)一个“类似 Java 的线程”框架。显然,一个好的服务器应用程序需要一个经过深思熟虑的线程引擎。为什么是类似 Java 的?我之所以这样实现(受到 Java 的启发),是因为 Java 中的线程实现简单而优雅(我认为这是一个非常有力的论据)。不幸的是,这个框架不支持 Java 的所有线程功能,因此它们只是“几乎像 Java 线程”。

我们对 Java 线程了解多少?

让我们回顾一下我们对 Java 线程的一些了解。我们知道 Java 线程基于 Thread 类和 Runnable 接口。

  1. 当继承 Thread 类时,实现 run() 方法很重要,该方法将由 Thread 执行。
  2. class MyThread extends Thread {
        public void run() {
            ...
        }
    }
    ...
    MyThread th = new MyThread();
    th.start(); // will execute run() within the thread
  3. 实现 Runnable 时,实现 run() 方法并将 Runnable 传递给 Thread 很重要。
  4. class MyRunnable implements Runnable {
        public void run() {
            ...
        }
    }
    ...
    Thread th = new Thread(new MyRunnable());
    th.start(); // will execute run() method of the passed instance of the MyRunnable

但这还不是全部,Thread 类的 stop() 方法已被声明为已弃用(参见 JDK,例如 Java 1.5),并且提供了另一种停止 Java 线程的机制。在不深入细节的情况下(您可以查看 JDK 以获取更多详细信息),提供了两种方法:

  • interrupt() 用于设置线程的中断状态。
  • isInterrupted() 用于验证当前线程是否已被中断。

因此,之前的代码片段看起来会像

class MyThread extends Thread {
    public void run() {
        while (!isInterrupted()) {
            ...
        }
    }
}
...
MyThread th = new MyThread();
th.start();
...
th.interrupt();
class MyRunnable implements Runnable {
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            ...
        }
    }
}
...
Thread th = new Thread(new MyRunnable());
th.start(); 
...
th.interrupt();

我们对 Win32 SDK 线程了解多少

只有几个函数,例如 CreateThreadResumeThreadSuspendThreadTerminateThread。我们也知道 CreateThread 函数接受一些参数,其中最重要的是:

  • 一个指向将在线程中执行的函数的指针。
  • 一个指向要传递给线程的变量的指针,该变量实际上将传递给线程的函数(请参阅前面的参数)。

几乎像 Java 线程一样,实现 1

我们只需要从 Win32 SDK 中使用的唯一函数(实际上是几个,但我们稍后会看到它们)是 CreateThread。我们不关心 ResumeThreadSuspendThread 函数,因为它们在 Java 中的等效项也已声明为已弃用(如果线程拥有关键锁并被挂起,它们可能会导致应用程序死锁)。我们也不关心 TerminateThread,它被声明为不安全(参见 Win32 SDK),并且 Java 的 stop() 等效项也已弃用。我们需要一个看起来像这样的通用接口:

class IRunnable {
protected:
    virtual void run() = 0;
};

并且,与 Java 线程类似,它将在这个框架中扮演核心的角色。与 Java 相比,有人可能会注意到我将 run() 声明为 protected(在 Java 中是 public)。这没什么大不了的,除了在线程之外调用 run() 方法(几乎所有时候)都是无用的。

我们还将创建一个类似(仅框架,不包含实现,如果对实现感兴趣,请查看源代码)的类:

class CThread: public IRunnable {
private:
    ...
    volatile bool    m_bIsInterrupted;
    IRunnable    *m_RunObj;
    ...

    // See ::CreateThread(...) within the start() method. This is 
    // the thread's API function to be executed. Method executes
    // run() method of the CThread instance passed as parameter.

    static DWORD WINAPI StartThreadST(LPVOID PARAM) {...};

protected:
    // It is not possible to instantiate CThread objects directly.
    // Possible only by specifying a IRunnable object to execute 
    // its run() method.

    CThread(int nPriority = THREAD_PRIORITY_NORMAL): {...};

    // this implementation of the run() will execute the passed IRunnable
    // object (if not null). Inheriting class is responsible for 
    // using this method or overriding it with a different one.

    virtual void run() {
        if (this->m_RunObj != NULL) this->m_RunObj->run();
    };

public:
    CThread(IRunnable *RunTask, int nPriority = THREAD_PRIORITY_NORMAL) {
        ...
        if (this != RunTask) this->m_RunObj = RunTask;
        else throw "Self referencing not allowed.";
    };

    virtual ~CThread() { 
        this->interrupt();
        // wait until thread ends

        this->join();
    };

    // Method returns the instance of a CThread responsible
    // for the context of the current thread.
    static CThread& currentThread() {...}

    // Method signals thread to stop execution.

    void interrupt() { this->m_bIsInterrupted = true; }

    // Check if thread was signaled to stop.

    bool isInterrupted() { return this->m_bIsInterrupted; }

    // Method will wait for thread's termination.

    void join() {...};

    // Method starts the Thread. If thread is already started/running, method

    // will simply return.

    void start() {...};
};

一切看起来都很简单,除了两件事(也许)。

  • Q1. run() 是如何在线程中实际执行的?
  • A1. 关键在于这个方法:
  • static DWORD WINAPI StartThreadST(LPVOID PARAM)

    它是静态的,并且不指向 CThread 的任何实例。我们可以实现 start() 方法,如下所示:

    void start() {
        HANDLE    hThread;
        LPTHREAD_START_ROUTINE pStartRoutine = &CThread::StartThreadST;
    
        ...
    
        hThread = ::CreateThread(NULL, 0, pStartRoutine, (PVOID) this, 0, NULL);
        if (hThread == NULL) {
            ...
            throw "Failed to call CreateThread(). Thread not started.";
        }
        ...
    }

    因此,pStartRoutine 指向 StartThreadST 方法(它是静态的!),我们将其作为指针传递给 CreateThread 函数。我们还将 this,即 CThread 对象当前实例的指针,传递给 CreateThread

    从另一个角度看,StartThreadST 应该看起来像:

    static DWORD WINAPI StartThreadST(LPVOID PARAM) {
        CThread *_this = (CThread *) PARAM;
    
        if (_this != NULL) {
            ...
            _this->run();
            ...
        }
        return 0;
    }

    这就是诀窍,没什么复杂的,说实话,是一个非常著名的技巧。

  • Q2. 好的,这个类可以被继承,并且继承类的对象将执行适当的 run() 方法(继承类的)。在这种情况下,我们没有问题。
  • void run() {
        while (!isInterrupted()) {
            ...
        }
    }

    但是 CThread 本身也可以被实例化,因为它有一个公共构造函数,接受一个指向 IRunnable 类型对象的指针。但是,这是一个单独的对象,有自己的 run(),那么我们如何处理这种情况呢?

    class MyRunnable: public IRunnable {
    ...
        void run() {
            while (!CThread::currentThread().isInterrupted()) {
                ...
            }
        }
    };
    ...
    MyRunnable *rn = new MyRunnable();
    CThread *th = new Thread(rn);
    th->start();
    ...
    th->interrupt();
  • A2. 关键在于 TLS API(线程局部存储)。请参阅 MSDN 中的 TlsAlloc()TlsFree()TlsSetValue()TlsGetValue() 函数。回到 StartThreadST 方法,我们对其进行调整:
  • static DWORD WINAPI StartThreadST(LPVOID PARAM) {
        CThread *_this = (CThread *) PARAM;
    
        if (_this != NULL) {
            ...
            TlsSetValue(CThread::m_TLSDesc.descriptor, (LPVOID) _this);
            _this->run();
            ...
        }
        return 0;
    }

    currentThread() 方法将看起来像:

    static CThread& currentThread() {
        CThread *thr = (CThread *) TlsGetValue(CThread::m_TLSDesc.descriptor);
        if (thr == NULL) throw "Call is not within a CThread context.";
        return *thr;
    }

    这样,任何“在 CThread 实例(仅此而已!)内运行”的东西都可以访问该实例,否则(如果“不在内”)——将引发异常。

坏处

不幸的是,CThread 的这种实现有一个问题;最终,这是 C++,不是 Java。请看析构函数的实现:

virtual ~CThread() {
    this->interrupt();
    // wait until thread ends

    this->join();
};

是的,当 CThread 类型(包括派生类)的对象被销毁时,会发送一个中断请求(以防有人忘记这样做)并调用 join(),它会等待直到线程停止执行(如果正在运行)。所以,像这样的事情:

while (!isInterrupted()) {
    ...
}

run() 方法中是必不可少的,除非您实现了自己的停止机制(这与此实现冗余,在 Java 的实现中也是如此)。这也不是太糟糕。归根结底,在开发代码时,您必须了解自己的代码(以及代码内部发生的事情)。此外,如果至少有一个非守护线程在运行(即使 main() 方法执行完成),Java 应用程序也不会退出,并且 Java 不保证 interrupt() 会停止线程(interrupt() 根本不是为此目的而设计的,它只是一个停止的信号)。因此,在销毁线程时,请准备好您的代码可能会卡住一段时间(或更长时间)。

另外,请考虑以下代码:

class MyThread: public CThread{
private:
    Object o;
    void use(Object &obj) {...}

public:
    MyThread(): o(), CThread() {...}

protected:
    void run() {
        while(!isSuspended()) {
            use(o); // intensive use of the "o"

        }
    }
};

乍一看,这段代码看起来不错。但其中存在一个问题。让我们看看执行情况:

MyThread *mth = new MyThread();
mth->start();
...
delete mth;

因此,当创建 mth 时,会调用 CThread() 构造函数,创建一个对象 o,然后调用 MyThread() 构造函数(C++ 标准是这样说的)。由 mth 指向的线程启动,并在一段时间后被销毁。销毁的方式与构造相反;调用 ~MyThread()(即使不存在),销毁对象 o,最后调用 ~CThread()。请注意,在调用 ~MyThread()~CThread() 之间,对象 o 被销毁,但线程仍在运行并执行 run(),而 run() 使用对象 o(如前所述,非常频繁地)。糟糕,而且我认为这是一个非常大的问题。那么,我们如何处理这个问题呢?

解决方案 1. 简单的方法,但每次继承 CThread 时都需要记住它:

~MyThread() {
    interrupt();
    join();    
}

解决方案 2. 与其使用静态初始化的成员,甚至动态初始化的成员,不如使用带有引用计数的智能指针,例如 shared_ptr参见此链接)。代码看起来会像这样(仅伪代码):

class MyThread: public CThread {
private:
    shared_ptr<Object> o;
    void use(shared_ptr<Object> &obj) {...}

public:
    MyThread(): CThread() {
        this->o = shared_ptr<Object>(new Object());
        ...
    }

protected:
    void run() {
        shared_ptr<Object> o1 = this->o;
        while(!isSuspended()) {
            use(o1);
        }
    }
};

这甚至更像 Java,因为 objectshared_ptr<Object> 指向的对象,当引用计数等于零时会自动销毁。

解决方案 3. 使用模板(将 CThread 设为模板,而不是类),例如:

template <class T> class CThread: public T {
public:
    CThread(): T() {...}
    ...
    ~CThread() {
        suspend();
        wait();
    }
};

这里的一个问题是 currentThread() 方法仍然必须是静态的。另一个问题是,T 必须有一个不带参数的构造函数。那么,IRunnable 在哪里?

我找不到一个非常好的、100% 可行的解决方案。因此,我将继续考虑“解决方案 1”。但是,如果您有好主意,请随时与我联系!

就这些了吗?

不完全是,否则我不会问这个问题。随着 JDK 1.5 的发布,包含了一个新包:java.util.concurrent。这个包提供了一系列非常好的功能(如果我能这样称呼它们的话),用于并发处理和多线程。其中一些是:

  • Executors 工厂类和
  • ExecutorService 接口

Executors(工厂)类有一些静态方法,每个方法都返回 ExecutorService 类型的对象。根据调用的是哪个方法,会返回具有相应线程池引擎实现的 ExecutorService 类型的对象。每个返回的 ExecutorService 都包含一个任务队列和一些重载的 submit() 方法,用于将任务提交到池以执行它们。猜猜怎么着?submit() 方法之一接受 Runnable 类型的对象作为任务。

几乎像 Java 线程一样,实现 2

让我们尝试做一些类似的事情,但不是非常通用,我将只关注 Executors.newFixedThreadPool(...)。所以,我们将构建一个具有固定数量线程的线程池(仍然只是框架;有关更多详细信息,请参阅源代码):

class CSimpleThreadPool: public IRunnable {
private:
    vector<CThread*>        m_arrThreadTasks;
    mpriority_queue<CPriorityTask>    m_PQueue;

    // Method will return a task from the queue,
    // if there are no tasks in the queue, method will return NULL.

    IRunnable *get() {...}

public:
    // How many threads are in the collection.

    int threads() const { m_arrThreadTasks.size(); };

    // Method starts pool's threads.

    void startAll() {...};

    // Constructor creates the thread pool and sets capacity for the task queue.

    CSimpleThreadPool(unsigned int nThreadsCount, unsigned int nQueueCapacity = 16) {
        int i;
        CThread *thTask = NULL;
        ...
        // Set initial capacity of the tasks Queue.

        m_PQueue.reserve(nQueueCapacity);

        // Initialize thread pool.

        for (i = 0; i < nThreadsCount; i++) {
            thTask = new CThread(this);
            if (thTask != NULL) m_arrThreadTasks.push_back(thTask);
        }
    };

    // Submit a new task to the pool

    void submit(IRunnable *pRunObj, int nPriority = 0) {...}

    // Method will execute task's run() method within its CThread context.

    virtual void run() {
        IRunnable *task;

        while (!CThread::currentThread().isInterrupted()) {
            // Get a task from the queue.

            task = get();

            // Execute the task.

            if (task != NULL) task->run();
            ::Sleep(2);
        }
    }

    virtual ~CSimpleThreadPool() {...};

    // Method signals threads to stop and waits for termination

    void shutdown() {
        for (int i = 0; i < m_arrThreadTasks.size(); i++) {
            m_arrThreadTasks[i]->interrupt();
            m_arrThreadTasks[i]->join();
        }
    }
};

与 Java 的 FixedThreadPool 唯一的区别是,CSimpleThreadPool 使用优先队列,并且只有一个 submit 方法。mpriority_queue 是同一个 STL priority_queue,除了它允许预设容量以及获取容器的当前容量。

现在,我们可以简单地这样做:

#define NUMLIM 50

class A: public IRunnable {
private:
    int m_num;
protected:
    void run() {
        Log::LogMessage(L"Executing %d thread\n", m_num);
    }

public:
    A(int num) { m_num = num; }
};

class B: public IRunnable {
protected:
    void run() {
        int i = 0;
        while (!CThread::currentThread().isInterrupted()) {
            Log::LogMessage(L"Executing B cycle %d\n", i);
            ::Sleep(100);
            i++;
        }
    }
};

int main(int argc, char* argv[])
{
    CSimpleThreadPool tp(2, NUMLIM + 1);
    A *a[NUMLIM];
    B *b = new B();
    int i = 0;


    for (i = 0; i < NUMLIM; i++) a[i] = new A(i); 

    tp.startAll();
    tp.submit(b); 
    for (i = 0; i < 20*NUMLIM; i++) {
        tp.submit(a[i % NUMLIM], i % NUMLIM); 
        ::Sleep(1);
    }

    ::Sleep(10000);
    tp.shutdown();

    delete b;
    for (i = 0; i < NUMLIM; i++) delete a[i]; 
    return 0;
}

“几乎”像 Java 一样。

我的下一篇文章将是关于 IO 完成端口(IO Completion Ports),我保证尽快准备好。

© . All rights reserved.