65.9K
CodeProject 正在变化。 阅读更多。
Home

MTCopy:一个多线程单/多文件复制工具

starIconstarIconstarIcon
emptyStarIcon
starIcon
emptyStarIcon

3.74/5 (6投票s)

2008 年 4 月 6 日

CPOL

8分钟阅读

viewsIcon

81547

downloadIcon

2633

一篇关于多线程单/多文件复制工具实现和用法的文章。

SC.JPG

引言

以正确的方式运行多个线程确实可以提高应用程序的效率和响应能力,仅举几例。MTCopy 具有这个优势。它首先启动多个线程以协同执行 I/O 操作(多文件多线程场景,MMT),并在必要时将要复制的大文件分割成几个扇区对齐的块(单文件多线程场景,SMT),这样您就不必等到耗时的 I/O 操作在单线程场景下完成。当然,线程越多并不一定效率越高,您需要考虑线程上下文切换的开销;否则,程序将花费大量宝贵的 CPU 时间在上下文切换上,而不是执行它应该执行的实际工作。

背景

在 CodeProject 上撰写文章并附带程序一直是我的梦想。几周前,在我看到一个类似的工具并想知道我是否能自己做一个之后,实现一个文件复制工具就浮现在我的脑海里了,如果能完成并在此发布就太好了。所以,几周后,我带着这篇文章和完成的示例程序来了!这个工具与其说是一个严肃的产品级文件复制工具,不如说是我在业余时间通过线程建模、线程同步和文件操作概念进行自我练习的副产品。

示例程序用法

该工具是一个基于控制台的应用程序,因此要运行它,您需要指定命令行,其中包含“[源文件/文件夹] [目标文件/文件夹] /t: [复制过程中将创建的线程数] /b: [复制过程中使用的缓冲区大小(KB/MB)] /l: [文件大小超过此阈值将被分割成小块并进行多线程复制]”。一旦开始运行,它将在 c: 驱动器根目录下创建一个名为“MTCopy.txt”的日志文件。

因此,您可以复制一个文件夹,例如:“MTCopy.exe "I:\Something" "I:\Something_" /t:2 /b:5m /l:10m"(对多文件复制使用两个线程;对于单文件复制,仅当其大小等于或大于 10MB 时才使用另外两个线程),或者简单地通过使用:“MTCopy.exe "I:\Something\something.wmv" "I:\Something_" /t:2 /b:5m /l:10m" 来多线程复制一个大的单文件。建议缓冲区大小应为扇区大小的倍数;这样,程序将直接分配该内存量,而不是调整缓冲区大小使其扇区对齐。

请注意:[源文件/文件夹] 和 [目标文件/文件夹] 选项应加双引号;否则,如果它们之间有空格,它们将被解释为多个选项。

实现方式

程序首先创建两个线程池和一个文件遍历线程。

文件遍历线程将递归搜索指定目录中的所有文件;它与 MMT 复制线程池并行运行。一旦找到文件,它将发出信号通知 MMT 线程池复制文件,然后使自身进入睡眠状态,直到 MMT 线程池通知它唤醒并继续遍历。如果找到文件夹,它将直接创建该文件夹。

多文件多线程复制线程池:当文件遍历线程向此线程池发出信号表示已找到文件时,它将在临界区中为本地线程创建一个源文件路径的本地副本,该临界区在任何给定时间仅由一个线程访问,然后由每个本地线程执行实际的文件复制操作。每个本地线程都有一组自己的本地变量,不与相邻线程共享。此线程池一直运行,直到文件遍历线程处于未发出信号状态(线程/进程对象在终止时发出信号);也就是说,当文件反向线程完成遍历特定目录时,此线程池将终止。

在接收要复制的文件时,它将检查每个文件的大小是否等于或大于 /l: 命令行选项指定的大小。如果大小满足条件,该文件将被放入一个专用的队列中作为工作项,并创建一个大小相同的虚拟文件以供将来处理。在此过程中,这个新创建的项目将被配备一系列块度量,例如该文件应被分割成多少块;同样,此块由 '/b:' 选项指定。有了所有这些信息,MMT 就会简单地复制未经过滤的文件。骨架伪代码如下:

for(;(WaitForSingleObject(g_hCopyThd, IGNORE)) != WAIT_OBJECT_0;) 
{
    // Get a copy of a file for the local thread 
    EnterCriticalSection(&lpCS);
    WaitForSingleObject(g_hFound, INFINITE);
    // Found one file
    _tcscpy(tSrcPath, FileName);
    // Start find process again
    SetEvent(g_hStartFind);
    LeaveCriticalSection(&lpCS);
    ...
    if (filesize > threshold)
    {
        AppendToSMT();
    }
    ...
    CopyFile(tSrcPath, tDstName, TRUE);
    ...
}

单文件多线程复制线程池:此线程池首先等待一个信号量对象被触发,当队列中有要复制的项时,此信号量会被触发;否则,它将进入睡眠状态。每个项的块将由此线程池中的本地线程处理。完成复制项后,线程池将暂停自身并等待下一个项的到来。代码骨架如下:

for (int i = 0; i < m_pHeadWorkItem->csa[uiIndex].uiLength; i++ )
{
    SetFilePointer(hSrcFile, 
     (LONG)(m_pHeadWorkItem->csa[uiIndex].uiCurrentBlock * CS->m_dwCopyBlockSize)
      + (CS->m_dwCopyBlockSize * i), NULL/*&lHigh*/, FILE_BEGIN);
    SetFilePointer(hDstFile, 
     (LONG)(m_pHeadWorkItem->csa[uiIndex].uiCurrentBlock * CS->m_dwCopyBlockSize)
      + (CS->m_dwCopyBlockSize * i), NULL, FILE_BEGIN);
    fFileOp = ReadFile(hSrcFile, pTmp, CS->m_dwCopyBlockSize, &dwRead, NULL);
    ...
    WriteFile(hDstFile, pTmp, dwRead, &dwWritten, NULL);
    ...
    SuspendThread(m_hThd[uiIndex]);
}

关注点

线程创建时的 CloseHandle:当我第一次看到像 CloseHandle(_beginthreadex(...)) 这样的代码时,我很困惑:关闭刚刚创建的线程句柄有什么意义?后来,在使用这段代码一段时间后,我意识到这实际上并不会导致子线程的主线程终止;它只是让系统将线程的用法计数从 2 减少到 1(线程出生时用法计数为 2),当线程退出时,此用法计数将减少到 0,从而释放对象的内存;因此,您无需编写额外的代码等待线程终止即可关闭其句柄。当然,如果您在创建线程内核对象后不再需要它,只想让它运行完成,那么情况就是如此。此行为也适用于进程内核对象。

CloseHandle(...) 还有另一种有用的情况:假设您有一个子进程,其主线程生成了另一个线程,然后主线程终止。此时,只有当父进程没有该线程对象的未决句柄时,系统才能从子进程的主线程对象中释放其内存。否则,系统无法释放该对象,直到父进程关闭句柄。因此,您可以使用类似的代码:

BOOL fSuccess = CreateProcess(..., &pi);
if (fSuccess) 
{
    // Close the thread handle as soon as it is no longer needed!
    CloseHandle(pi.hThread);
    ...
}

成功的等待副作用:对于某些内核对象,对 WaitForSingleObject/WaitForMultipleObjects 的成功等待(当对象被触发时)实际上会改变对象的状态。这种副作用适用于自动重置事件信号量对象。本程序同时使用了这两种副作用。对于信号量成功等待,程序用它来防止 SMT 项:当有可用项时释放线程,当没有可用项时使线程执行进入等待状态。

将项排队时,信号量将通过以下方式增加:

// Signal the copying thread pool to let it run
ReleaseSemaphore(m_hsemNumElements, CS->m_nThdCnt, NULL);

// For queue to have element && There's next item available
for (;(WaitForSingleObject(m_hsemNumElements, INFINITE) == WAIT_OBJECT_0);)
{
    ...
}

上面代码片段中的 for 语句只有在队列中有可用项时才会返回,并且在返回时,它还会将信号量资源计数减为零;否则,它将处于等待状态,空闲地等待下一个项的到来。

非阻塞等待

(WaitForSingleObject(m_hFindThd, IGNORE) == WAIT_OBJECT_0)
// where IGNORE equals zero.

当我寻找一种可以以最短时间测试对象信号量并返回的函数时,我在 MSDN 上看到了这个参数。因此,这样使用,可以立即测试对象的状态并返回。所以我的回顾是:在提问之前,先仔细阅读 MSDN。

临界区泄漏(孤立临界区)

for(...)
{
    EnterCriticalSection(&CS);
    break;
    ...
    LeaveCriticalSection(&CS);
}

当您处于一个由多个线程执行上述代码的线程池中时,要特别注意这类问题。在开发过程中,我曾遇到过这个问题,我在它们之间放了一个 break 语句,所以当脱离的线程强制在进入临界区后不释放它时,整个线程池就会被阻塞,其余的线程将永远无法再次进入临界区,这也就不足为奇了。

线程局部存储 (TLS)

起初,我认为我应该在线程池中使用 TLS 来存储与特定线程相关的变量,但后来我发现您只需要为全局或静态变量使用 TLS;如果您可以尽量少地使用此类变量,而更多地依赖于线程池中的自动(基于堆栈)变量,那么您可以忽略 TLS,因为这些变量都是线程本地的。

未来展望

在实现了该程序的基本功能后,我对 I/O 操作的最大化进行了一些研究。为了实现 I/O 的最大化性能,您需要考虑以下几点:

  1. 绕过文件系统缓存(使用 CreateFile(...) 并带有 FILE_FLAG_NO_BUFFERING)。
  2. 使用重叠 I/O(异步 I/O)。
  3. 该程序利用了绕过系统缓冲的优势,这意味着数据通过 SCSI 适配器使用 DMA(直接内存访问)直接进入应用程序,而不是先到系统再到应用程序。重叠 I/O 可以通过仅使用一个线程而不是多个线程来实现与本程序相同的效果,其中 Read()/Write() 函数在向操作系统发出命令后立即返回。因此,操作系统会在后台为您执行 I/O 工作,并在操作完成后通知您。这样,通过在任何瞬间为 I/O 子系统提供更多的工作来提高吞吐量。但是本程序尚未采用此机制,我们将其留待将来。

  4. 更好的用户界面(进度条、选项设置等)也很重要。

历史

  • 2008 年 4 月 7 日 — 发布了原始版本。
© . All rights reserved.