通过 HTTP 进行千兆字节文件上传 - Node.js 与 NGINX 版本





0/5 (0投票)
使用 NGINX Web 服务器和 NodeJS 后端通过 HTTP 进行千兆字节文件上传。该想法是将文件上传卸载到 NGINX Web 服务器,然后使用 NGINX client_body_in_file_only 指令告知 NODEJS 后端,在文件上传完成后应进行处理。代码
在 博客 撰写有关使用 Node.js 进行千兆字节文件上传的内容之后,我们希望做的一件事就是看看如何提高应用程序的性能。在以前的应用程序版本中,编写的代码大多是同步的,因此导致了高 CPU 使用率、大量的 I/O 操作以及相当多的内存消耗。总而言之,创建的内容更多是为了演示如何通过 HTTP 进行千兆字节文件上传的概念,而不是为了性能。
现在我们已经建立了概念,是时候看看如何提高应用程序的性能了。
性能调优
为了解决千兆字节文件上传性能问题,我们要关注以下几个方面:
- 在 Node.js 服务器前面实现反向代理服务器。
- 将文件上传请求卸载到反向代理。
- 将 MergeAll 阻塞同步代码转换为非阻塞异步代码
- 为每个后端请求创建 API。目前,UploadChunk API 调用用于管理所有上传。
- 从 MergeAll API 调用中移除校验和计算。将创建一个 GetChecksum API 来计算上传文件的校验和。
性能测试是在一台运行 NGINX 版本 1.9.9 和 Node.js 版本 5.3.0 的 Centos 7 虚拟机上进行的。这与我们之前的博客文章有所不同,因为之前的工作是在 Windows 2012 平台上完成的。
反向代理
Node.js 可以让你构建快速、可扩展的网络应用程序,能够处理大量并发连接并实现高吞吐量。这意味着从一开始 Node.js 就能够很好地处理千兆字节文件上传。
那么,为什么在这种情况下我们想在 Node.js 服务器前面使用反向代理呢?我们这样做是因为将文件处理卸载到 NGINX Web 服务器将减少 Node.js 后端的开销,这应该会提高性能。下图显示了如何实现这一点。
- 客户端计算机通过调用 XFileName API 上传文件块。一旦 NGINX 反向代理看到对 /api/CelerFTFileUpload/UploadChunk/XFileName 的调用,它就会将文件块保存到 NGINX 私有临时目录中,因为我们启用了 NGINX client_body_in_file_only 指令。NGINX 私有临时目录可以在 /tmp 下找到。之所以会这样,是因为在 NGINX systemd 文件中,PrivateTmp 配置选项设置为 true。有关 PrivateTmp 配置选项的更多信息,请参阅 systemd man 手册页。
- 在文件块保存后,NGINX 会设置 X-File-Name 标头,其中包含文件块的名称。这将发送到 Node.js。
- 当所有文件块都上传完成后,客户端会调用 MergeAll API,NGINX 会直接将此请求发送给 Node.js。一旦 Node.js 收到 MergeAll 请求,它就会合并所有上传的文件块以创建文件。
- 一旦 Node.js 收到 X-File-Name 标头,它就会将文件块从 NGINX 私有临时目录移动到文件上传目录,并使用正确的名称进行保存。
我们使用了以下 NGINX 配置
# redirect CelerFT location = /api/CelerFTFileUpload/UploadChunk/XFileName { aio on; directio 10M; client_body_temp_path /tmp/nginx 1; client_body_in_file_only on; client_body_buffer_size 10M; client_max_body_size 60M; proxy_pass_request_headers on; proxy_set_body off; proxy_redirect off; proxy_ignore_client_abort on; proxy_http_version 1.1; proxy_set_header Connection ""; proxy_set_header Host $host; ##proxy_set_header Host $http_host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; proxy_set_header X-File-Name $request_body_file; proxy_pass http://127.0.0.1:1337; # proxy_redirect default; proxy_connect_timeout 600; proxy_send_timeout 600; proxy_read_timeout 600; send_timeout 600; access_log off; error_log /var/log/nginx/nginx.upload.error.log; }
关键参数是 X-File-Name 标头,它被设置为文件的名称。Node.js 后端必须然后处理各个块。代码的关键部分是找出 NGINX 私有临时目录的创建位置,因为 NGINX 会在那里写入文件块。在 systemd 下,NGINX 私有临时目录在每次 NGINX 重启时名称都会不同,因此我们必须在移动文件块到最终目的地之前获取该目录的名称。
app.post('*/api/CelerFTFileUpload/UploadChunk/XFileName*', function (request, response) { // Check if we uploading using a x-file-header // This means that we have offloaded the file upload to the // web server (NGINX) and we are sending up the path to the actual // file in the header. The file chunk will not be in the body // of the request if (request.headers['x-file-name']) { // Temporary location of our uploaded file // Nginx uses a private file path in /tmp on Centos // we need to get the name of that path var temp_dir = fs.readdirSync('/tmp'); var nginx_temp_dir = []; for (var i = 0; i < temp_dir.length; i++) { if (temp_dir[i].match('nginx.service')) { nginx_temp_dir.push(temp_dir[i]); } } var temp_path = '/tmp/' + nginx_temp_dir[0] + request.headers['x-file-name']; fs.move(temp_path , response.locals.localfilepath, {}, function (err) { if (err) { response.status(500).send(err); return; } // Send back a sucessful response with the file name response.status(200).send(response.locals.localfilepath); response.end(); }); } });
MergeAll 异步 API
在 上一篇 博客文章中,我们广泛使用了 fs.readdirSync
和 fs.readfileSync
函数调用。每次我们需要检查是否已上传所有文件块时,都会调用 fs.readdirSync
。在合并所有上传的文件块以创建文件时,会调用 fs.readfileSync
。
这些函数调用都是同步调用,每次调用都会导致 MergeAll API 阻塞。在 MergeAll API 中调用的 getfilesWithExtensionName 函数被替换为 fs.readdir
函数调用,用于检查我们是否已上传所有文件块。
getfilesWithExtensionName 函数。
function getfilesWithExtensionName(dir, ext) { var matchingfiles = []; if (fs.ensureDirSync(dir)) { return matchingfiles; } var files = fs.readdirSync(dir); for (var i = 0; i < files.length; i++) { if (path.extname(files[i]) === '.' + ext) { matchingfiles.push(files[i]); } } return matchingfiles; }
MergeAll API 编写为使用 fs.readdir
函数来检查我们是否已上传所有文件块。在每次调用 fs.readdir 时,我们将 fileslist 数组填充文件名。一旦我们上传了所有文件块,我们就会像这样用所有文件名填充一个名为 files 的数组。
for (var i = 0; i < fileslist.length; i++) { if (path.extname(fileslist[i]) == '.tmp') { //console.log(fileslist[i]); files.push(fileslist[i]); } }
接下来要做的是使用 fs.createWriteStream
创建输出文件。
// Create tthe output file var outputFile = fs.createWriteStream(filename);
然后,我们使用一个名为 mergefiles 的递归函数将文件块合并到最终输出文件中。在 mergefiles 函数中,我们使用 fs.createReadStream
来读取 files 数组中的每个文件,并将它们写入输出文件。mergefiles 函数以索引 0 调用,并且在每次成功调用 fs.createReadStream
后,我们都会递增索引。
var index = 0; // Recrusive function used to merge the files // in a sequential manner var mergefiles = function (index) { // If teh index matches the items in the array // end the function and finalize the output file if (index == files.length) { outputFile.end(); return; } console.log(files[index]); // Use a read stream too read the files and write them to the write stream var rstream = fs.createReadStream(localFilePath + '/' + files[index]); rstream.on('data', function (data) { outputFile.write(data); }); rstream.on('end', function () { //fs.removeSync(localFilePath + '/' + files[index]); mergefiles(index + 1); }); rstream.on('close', function () { fs.removeSync(localFilePath + '/' + files[index]); //mergefiles(index + 1); }); rstream.on('error', function (err) { console.log('Error in file merge - ' + err); response.status(500).send(err); return; }); }; mergefiles(index);
MergeAll API 调用的完整代码。
// Request to merge all of the file chunks into one file app.get('*/api/CelerFTFileUpload/MergeAll*', function (request, response) { if (request.method == 'GET') { // Get the extension from the file name var extension = path.extname(request.param('filename')); // Get the base file name var baseFilename = path.basename(request.param('filename'), extension); var localFilePath = uploadpath + request.param('directoryname') + '/' + baseFilename; var filename = localFilePath + '/' + baseFilename + extension; // Array to hold files to be processed var files = []; // Use asynchronous readdir function to process the files // This provides better i/o fs.readdir(localFilePath, function (error, fileslist) { if (error) { response.status(400).send('Number of file chunks less than total count'); //response.end(); console.log(error); return; } //console.log(fileslist.length); //console.log(request.param('numberOfChunks')); if ((fileslist.length) != request.param('numberOfChunks')) { response.status(400).send('Number of file chunks less than total count'); //response.end(); return; } // Check if all of the file chunks have be uploaded // Note we only want the files with a *.tmp extension if ((fileslist.length) == request.param('numberOfChunks')) { for (var i = 0; i < fileslist.length; i++) { if (path.extname(fileslist[i]) == '.tmp') { //console.log(fileslist[i]); files.push(fileslist[i]); } } if (files.length != request.param('numberOfChunks')) { response.status(400).send('Number of file chunks less than total count'); //response.end(); return; } // Create tthe output file var outputFile = fs.createWriteStream(filename); // Done writing the file. Move it to the top level directory outputFile.on('finish', function () { console.log('file has been written ' + filename); //runGC(); // New name for the file var newfilename = uploadpath + request.param('directoryname') + '/' + baseFilename + extension; // Check if file exists at top level if it does delete it // Use move with overwrite option fs.move(filename, newfilename , {}, function (err) { if (err) { console.log(err); response.status(500).send(err); //runGC(); return; } else { // Delete the temporary directory fs.remove(localFilePath, function (err) { if (err) { response.status(500).send(err); //runGC(); return; } // Send back a sucessful response with the file name response.status(200).send('Sucessfully merged file ' + filename); //response.end(); //runGC(); }); // Send back a sucessful response with the file name //response.status(200).send('Sucessfully merged file ' + filename + ", " + md5results.toUpperCase()); //response.end(); } }); }); var index = 0; // Recrusive function used to merge the files // in a sequential manner var mergefiles = function (index) { // If teh index matches the items in the array // end the function and finalize the output file if (index == files.length) { outputFile.end(); return; } console.log(files[index]); // Use a read stream too read the files and write them to the write stream var rstream = fs.createReadStream(localFilePath + '/' + files[index]); rstream.on('data', function (data) { outputFile.write(data); }); rstream.on('end', function () { //fs.removeSync(localFilePath + '/' + files[index]); mergefiles(index + 1); }); rstream.on('close', function () { fs.removeSync(localFilePath + '/' + files[index]); //mergefiles(index + 1); }); rstream.on('error', function (err) { console.log('Error in file merge - ' + err); response.status(500).send(err); return; }); }; mergefiles(index); } /*else { response.status(400).send('Number of file chunks less than total count'); //response.end(); return; }*/ }); } });
其他改进
如前所述,我们做的另一件事是为 CelerFT 支持的每种文件上传类型创建了一个 API 调用。
- Base64 API 调用将处理 CelerFT-Encoded 标头设置为 base64 的上传。
- FormData API 调用将处理所有 multipart/form-data 上传。
- XFileName API 调用将用于将文件上传卸载到 NGINX 反向代理。
初步测试表明,在文件上传期间后端服务器的性能有了显著的改进。请随时下载 CelerFT 并提供有关其性能的反馈。
该项目的代码可以在我的 github 存储库的 nginxasync 分支下找到。