65.9K
CodeProject 正在变化。 阅读更多。
Home

第八部分:使用 OpenCL 进行异构工作流

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.50/5 (2投票s)

2012年2月13日

CPOL

12分钟阅读

viewsIcon

30574

本文将通过一个通用的“即插即用工具”框架,演示如何将 OpenCL 整合到异构工作流中,该框架可以在单个工作站、跨网络机器或云环境框架内流式传输任意消息。

本系列关于 OpenCL™ 便携式并行性的上一篇文章(第 7 部分)演示了如何创建 C/C++ 插件,这些插件可以在运行时动态加载,从而为已运行的应用程序添加大规模并行 OpenCL™ 功能。能够理解在动态加载的运行时环境中使用的 OpenCL 的开发人员,可以通过编写使用 OpenCL 的新插件,将现有应用程序的性能提高一个数量级甚至更多。

本文将通过一个通用的“即插即用工具”框架,演示如何将 OpenCL 整合到异构工作流中,该框架可以在单个工作站、跨网络机器或云环境框架内流式传输任意消息(向量、数组以及任意复杂的嵌套结构)。创建可扩展工作流的能力非常重要,因为数据处理和转换可能与用于生成所需结果的计算问题一样复杂和耗时。我本教程中描述的框架的生产版本已成功地在商业和研究环境中将多个超级计算机和众多计算节点整合到一个统一的工作流中。

为了通用性,本教程使用了可免费下载的 Google protobufs(Protocol Buffers)包,以便读者可以轻松地扩展示例代码以操作他们自己的数据结构。Google protobufs 提供了跨机器的二进制互操作性,以及将用 C/C++、Python、Java、R 和许多其他语言编写的应用程序整合到其工作流中的能力。protobufs 在 Google 的日常计算中都有使用,并且已经过生产验证。Google 还声称这种二进制格式比 XML 提供了 20 到 100 倍的性能提升。

即插即用框架中的 Google protobufs

对于许多应用程序来说,数据预处理可能与生成所需结果的实际计算一样复杂和耗时。“即插即用工具”是一种常见的设计模式,它通过创建应用程序管道来处理信息,从而实现灵活高效的数据工作流。管道中的每个元素都从输入(通常是 stdin)读取数据,执行一些过滤或转换操作,并将结果写入输出(通常是 stdout)。信息以信息包的形式通过这些管道流动,信息包由字节流组成。“即插即用”管道中的所有元素都能够读取消息(有时称为信息包)并将其写入输出。基于信息包的类型,管道中的元素可以决定对数据进行操作,或者仅仅将其传递给管道中的其他元素。

“即插即用”框架自然地利用了多核处理器的并行性,因为管道中的每个元素都是一个独立的应用程序。操作系统调度程序确保任何拥有执行工作所需数据的应用程序都将运行——通常是在独立的处理器核心上。应用程序之间的缓冲允许对非常大的数据集进行即时处理。同样,可以通过 sshsocat 或等效的基于套接字的应用程序或库将数据跨机器进行管道传输,从而利用多台机器的并行性。

可以通过将网络或云中的多台机器连接起来构建可扩展、高性能的工作流。组件和插件的重用可减少错误,因为大多数工作流都可以由现有的“已知有效”应用程序构建。动态运行时加载的灵活性允许 OpenCL 被用作高性能、大规模并行的脚本语言,以创建和实现新的工作流。可以使用基于简单套接字编程技术(如 select()poll())的负载均衡分割操作,轻松构建高度复杂的多流工作流,以确定何时某个流已准备好接收更多数据。下面一张图展示了一种可能的工作流,该工作流利用了系统内部和跨网络的多核处理器和多个 GPU。

image001.jpg

图 1:工作流示例

请记住,每条信息流都可以写入磁盘作为工作存档,用于检查点保存结果,或用作后续应用程序的输入。

从数十年来使用这种“即插即用”框架获得的经验表明,每个信息包都需要前面有一个类似如下所示的头部

版本号 数据包大小(字节) 数据包类型 ID 数据包大小(字节)
图 2:头部示例

头部包含版本号非常重要,因为它允许库透明地选择正确的格式和 序列化 方法。例如,我在 20 世纪 80 年代初在洛斯阿拉莫斯国家实验室保存到磁盘的数据流至今仍可用。为了提高鲁棒性,必须将数据包的大小复制一份,以检测“静默”传输错误。否则,数据包大小中的位错误可能导致奇怪的故障,因为应用程序可能突然尝试分配 232 或 264 字节的内存(取决于用于存储大小的位数)。我在处理集群机器上的数据集时,在处理耗时数周的情况下,遇到过此类错误。机器故障可能导致通过 TCP 网络成功传输了错误的信息。

大小信息的冗余提供了很高的可能性来发现持久流中的 位翻转。某些磁盘子系统,尤其是廉价的磁盘子系统,容易受到多比特数据错误引起的位翻转的影响。只要大小信息正确,数据包信息就可以正确加载到内存中,在那里可以进行其他更广泛的检查或错误恢复。即使某个数据包已损坏,流中的其余数据包也可以正确加载,因此并非所有内容都丢失了。

以下是可适用于多种语言的头部简单定义

struct simpleHeader {
   uint64_t version, size1, packetID, size2;
}
示例 1:头部结构

每个应用程序,无论使用何种语言,都必须能够读取和理解此头部。应用程序程序员可以决定如何处理每个信息包。至少,程序员可以仅传递信息包而不影响数据包内容,或者丢弃数据包并将其从数据流中移除。无论如何,所有头部数据都使用网络标准字节序以二进制格式在应用程序之间传输,以便可以使用任意机器架构。自 20 世纪 80 年代中期以来售出的绝大多数机器上都成功运行了流协议。

以下伪代码描述了如何读取和写入一个或多个信息包。为了清晰起见,此伪代码确实会检查每个 I/O 操作是否成功。实际生产代码需要非常严格地检查每个操作。

while ( read the binary header information == SUCCESS)
{
    •    Compare header sizes (a mismatch flags an unrecoverable error)
    •    Allocate size bytes (after converting from network standard byte order)
    •    Binary read of size bytes into the allocated memory.

// perform the write
    •    Binary write the header in network standard byte order
    •    Binary write the packet information
}
示例 2:展示如何在流中使用 protobufs 的伪代码

大多数用户将使用一种常见的数据交换格式作为数据包数据。通过在头部正确指定数据包的类型,也可以将专有的、特殊的、高性能的格式混合到任何数据流中。只要有可能,出于性能原因,应使用二进制数据。如前所述,本教程使用 Google Protocol Buffers(protobufs),因为它们是一种支持良好、免费的二进制数据交换格式,它快速、经过充分测试且鲁棒。Google 在其大多数内部 RPC 协议和文件格式中使用 protobufs。可以通过通用的 protobuf 描述生成多种目标语言的代码。许多常用语言都存在生成器,包括 C、C++、Python、Java、Ruby、PHP、Matlab、Visual Basic 等。我在工作站和超级计算机上都使用过 protobufs。

以下 protobuf 规范演示了如何描述包含各种类型向量的消息。为简单起见,仅定义了单精度和双精度向量。

package tutorial;

enum Packet {
  UNKNOWN=0;
  PB_VEC_FLOAT=1;
  PB_VEC_DOUBLE=2;
  PB_VEC_INT=3;
}
message FloatVector {
  repeated float values    = 1 [packed = true];
  optional string name = 2;
}
message DoubleVector {
  repeated float values    = 1 [packed = true];
  optional string name = 2;
}
示例 3:tutorial.proto

使用 protoc 源语言生成器将 .proto 文件编译到目标源语言。以下是使用 tutorial.proto 生成 C++ 源包的命令。protoc 编译器还将生成 Java 和 Python 源包。有关其他语言的源生成器链接,请参阅 protobuf 网站

protoc --cpp_out=. tutorial.proto

示例 4:用于生成 C++ 代码的 protoc 命令

Linux 用户可以从应用程序管理器(如 Ubuntu 下的“apt-get”)安装 protobufs。Cygwin 和 Windows 用户需要从 code.google.com 下载并安装 protobufs。Google 提供了 Visual Studio 解决方案来帮助构建代码生成器和库。

以下文件 packetheader.h 包含在包含多个 protobuf 消息的流中读取和写入头部和消息信息的 方法。为了通用性,请注意消息类型是通过 .proto 文件中的枚举定义的。您可以通过向这些定义添加内容来利用自己的消息包。

为简洁起见,packetheader.h 中省略了许多基本检查。C++ 纯粹主义者会注意到 std::cinstd::cout 已被修改为支持二进制信息,如 setPacket_binaryIO() 方法。这样做是为了方便,因为它允许使用操作系统管道(用 ‘|’ 表示)轻松地“即插即用”应用程序。虽然不是 C++ 标准的一部分,但大多数 C++ 运行时系统都支持 std::cinstd::cout 上的二进制 I/O。那些反对这种做法的 C++ 程序员可以(1)更改脚本以手动指定 FIFO(先进先出队列)和网络连接,以便他们可以按照 C++ 标准使用二进制 I/O,或者(2)使用 C 语言。Windows 程序员会注意到 packetheader.h 使用 Microsoft 提供的 _setmode() 方法来执行二进制 I/O。

#ifndef PACKET_HEADER_H
#define PACKET_HEADER_H
 
#ifdef _WIN32
#include <stdio.h>
#include <fcntl.h>
#include <io.h>
#include <stdint.h>
#include <Winsock2.h>
#else
#include <arpa/inet.h>
#endif
 
// a simple version identifier
static const uint32_t version=1;
 
// change cin and cout so C++ can use binary
inline bool setPacket_binaryIO()
{
#ifdef _WIN32
  if(_setmode( _fileno(stdin), _O_BINARY) == -1)
     return false;
  if(_setmode( _fileno(stdout), _O_BINARY) == -1) 
     return false;
#endif
  return true;
}
 
inline bool writePacketHdr (uint32_t size, uint32_t type, std::ostream *out)
{
  size = htonl(size);
  type = htonl(type);
  out->write((const char *)&version, sizeof(uint32_t));
  out->write((const char *)&size, sizeof(uint32_t));
  out->write((const char *)&type, sizeof(uint32_t));
  out->write((const char *)&size, sizeof(uint32_t));
  return true;
}
 
template <typename T>
bool writeProtobuf(T &pb, uint32_t type, std::ostream *out)
{
  writePacketHdr(pb.ByteSize(), type, out);
  pb.SerializeToOstream(out);
  return true;
}
 
inline bool readPacketHdr (uint32_t *size, uint32_t *type, std::istream *in) 
{
  uint32_t size2, myversion;
 
  in->read((char *)&myversion, sizeof(uint32_t)); myversion = ntohl(myversion);
  if(!in->good()) return(false);
  in->read((char *)size, sizeof(uint32_t)); *size = ntohl(*size);
  if(!in->good()) return(false);
  in->read((char *)type, sizeof(uint32_t)); *type = ntohl(*type);
  if(!in->good()) return(false);
  in->read((char *)&size2, sizeof(uint32_t)); size2 = ntohl(size2);
  if(!in->good()) return(false);
 
  if(*size != size2) return(false);
  return(true);
}
 
template <typename T>
bool readProtobuf(T *pb, uint32_t size, std::istream *in)
{
  char *blob = new char[size];
  in->read(blob,size);
  bool ret = pb->ParseFromArray(blob,size);
  delete [] blob;
  return ret;
}
#endif

程序 testWrite.cc 演示了如何创建和写入 doublefloat 向量消息。默认向量长度为 100 个元素。可以通过在命令行指定大小来创建更大的消息。

// Rob Farber
#include <iostream>
using namespace std;
#include "tutorial.pb.h"
#include "packetheader.h"
 
int main(int argc, char *argv[])
{
  GOOGLE_PROTOBUF_VERIFY_VERSION;
 
  int vec_len = 100;
  // allow user to change the size of the data if they wish
  if(argc > 1) vec_len = atoi(argv[1]);
  
  // change cin and cout to binary mode
  // NOTE: this is not part of the C++ standard
  if(!setPacket_binaryIO()) return -1;
 
  tutorial::FloatVector vec;
  for(int i=0; i < vec_len; i++) vec.add_values(i);
 
  tutorial::DoubleVector vec_d;
  for(int i=0; i < 2*vec_len; i++) vec_d.add_values(i);
  
  vec.set_name("A");
  writeProtobuf<tutorial::FloatVector>(vec, tutorial::PB_VEC_FLOAT,
                                  &std::cout);
  vec_d.set_name("B");
  writeProtobuf<tutorial::DoubleVector>(vec_d, tutorial::PB_VEC_DOUBLE,
                                  &std::cout);
  return(0);
}
示例 6:testWrite.cc

程序 testRead.cc 演示了如何通过流读取头部和消息。当提供时,会打印与 protobuf 消息中可选名称关联的字符串。

// Rob Farber
#include <iostream>
#include "packetheader.h"
#include "tutorial.pb.h"
using namespace std;
 
int main(int argc, char *argv[])
{
  GOOGLE_PROTOBUF_VERIFY_VERSION;
 
  // Change cin and cout to binary mode
  // NOTE: this is not part of the C++ standard
  if(!setPacket_binaryIO()) return -1;
 
  uint32_t size, type;
  while(readPacketHdr(&size, &type, &std::cin)) {
    switch(type) {
    case tutorial::PB_VEC_FLOAT: {
      tutorial::FloatVector vec;
      if(!readProtobuf<tutorial::FloatVector>(&vec, size, &std::cin))
       break;
      if(vec.has_name() == true) cerr << "vec_float " << vec.name() << endl;
      cerr << vec.values_size() << " elements" << endl;
    } break;
    case tutorial::PB_VEC_DOUBLE: {
      tutorial::DoubleVector vec;
      if(!readProtobuf<tutorial::DoubleVector>(&vec, size, &std::cin))
       break;
      if(vec.has_name() == true) cerr << "vec_double " << vec.name() << endl;
      cerr << vec.values_size() << " elements" << endl;
    } break;
    default:
      cerr << "Unknown packet type" << endl;
    }
  }
  return(0);
}
示例 7:testRead.cc

可以使用以下命令在 Linux 下构建和测试这些应用程序

g++ -I . testWrite.cc tutorial.pb.cc -l protobuf -o testWrite -lpthread
g++ -I . testRead.cc tutorial.pb.cc -l protobuf -o testRead -lpthread
 
echo "----------- simple test -----------------"
./testWrite | ./testRead
示例 8:Linux 构建和测试命令
bda$ sh BUILD.linux 
----------- simple test -----------------
vec_float A
100 elements
vec_double B
200 elements
示例 9:Linux 命令的输出

一个即插即用框架

结合之前的动态编译/链接和 protobuf 示例,即可获得本文前面讨论的强大“即插即用”框架。

以下是 dynFunc.cc 的完整源代码,它结合了 protobuf 消息的流式传输和 C/C++ 方法的动态编译。

为了灵活性,init()func()fini() 方法能够修改或创建可以传递给即插即用框架中其他应用程序的新消息。插件作者有责任创建一个 char 数组来保存修改后的 protobuf 消息。添加了一个 dynFree() 方法,以便插件作者可以释放内存区域。这使得插件框架与语言无关。例如,C 源代码将使用 malloc()/free(),而 C++ 源代码将使用 new/delete

每个方法都可以返回一个指向字符数组的指针,该数组包含要写入的修改后的消息。返回 NULL 表示不需要写入消息。插件作者可以通过返回消息指针来传递原始消息。在这种情况下,不会调用 dynFree(),因为插件框架已执行分配。

//Rob Farber
#include <cstdlib>
#include <sys/types.h>
#include <dlfcn.h>
#include <string>
#include <iostream>
#include "packetheader.h"
 
using namespace std;
 
void *lib_handle;
 
typedef char* (*initFini_t)(const char*, const char*, uint32_t*, uint32_t*);
typedef char* (*func_t)(const char*, const char*, uint32_t*, uint32_t*, char*);
typedef void (*dynFree_t)(char*);
 
int main(int argc, char **argv) 
{
  if(argc < 2) {
    cerr << "Use: sourcefilename" << endl;
    return -1;
  }
  string base_filename(argv[1]);
  base_filename = base_filename.substr(0,base_filename.find_last_of("."));
  
  // build the shared object or dll
  string buildCommand("g++ -fPIC -shared ");
  buildCommand += string(argv[1]) 
    + string(" -o ") + base_filename + string(".so ");
 
  cerr << "Compiling with \"" << buildCommand << "\"" << endl;
  if(system(buildCommand.c_str())) {
    cerr << "compile command failed!" << endl;
    cerr << "Build command " << buildCommand << endl;
    return -1;
  }
  
  // load the library -------------------------------------------------
  string nameOfLibToLoad("./");
  nameOfLibToLoad += base_filename;
  
  nameOfLibToLoad += ".so";
  lib_handle = dlopen(nameOfLibToLoad.c_str(), RTLD_LAZY);
  if (!lib_handle) {
    cerr << "Cannot load library: " << dlerror() << endl;
    return -1;
  }
  
  // load the symbols -------------------------------------------------
  initFini_t dynamicInit= NULL;
  func_t dynamicFunc= NULL;
  initFini_t dynamicFini= NULL;
  dynFree_t dynamicFree= NULL;
 
  // reset errors
  dlerror();
  
  // load the function pointers
  dynamicFunc= (func_t) dlsym(lib_handle, "func");
  const char* dlsym_error = dlerror();
  if (dlsym_error) { cerr << "sym load: " << dlsym_error << endl; return -1;}
  dynamicInit= (initFini_t) dlsym(lib_handle, "init");
  dlsym_error = dlerror();
  if (dlsym_error) { cerr << "sym load: " << dlsym_error << endl; return -1;}
  dynamicFini= (initFini_t) dlsym(lib_handle, "fini");
  dlsym_error = dlerror();
  if (dlsym_error) { cerr << "sym load: " << dlsym_error << endl; return -1;}
  dlsym_error = dlerror();
  if (dlsym_error) { cerr << "sym load: " << dlsym_error << endl; return -1;}
  dynamicFree= (dynFree_t) dlsym(lib_handle, "dynFree");
 
  // 
  // work with protobufs
  // 
 
  //enable C++ binary cin and cout
  if (!setPacket_binaryIO()) {
     cerr << "Cannot set binary mode for cin and cout!" << endl;
     return -1;
     }
 
  uint32_t size, type;
  char *retBlob;
 
  // handle initialization and put information on output stream when told
  if( (retBlob=(*dynamicInit)(argv[0], base_filename.c_str(),&size, &type)) ) {
    writePacketHdr(size, type, &std::cout);
    cout.write(retBlob, size);
    (dynamicFree)(retBlob);
  }
 
  // read stream from cin and put information on output stream when told
  while(readPacketHdr(&size, &type, &std::cin)) {
    char *blob = new char[size];
    cin.read(blob, size);
    retBlob =(*dynamicFunc)(argv[0], base_filename.c_str(), &size, &type, blob);
    if(retBlob) {
      writePacketHdr(size, type, &std::cout);
      cout.write(retBlob, size);
      // optimization: if retBlob == blob then allocated was by this program
      if(retBlob != blob) (dynamicFree)(retBlob);
    }
    delete [] blob;
  }
 
  // handle finalization (fini) and put information on output stream when told
  if( retBlob = (*dynamicFini)(argv[0], base_filename.c_str(),&size, &type) ) {
    writePacketHdr(size, type, &std::cout);
    cout.write(retBlob, size);
    (dynamicFree)(retBlob);
  }
  
  // unload the library -----------------------------------------------
  dlclose(lib_handle);
  return 0;
}
示例 10:dynFunc.cc

passthrough.cc 的源代码包含在下面。此插件仅将所有消息传递给管道中的下一个应用程序。Linux 用户会注意到 g++ 命令包含 –rdynamic 选项的使用,该选项告诉链接器检查可执行文件中任何未解析的符号。(虽然本教程未提供 Visual Studio 代码,但 Visual Studio 用户指定 #pragma comment() 来告知链接器所需的库很重要。这样,protobuf 方法就可以与插件链接。)

//passthrough.cc (Rob Farber)
#include <stdlib.h>
#include <stdint.h>
#include <iostream>
#include "tutorial.pb.h"
using namespace std;
 
extern "C" char* init(const char* progname, const char* sourcename,
          uint32_t *size, uint32_t *type) {
  return(NULL); 
}
 
extern "C" char* func(const char* progname, const char* sourcename, 
          uint32_t *size, uint32_t *type, char *blob)
{
  return(blob); //Note: this is a special case that will not invoke dynFree
}
 
extern "C" char* fini(const char* progname, const char* sourcename,
          uint32_t *size, uint32_t *type) {
  return(NULL); 
}
 
extern "C" void dynFree(char* pt) {
  cerr << "dynFree" << endl;
  if(pt) delete [] pt;
}
示例 11:passthrough.cc

以下源文件 reduction.cc 演示了如何使用 init()fini() 在主机上计算 floatdouble protobuf 向量消息的总和。

//reduction.cc (Rob Farber)
#include <stdlib.h>
#include <stdint.h>
#include <iostream>
#include "tutorial.pb.h"
using namespace std;
 
extern "C" char* init(const char* progname, const char* sourcename, 
                    uint32_t *size, uint32_t *type) {
  return(NULL); 
}
 
extern "C" char* func(const char* progname, const char* sourcename, 
                    uint32_t *size, uint32_t *type, char *blob)
{
  switch(*type) {
  case tutorial::PB_VEC_FLOAT: {
    tutorial::FloatVector vec;
    if(!vec.ParseFromArray(blob,*size)) {
      cerr << progname << "," << sourcename << "Illegal packet" << endl;
    } else {
      if(vec.has_name() == true) cerr << "vec_float " << vec.name() << " ";
      float sum=0.f;
      for(int i=0; i < vec.values_size(); i++) sum += vec.values(i);
      cerr << "sum of vector " << sum << endl;
      cerr << "\tlast value in vector is " << vec.values(vec.values_size()-1)
          << endl;
      cerr << "\tvector size is " << vec.values_size() << endl;
    }
  } break;
  case tutorial::PB_VEC_DOUBLE: {
    tutorial::DoubleVector vec;
    if(!vec.ParseFromArray(blob,*size)) {
      cerr << progname << "," << sourcename << "Illegal packet" << endl;
    } else {
      if(vec.has_name() == true) cerr << "vec_double " << vec.name() << " ";
      double sum=0.;
      for(int i=0; i < vec.values_size(); i++) sum += vec.values(i);
      cerr << "sum of vector " << sum << endl;
      cerr << "\tlast value in vector is " << vec.values(vec.values_size()-1)
          << endl;
      cerr << "\tvector size is " << vec.values_size() << endl;
    }
  } break;
  default:
    cerr << "Unknown packet type" << endl;
  }
  return(NULL);
}
 
extern "C" char* fini(const char* progname, const char* sourcename, 
                    uint32_t *size, uint32_t *type) {
  return(NULL); 
}
 
extern "C" void dynFree(char* pt) {
  if(pt) delete [] pt;
}
示例 12:reduction.cc

在即插即用框架中使用 OpenCL

dynFunc.cc 的源代码可以轻松修改,以包含解析设备类型(CPU 或 GPU)所需的代码,并添加对新方法 oclSetupFunc() 的调用,该方法将 OpenCL 上下文和内核源文件的名称传递给插件。然后,插件可以在 init()func()fini() 方法中构建 OpenCL 源代码并调用 OpenCL 内核。代码中的更改在下面的 oclFunc.cc 源代码中以粗体显示。

//Rob Farber (dynOCL.cc)
#include <cstdlib>
#include <sys/types.h>
#include <dlfcn.h>
#include <string>
#include <iostream>
#include "packetheader.h"
 
#define PROFILING // Define to see the time the kernel takes
#define __NO_STD_VECTOR // Use cl::vector instead of STL version
#define __CL_ENABLE_EXCEPTIONS // needed for exceptions
#include <CL/cl.hpp>
#include <fstream>
 
using namespace std;
void *lib_handle;
 
typedef char* (*initFini_t)(const char*, const char*, uint32_t*, uint32_t*);
typedef char* (*func_t)(const char*, const char*, uint32_t*, uint32_t*, char*);
typedef void (*dynFree_t)(char*);
typedef void (*oclSetup_t)(const char*, cl::CommandQueue*);
 
int main(int argc, char **argv) 
{
  if(argc < 3) {
    cerr << "Use: sourcefilename cpu|gpu oclSource" << endl;
    return -1;
  }
  string base_filename(argv[1]);
  base_filename = base_filename.substr(0,base_filename.find_last_of("."));
  
  // build the shared object or dll
  string buildCommand("g++ -fPIC -shared -I $AMDAPPSDKROOT/include ");
  buildCommand += string(argv[1]) 
    + string(" -o ") + base_filename + string(".so ");
 
  cerr << "Compiling with \"" << buildCommand << "\"" << endl;
  if(system(buildCommand.c_str())) {
    cerr << "compile command failed!" << endl;
    cerr << "Build command " << buildCommand << endl;
    return -1;
  }
  
  // load the library -------------------------------------------------
  string nameOfLibToLoad("./");
  nameOfLibToLoad += base_filename;
  
  nameOfLibToLoad += ".so";
  lib_handle = dlopen(nameOfLibToLoad.c_str(), RTLD_LAZY);
  if (!lib_handle) {
    cerr << "Cannot load library: " << dlerror() << endl;
    return -1;
  }
  
  // load the symbols -------------------------------------------------
  initFini_t dynamicInit= NULL;
  func_t dynamicFunc= NULL;
  initFini_t dynamicFini= NULL;
  dynFree_t dynamicFree= NULL;
 
  // reset errors
  dlerror();
  
  // load the function pointers
  dynamicFunc= (func_t) dlsym(lib_handle, "func");
  const char* dlsym_error = dlerror();
  if (dlsym_error) { cerr << "sym load: " << dlsym_error << endl; return -1;}
  dynamicInit= (initFini_t) dlsym(lib_handle, "init");
  dlsym_error = dlerror();
  if (dlsym_error) { cerr << "sym load: " << dlsym_error << endl; return -1;}
  dynamicFini= (initFini_t) dlsym(lib_handle, "fini");
  dlsym_error = dlerror();
  if (dlsym_error) { cerr << "sym load: " << dlsym_error << endl; return -1;}
  dynamicFree= (dynFree_t) dlsym(lib_handle, "dynFree");
  dlsym_error = dlerror();
  if (dlsym_error) { cerr << "sym load: " << dlsym_error << endl; return -1;}
  // add a function to specify the ocl context and kernel file
  oclSetup_t oclSetupFunc;
  oclSetupFunc = (oclSetup_t) dlsym(lib_handle, "oclSetup");
  dlsym_error = dlerror();
  if (dlsym_error) { cerr << "sym load: " << dlsym_error << endl; return -1;}
 
  // -------------------------------------------------------------- 
  // Setup OCL context
  //
  const string platformName(argv[2]);
  const char* oclKernelFile = argv[3];
  int ret= -1;
 
  cl::vector<int> deviceType;
  cl::vector< cl::CommandQueue > contextQueues;
 
  // crudely parse the command line arguments. 
  if(platformName.compare("cpu")==0)
    deviceType.push_back(CL_DEVICE_TYPE_CPU);
  else if(platformName.compare("gpu")==0) 
    deviceType.push_back(CL_DEVICE_TYPE_GPU);
  else { cerr << "Invalid device type!" << endl; return(1); }
 
  // create the context and queues
  try {
    cl::vector< cl::Platform > platformList;
    cl::Platform::get(&platformList);
 
    // Get all the appropriate devices for the platform the
    // implementation thinks we should be using.
    // find the user-specified devices
    cl::vector<cl::Device> devices;
    for(int i=0; i < deviceType.size(); i++) {
      cl::vector<cl::Device> dev;
      platformList[0].getDevices(deviceType[i], &dev);
      for(int j=0; j < dev.size(); j++) devices.push_back(dev[j]);
    }
 
    // set a single context
    cl_context_properties cprops[] = {CL_CONTEXT_PLATFORM, NULL, 0};
    cl::Context context(devices, cprops);
    cerr << "Using the following device(s) in one context" << endl;
    for(int i=0; i < devices.size(); i++)  {
      cerr << "  " << devices[i].getInfo<CL_DEVICE_NAME>() << endl;
    }
 
    // Create the separate command queues to perform work
    for(int i=0; i < devices.size(); i++)  {
#ifdef PROFILING
      cl::CommandQueue queue(context, devices[i],CL_QUEUE_PROFILING_ENABLE);
#else
      cl::CommandQueue queue(context, devices[i],0);
#endif
      contextQueues.push_back( queue );
    }
  } catch (cl::Error error) {
    cerr << "caught exception: " << error.what() 
        << '(' << error.err() << ')' << endl;
    return(-1);
  }
  oclSetupFunc(oclKernelFile, &contextQueues[0]);
 
  // -------------------------------------------------------------- 
  // work with protobufs
  // 
 
  //enable C++ binary cin and cout
  if (!setPacket_binaryIO()) {
     cerr << "Cannot set binary mode for cin and cout!" << endl;
     return -1;
     }
 
  uint32_t size, type;
  char *retBlob;
 
  // handle initialization and put information on output stream when told
  if( (retBlob=(*dynamicInit)(argv[0], base_filename.c_str(),&size, &type)) ) {
    writePacketHdr(size, type, &std::cout);
    cout.write(retBlob, size);
    (dynamicFree)(retBlob);
  }
 
  // read stream from cin and put information on output stream when told
  while(readPacketHdr(&size, &type, &std::cin)) {
    char *blob = new char[size];
    cin.read(blob, size);
    retBlob =(*dynamicFunc)(argv[0], base_filename.c_str(), &size, &type, blob);
    if(retBlob) {
      writePacketHdr(size, type, &std::cout);
      cout.write(retBlob, size);
      // optimization: if retBlob == blob then allocated was by this program
      if(retBlob != blob) (dynamicFree)(retBlob);
    }
    delete [] blob;
  }
 
  // handle finalization (fini) and put information on output stream when told
  if( retBlob = (*dynamicFini)(argv[0], base_filename.c_str(),&size, &type) ) {
    writePacketHdr(size, type, &std::cout);
    cout.write(retBlob, size);
    (dynamicFree)(retBlob);
  }
  
  // unload the library -----------------------------------------------
  dlclose(lib_handle);
  return 0;
}

以下 OpenCL 内核将设备上的向量的每个元素与其自身相加。然后将生成的向量传递给管道中的下一个应用程序。请注意,此示例中未优化副本的数量。

inline __kernel void init(int veclen, __global TYPE1* c, int offset)
{
}
 
inline __kernel void func(int veclen, __global TYPE1* c, int offset)
{
  // get the index of the test we are performing
  int index = get_global_id(0);
 
  c[index + offset*veclen] += c[index + offset*veclen];
}
 
inline __kernel void fini(int veclen, __global TYPE1* c, int offset)
{
}
示例 14:simpleAdd.cl 源文件,用于将向量中的元素与其自身相加

以下命令用于在 Linux 下构建和运行一些测试

gcc -rdynamic -o dynFunc dynFunc.cc tutorial.pb.cc -l protobuf -ldl -lpthread
gcc -I $AMDAPPSDKROOT/include -rdynamic -o dynOCL dynOCL.cc tutorial.pb.cc -L $AMDAPPSDKROOT/lib/x86_64 -lOpenCL -lprotobuf -ldl -lpthread  

echo "--------------- showing a dynamic reduction ----------------------------"
./testWrite | ./dynFunc reduction.cc
 
echo "---------------- Pass through demo -------------------------------"
./testWrite | ./dynFunc passthrough.cc \
   | ./dynFunc reduction.cc
 
 
echo "------------- the float vector contains values*4 -----------------------"
# increase the float vector by a factor of four
./testWrite \
       | ./dynOCL oclFunc.cc cpu simpleAdd.cl \
       | ./dynOCL oclFunc.cc gpu simpleAdd.cl \
       | ./dynFunc reduction.cc
示例 15:用于构建和测试示例的 Linux 命令

这些命令在 Linux 下生成了以下输出。请注意,在最后一个测试中,使用 simpleAdd.clOpenCL 插件被调用了两次,以将浮点向量的值增加四倍,如下面的粗体所示。此示例演示了 OpenCL 插件可以链式调用。此外,第一次使用 simpleAdd.cl 在 CPU 上运行,第二次在 GPU 上运行。

$: sh BUILD.linux 
--------------- showing a dynamic reduction ----------------------------
Compiling with "g++ -fPIC -shared reduction.cc -o reduction.so "
vec_float A sum of vector 4950
       last value in vector is 99
       vector size is 100
vec_double B sum of vector 19900
       last value in vector is 199
       vector size is 200
---------------- Pass through demo -------------------------------
Compiling with "g++ -fPIC -shared passthrough.cc -o passthrough.so "
Compiling with "g++ -fPIC -shared reduction.cc -o reduction.so "
vec_float A sum of vector 4950
       last value in vector is 99
       vector size is 100
vec_double B sum of vector 19900
       last value in vector is 199
       vector size is 200
------------- the float vector contains values*4 -----------------------
Compiling with "gcc -fPIC -shared -I $AMDAPPSDKROOT/include oclFunc.cc -o oclFunc.so "
Compiling with "g++ -fPIC -shared reduction.cc -o reduction.so "
Compiling with "gcc -fPIC -shared -I $AMDAPPSDKROOT/include oclFunc.cc -o oclFunc.so "
Using the following device(s) in one context
  AMD Phenom(tm) II X6 1055T Processor
building OCL source (simpleAdd.cl)
   buildOptions -D TYPE1=float 
Using the following device(s) in one context
  Cypress
building OCL source (simpleAdd.cl)
   buildOptions -D TYPE1=float 
dynFree
dynFree
vec_float A sum of vector 19800
       last value in vector is 396
       vector size is 100
vec_double B sum of vector 19900
       last value in vector is 199
       vector size is 200
示例 16:Linux 命令和测试的输出

摘要

通过创建 OpenCL 插件的能力,应用程序程序员可以编写和支持通用应用程序,这些应用程序在存在 GPU 时提供加速性能,在没有 GPU 时提供基于 CPU 的性能。这些插件架构是经过充分理解的,并且是利用现有应用程序和代码库的便捷方式。它们还有助于保护现有的软件投资。

动态编译 OpenCL 插件的能力为优化代码生成器和在多个设备后端上透明运行单个 OpenCL 应用程序打开了许多机会。通过这种技术,OpenCL 应用程序可以为特定问题提供优化代码,并实现远超单个通用代码所能达到的极高性能。随着 OpenCL 的成熟,这些应用程序还将从任何编译器和其他性能改进中获得透明的收益。

本文中的工作流示例演示了如何使用 OpenCL 来利用混合 CPU/GPU 计算。“即插即用”管道工作流的灵活性、鲁棒性、可扩展性和性能相结合,使程序员能够在其生产工作流中利用 GPU 加速和 CPU 功能。

© . All rights reserved.