使用 C++ 委托进行远程过程调用





5.00/5 (8投票s)
使用 C++ 委托库通过任何通信媒介调用远程函数
引言
C++ 委托简化了发布/订阅模式的使用。通过委托,客户端代码匿名注册回调函数指针以接收运行时通知。在其他语言中,委托是头等特性并内置于语言中。但在 C++ 中并非如此,这使得开发人员需要创建自定义库来模拟委托。
委托通常支持同步执行,也就是说,当被调用时,绑定的函数在调用者的控制线程中执行。几年前,我写了一篇题为“C++ 中的异步多播委托”的文章。该库为任何可调用函数提供同步和异步函数调用。简而言之,回调函数和回调线程在订阅者注册期间指定。在通知期间,绑定的回调函数在订阅者所需的控制线程上调用。
本文解释了我原始 C++ 委托库的扩展:远程委托。远程委托在远程系统上调用函数(带数据)。远程系统定义为在由通信链接分隔的不同 CPU 上运行的应用程序,或在独立进程中执行的程序。对于用户来说,委托看起来是本地的;然而,该库只需很少的努力即可调用远程函数。将远程委托视为使用 C++ 委托实现的符合 C++ 标准的远程过程调用 (RPC)。
远程委托库的特点是
- 远程调用 – 远程调用任何可调用函数(最多 5 个参数)
- 任何协议 – 支持任何传输介质:UDP、TCP、串口、命名管道
- 任何序列化 – 支持任何对象序列化方法:二进制、JSON、XML
- 字节序 – 处理不同的 CPU 架构
- 任何编译器 – 适用于任何编译器的标准 C++ 代码,无需奇怪的技巧
- 任何操作系统 – 易于移植到任何操作系统。包含 Win32、POSIX 和 std::thread 端口
- 任意函数 - 调用任何可调用函数:成员函数、静态函数或自由函数
- 任意参数类型 - 支持任何参数类型:值、引用、指针、指针的指针
- 多个参数 – 支持多个函数参数
远程委托实现显著简化了远程应用程序之间的数据和对象传递。编写少量平台特定代码以使库适应特定的操作系统和通信协议。之后,该框架处理所有底层机制,以安全地在远程系统上调用任何函数签名。
原始的异步委托实现致力于通过使用 C++ 委托在线程之间调用函数和传递数据来简化线程间通信。远程委托将库扩展到包括进程间和处理器间通信。
原始文章C++ 中的异步多播委托涵盖了所有同步和异步委托库功能。本文的重点是新的远程委托增强功能。
委托背景
如果您不熟悉委托,这个概念非常简单。委托可以被认为是超级函数指针。在 C++ 中,没有指针类型能够指向所有可能的函数变体:实例成员、虚函数、const、静态和自由(全局)。函数指针不能指向实例成员函数,指向成员函数的指针有各种限制。然而,委托类可以以类型安全的方式指向任何函数,只要函数签名匹配。简而言之,委托指向任何具有匹配签名的函数,以支持匿名函数调用。
这个 C++ 委托实现功能齐全,允许同步或异步调用任何函数,甚至实例成员函数,带任何参数。委托库使得绑定和调用任何函数变得轻而易举。远程委托的添加将委托范式扩展到包括在独立上下文中执行的函数调用。
Using the Code
我将首先介绍如何使用代码,然后深入探讨实现细节。
核心委托库受任何 C++03 或更高版本编译器的支持。然而,所有远程委托示例都使用了一些 C++11 特性。包含 Visual Studio/Win32 和 Eclipse/POSIX 项目和示例。
委托库由委托和委托容器组成。委托能够绑定到单个可调用函数。多播委托容器在列表中保存一个或多个委托,以便按顺序调用。单播委托容器最多保存一个委托。
新的远程委托类如下所示,其中 X 是目标函数签名中的函数参数数量。
DelegateRemoteSendX<>
DelegateFreeRemoteRecvX<>
DelegateMemberRemoteRecvX<>
DelegateRemoteSendX<>
启动调用在远程系统上执行的远程函数。发送系统创建此对象。
DelegateFreeRemoteRecvX<>
同步调用位于接收远程系统上的本地自由回调函数。
DelegateMemberRemoteRecvX<>
同步调用位于接收远程系统上的本地成员回调函数。
远程委托能够插入到任何现有的委托容器中。
SinglecastDelegateX<>
MulticastDelegateX<>
MulticastDelegateSafeX<>
发送数据示例
发送应用程序展示了如何使用单个 RemoteDataPoint&
参数调用远程函数。以下代码的要点是
MakeDelegate()
创建一个发送远程委托。- 调用发送远程委托。
int main(void)
{
BOOL result = AfxWinInit(GetModuleHandle(NULL), NULL, ::GetCommandLine(), 0);
ASSERT_TRUE(result == TRUE);
result = AfxSocketInit(NULL);
ASSERT_TRUE(result == TRUE);
UdpDelegateSend::GetInstance().Initialize();
// Create a stream to hold send data
stringstream ss(ios::in | ios::out | ios::binary);
// Create a send remote delegate
auto sendDataPointDelegate =
MakeDelegate<RemoteDataPoint&>(UdpDelegateSend::GetInstance(), ss, REMOTE_DATA_POINT_ID);
cout << "Press any key to exit program." << endl;
int x = 1;
int y = 1;
while (!_kbhit())
{
// Send data point to remote system
RemoteDataPoint dataPoint(x++, y++);
sendDataPointDelegate(dataPoint);
}
return 0;
}
MakeDelegate()
是一个重载函数,用于创建委托对象。通常,MakeDelegate()
使用模板参数推导来根据参数创建正确的实例类型。然而,发送委托不绑定到函数;绑定的函数在远程系统上。因此,在创建发送委托时,使用模板函数参数类型调用该函数。
auto sendDataPointDelegate =
MakeDelegate<RemoteDataPoint&>(UdpDelegateSend::GetInstance(), ss, REMOTE_DATA_POINT_ID);
第一个参数是传输对象。第二个参数是传出数据字节流。最后一个参数是发送方和接收方之间共享的 ID。
发送方使用正确的函数参数调用远程委托。RemoteDataPoint
对象被序列化,并向接收方发送消息。
RemoteDataPoint dataPoint(x++, y++);
sendDataPointDelegate(dataPoint);
接收委托示例
接收应用程序展示了如何注册远程委托回调。以下代码的要点
MakeDelegate()
创建一个接收远程委托。- 当发送方调用委托时,会调用
RecvDataPointCb()
。
static void RecvDataPointCb(RemoteDataPoint& data)
{
cout << "RemoteDataPoint: " << data.GetX() << " " << data.GetY() << endl;
}
int main(void)
{
BOOL result = AfxWinInit(GetModuleHandle(NULL), NULL, ::GetCommandLine(), 0);
ASSERT_TRUE(result == TRUE);
result = AfxSocketInit(NULL);
ASSERT_TRUE(result == TRUE);
UdpDelegateRecv::GetInstance().Initialize();
// Create a receive remote delegate
auto recvDataPointDelegate = MakeDelegate(&RecvDataPointCb, REMOTE_DATA_POINT_ID);
cout << "Press any key to exit program." << endl;
while (!_kbhit())
Sleep(10);
return 0;
}
接收方使用与发送方相同的 ID 创建委托。
// Create a receive remote delegate
auto recvDataPointDelegate = MakeDelegate(&RecvDataPointCb, REMOTE_DATA_POINT_ID);
第一个参数是指向回调函数的指针。第二个参数是共享 ID。委托库接收消息,反序列化 ReceiveDataPoint
对象,并调用回调函数。
当然,委托库除了支持多个参数和不同参数类型外,还支持成员函数回调。
SysData 示例
远程委托可以插入到委托容器中,就像任何其他委托类型一样。委托类型总结如下
- Synchronous
- 异步
- 异步阻塞
- Remote
此示例展示了如何使用每种委托类型注册通知。SysData
是一个简单的类,用于存储系统模式并在更改时通知客户端。
/// @brief SysData stores common data accessible by any system thread. This class
/// is thread-safe.
class SysData
{
public:
/// Clients register with MulticastDelegateSafe1 to get callbacks when system mode changes
static MulticastDelegateSafe1<const SystemModeChanged&> SystemModeChangedDelegate;
/// Get singleton instance of this class
static SysData& GetInstance();
/// Sets the system mode and notify registered clients via SystemModeChangedDelegate.
/// @param[in] systemMode - the new system mode.
void SetSystemMode(SystemMode::Type systemMode);
private:
SysData();
~SysData();
/// The current system mode data
SystemMode::Type m_systemMode;
/// Lock to make the class thread-safe
LOCK m_lock;
};
订阅者使用 SystemModeChangedDelegate
容器进行注册。函数回调签名是 void (const SystemModeChanged&)
。
static MulticastDelegateSafe1<const SystemModeChanged&> SystemModeChangedDelegate;
SystemModeChanged
对象是回调参数类型。
/// @brief Structure to hold system mode callback data.
class SystemModeChanged
{
public:
SystemModeChanged() :
PreviousSystemMode(SystemMode::STARTING),
CurrentSystemMode(SystemMode::STARTING)
{
}
SystemMode::Type PreviousSystemMode;
SystemMode::Type CurrentSystemMode;
friend std::ostream& operator<< (std::ostream &out, const SystemModeChanged& data);
friend std::istream& operator>> (std::istream &in, const SystemModeChanged& data);
};
当有人调用 SysData::SetSystemMode()
时,新模式将被保存并通知所有注册的订阅者。
void SysData::SetSystemMode(SystemMode::Type systemMode)
{
LockGuard lockGuard(&m_lock);
// Create the callback data
SystemModeChanged callbackData;
callbackData.PreviousSystemMode = m_systemMode;
callbackData.CurrentSystemMode = systemMode;
// Update the system mode
m_systemMode = systemMode;
// Callback all registered subscribers
if (SystemModeChangedDelegate)
SystemModeChangedDelegate(callbackData);
}
TestSysData()
向 SysData::SystemModeChangedDelegate
注册了四个回调。每个回调代表一种不同的通知类型:远程、同步、异步和异步阻塞。
请注意,SysData
只公开了一个通用委托容器。每个匿名客户端决定如何接收通知。另请注意,调用 MakeDelegate()
时使用的参数如何决定所创建的委托类型。
// Callback function to receive notifications
static void SystemModeChangedCb(const SystemModeChanged& data)
{
cout << "SystemModeChangedCb: " << data.CurrentSystemMode << " " <<
data.PreviousSystemMode << endl;
}
void TestSysData()
{
sysDataWorkerThread.CreateThread();
// Binary stream of send data bytes
stringstream ss(ios::in | ios::out | ios::binary);
// Register to send a remote callback
SysData::SystemModeChangedDelegate +=
MakeDelegate<const SystemModeChanged&>(UdpDelegateSend::GetInstance(),
ss, REMOTE_SYSTEM_MODE_CHANGED_ID);
// Create a receive delegate to receive remote callback
auto recvDataPointDelegate =
MakeDelegate(&SystemModeChangedCb, REMOTE_SYSTEM_MODE_CHANGED_ID);
// Register for synchronous callback
SysData::SystemModeChangedDelegate += MakeDelegate(&SystemModeChangedCb);
// Register for asynchronous callback on a worker thread
SysData::SystemModeChangedDelegate +=
MakeDelegate(&SystemModeChangedCb, &sysDataWorkerThread);
// Register for asynchronous blocking callback on a worker thread
SysData::SystemModeChangedDelegate +=
MakeDelegate(&SystemModeChangedCb, &sysDataWorkerThread, 5000);
// Change system mode. All registered subscribers are notified.
SysData::GetInstance().SetSystemMode(SystemMode::STARTING);
SysData::GetInstance().SetSystemMode(SystemMode::NORMAL);
std::this_thread::sleep_for(std::chrono::seconds(1));
// Unregister all callbacks
SysData::SystemModeChangedDelegate -= MakeDelegate(&SystemModeChangedCb);
SysData::SystemModeChangedDelegate -=
MakeDelegate(&SystemModeChangedCb, &sysDataWorkerThread);
SysData::SystemModeChangedDelegate -=
MakeDelegate(&SystemModeChangedCb, &sysDataWorkerThread, 5000);
SysData::SystemModeChangedDelegate -=
MakeDelegate<const SystemModeChanged&>
(UdpDelegateSend::GetInstance(), ss, REMOTE_SYSTEM_MODE_CHANGED_ID);
sysDataWorkerThread.ExitThread();
}
移植细节
远程委托库抽象了对象序列化和通信协议。库使用通用接口来执行这些操作。
序列化
作为远程委托参数发送的每个用户定义数据类型必须
- 实现默认构造函数。
- 重载插入和提取运算符以序列化对象(即
operator<<
和operator>>
)。
以下示例以二进制方式序列化(即插入/提取)一个 RemoteDataPoint&
对象。
struct RemoteDataPoint
{
public:
RemoteDataPoint(int x, int y) : m_x(x), m_y(y) {}
RemoteDataPoint() : m_x(0), m_y(0) {}
int GetX() const { return m_x; }
int GetY() const { return m_y; }
private:
int m_y;
int m_x;
friend std::ostream& operator<< (std::ostream &out, const RemoteDataPoint& data);
friend std::istream& operator>> (std::istream &in, RemoteDataPoint& data);
};
std::ostream& operator<< (std::ostream &out, const RemoteDataPoint& data)
{
out << data.m_x << std::endl;
out << data.m_y << std::endl;
return out;
}
std::istream& operator>> (std::istream &in, RemoteDataPoint& data)
{
in >> data.m_x;
in >> data.m_y;
return in;
}
一个类似的 RemoteDataPointJson
对象使用 RapidJSON 序列化。
std::ostream& operator<< (std::ostream &out, const RemoteDataPointJson& data)
{
StringBuffer sb;
PrettyWriter<StringBuffer> writer(sb);
// Serialize object using JSON
data.Serialize(writer);
// Add JSON length
out << sb.GetLength() + 1;
// Add JSON string
out << sb.GetString();
return out;
}
std::istream& operator>> (std::istream &in, RemoteDataPointJson& data)
{
// Get JSON length
size_t bufLen = 0;
in >> bufLen;
// Allocate storage buffer
char* buf = (char*)malloc(bufLen);
// Copy JSON into buffer
in.rdbuf()->sgetn(buf, bufLen);
// Parse JSON
Document d;
d.Parse(buf);
// Get JSON values into object variables
data.m_x = d["m_x"].GetInt();
data.m_y = d["m_y"].GetInt();
free(buf);
return in;
}
所采用的序列化和反序列化方法由您决定。唯一的要求是使用输入或输出流来保存序列化对象。
示例中使用了 std::stringstream
。但任何派生自 std::iostream
的类都可以使用。
传输
远程委托库使用 IDelegateTransport
类将数据发送到远程系统。
class IDelegateTransport
{
public:
/// Dispatch a stream of bytes to a remote system. The implementer is responsible
/// for sending the bytes over a communication link. Once the receiver obtains the
/// bytes, the DelegateRemoteInvoker::DelegateInvoke() function must be called to
/// execute the callback on the remote system.
/// @param[in] s - an outgoing stream to send to the remote CPU.
virtual void DispatchDelegate(std::iostream& s) = 0;
};
发送方继承自 IDelegateTransport
并实现 DispatchDelegate()
。UDP 实现如下所示
void UdpDelegateSend::DispatchDelegate(std::iostream& s)
{
size_t len = (size_t)s.tellp();
char* sendBuf = (char*)malloc(len);
// Copy char buffer into heap allocated memory
s.rdbuf()->sgetn(sendBuf, len);
// Send data to remote system using a socket
int result = m_sendSocket.Send((void*)sendBuf, len, 0);
ASSERT_TRUE(result == len);
free(sendBuf);
// Reset stream positions
s.seekg(0);
s.seekp(0);
}
同样,Windows 命名管道在进程之间发送远程委托。
void PipeDelegateSend::DispatchDelegate(std::iostream& s)
{
size_t len = (size_t)s.tellp();
char* sendBuf = (char*)malloc(len);
// Copy char buffer into heap allocated memory
s.rdbuf()->sgetn(sendBuf, len);
// Send message through named pipe
DWORD sentLen = 0;
BOOL success = WriteFile(m_hPipe, sendBuf, len, &sentLen, NULL);
ASSERT_TRUE(success && sentLen == len);
free(sendBuf);
// Reset stream positions
s.seekg(0);
s.seekp(0);
}
接收线程从 UDP 套接字获取数据。唯一的要求是使用传入数据流调用 DelegateRemoteInvoker::Invoke()
。
unsigned long UdpDelegateRecv::Process(void* parameter)
{
MSG msg;
const int BUF_SIZE = 1024;
char recvBuf[BUF_SIZE];
SOCKADDR_IN addr;
int addrLen = sizeof(addr);
BOOL success = AfxSocketInit(NULL);
ASSERT_TRUE(success == TRUE);
success = m_recvSocket.Create(514, SOCK_DGRAM, NULL);
ASSERT_TRUE(success);
m_started = true;
for (;;)
{
// Check for thread exit message
if (PeekMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END, PM_REMOVE) != 0)
{
switch (msg.message)
{
case WM_EXIT_THREAD:
m_recvSocket.Close();
return 0;
}
}
// Check for socket receive message
int recvMsgSize = m_recvSocket.Receive(recvBuf, BUF_SIZE, 0);
if (recvMsgSize > 0)
{
// Copy receive data bytes into a stringstream
stringstream ss(ios::in | ios::out | ios::binary);
ss.write(recvBuf, recvMsgSize);
// Invoke the remote delegate callback function
DelegateRemoteInvoker::Invoke(ss);
}
else
{
Sleep(100);
}
}
return 0;
}
命名管道的实现类似。请注意,调用了相同的 DelegateRemoteInvoker::Invoke()
函数,只是在这种情况下,数据字节是从命名管道而不是 UDP 套接字获取的。
unsigned long PipeDelegateRecv::Process(void* parameter)
{
MSG msg;
BOOL connected = FALSE;
char recvBuf[BUF_SIZE];
for (;;)
{
// Check for thread exit message
if (PeekMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END, PM_REMOVE) != 0)
{
switch (msg.message)
{
case WM_EXIT_THREAD:
CloseHandle(m_hPipe);
return 0;
}
}
if (!connected)
{
// Check if client connected
connected = ConnectNamedPipe(m_hPipe, NULL) ?
TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
}
else
{
DWORD recvMsgSize = 0;
BOOL success = ReadFile(m_hPipe, recvBuf, BUF_SIZE, &recvMsgSize, NULL);
if (success && recvMsgSize > 0)
{
// Copy receive data bytes into a stringstream
stringstream ss(ios::in | ios::out | ios::binary);
ss.write(recvBuf, recvMsgSize);
// Invoke the remote delegate callback function
DelegateRemoteInvoker::Invoke(ss);
}
else
{
Sleep(100);
}
}
}
return 0;
}
附加源代码中提供了 POSIX UdpDelegateSend
和 UdpDelegateRecv
示例。
DelegateRemoteInvoker()
库函数只是通过 ID 查找匹配的接收委托实例并调用 DelegateInvoke()
。
bool DelegateRemoteInvoker::Invoke(std::istream& s)
{
// Get id from stream
DelegateIdType id;
s >> id;
s.seekg(0);
// Find invoker instance matching the id
std::map<DelegateIdType, DelegateRemoteInvoker*>::iterator it;
{
LockGuard lockGuard(GetLock());
it = GetMap().find(id);
}
if (it != GetMap().end())
{
// Invoke the delegate instance
(*it).second->DelegateInvoke(s);
return true;
}
else
{
// No delegate found
return false;
}
}
传输机制完全由用户定义。使用少量平台特定代码即可支持任何通信介质。
远程委托详情
DelegateRemoteSend1<>
实现了一个单参数发送远程委托。
/// @brief Send a delegate to invoke a function on a remote system.
template <class Param1>
class DelegateRemoteSend1 : public Delegate1<Param1> {
public:
DelegateRemoteSend1(IDelegateTransport& transport, std::iostream& stream, DelegateIdType id) :
m_transport(transport), m_stream(stream), m_id(id) { }
virtual DelegateRemoteSend1* Clone() const { return new DelegateRemoteSend1(*this); }
/// Invoke the bound delegate function.
virtual void operator()(Param1 p1) {
m_stream << m_id << std::ends;
m_stream << p1 << std::ends;
m_transport.DispatchDelegate(m_stream);
}
virtual bool operator==(const DelegateBase& rhs) const {
const DelegateRemoteSend1<Param1>* derivedRhs = dynamic_cast<const DelegateRemoteSend1<Param1>*>(&rhs);
return derivedRhs &&
m_id == derivedRhs->m_id &&
&m_transport == &derivedRhs->m_transport; }
private:
IDelegateTransport& m_transport; // Object sends data to remote
std::iostream& m_stream; // Storage for remote message
DelegateIdType m_id; // Remote delegate identifier
};
构造函数需要一个 IDelegateTransport&
、一个 std::iostream&
和一个 DelegateIdType
。
operator()
将 m_id
插入到流中,然后对每个参数调用 operator<<
进行序列化。之后,调用 DispatchDelegate()
将委托发送到远程系统。
DelegateFreeRemoteRecv1<>
实现了一个单参数接收远程委托。
template <class Param1>
class DelegateFreeRemoteRecv1 : public DelegateFree1<Param1>, public DelegateRemoteInvoker {
public:
typedef void(*FreeFunc)(Param1);
// Contructors take a free function and delegete id
DelegateFreeRemoteRecv1(FreeFunc func, DelegateIdType id) : DelegateRemoteInvoker(id) { Bind(func, id); }
/// Bind a free function to the delegate.
void Bind(FreeFunc func, DelegateIdType id) {
m_id = id;
DelegateFree1<Param1>::Bind(func);
}
virtual DelegateFreeRemoteRecv1* Clone() const { return new DelegateFreeRemoteRecv1(*this); }
/// Called by the remote system to invoke the delegate function
virtual void DelegateInvoke(std::istream& stream) {
RemoteParam<Param1> param1;
Param1 p1 = param1.Get();
stream >> m_id;
stream.seekg(stream.tellg() + std::streampos(1));
stream >> p1;
stream.seekg(stream.tellg() + std::streampos(1));
DelegateFree1<Param1>::operator()(p1);
}
virtual bool operator==(const DelegateBase& rhs) const {
const DelegateFreeRemoteRecv1<Param1>* derivedRhs = dynamic_cast<const DelegateFreeRemoteRecv1<Param1>*>(&rhs);
return derivedRhs &&
m_id == derivedRhs->m_id &&
DelegateFree1<Param1>::operator == (rhs);
}
private:
DelegateIdType m_id; // Remote delegate identifier
};
构造函数接受一个 FreeFunc
函数指针和一个 DelegateIdType
。
DelegateRemoteInvoker::Invoke()
使用传入数据流调用 DelegateInvoke()
。每个函数参数都使用 operator<<
反序列化,并调用 FreeFunc
目标函数。
成员函数委托变体实现类似。
异步委托详情
异步委托也是委托库的一部分。这些特性的移植细节在C++ 中的异步多播委托一文中有所介绍。
源代码
附加的源代码包含整个委托库以及 Visual Studio 和 Eclipse 的大量示例。
Visual Studio
Visual Studio/Win32 示例有三个独立的项目
- RemoteDelegate – 许多示例,其中远程发送方和接收方在同一个应用程序中执行。
- RemoteDelegeteSend – 一个 UDP 远程委托发送控制台应用程序。
- RemoteDelegateRecv – 一个 UDP 接收控制台应用程序,它从
RemoteDelegateSend
接收数据。
要运行 JSON 示例,您需要
- 将
RapidJSON
库克隆到 RemoteDelegate 源目录中的 rapidjson 目录中。 - 在 Visual Studio 的 C/C++ > 预处理器 > 预处理器定义中定义 RAPID_JSON。
Eclipse
Eclipse/POSIX 示例使用 文件 > 导入... > 将现有项目导入工作区导入到工作区中。
要运行 JSON 示例,您需要
- 将
RapidJSON
库克隆到 RemoteDelegate 源目录中的 rapidjson 目录中。 - 在 属性 > C/C++ 通用 > 路径和符号 > 符号选项卡中定义 RAPID_JSON。
结论
我已经在几个不同的项目中使用 C++ 委托库了。函数和数据在线程之间轻松移动的方式真正改变了我创建多线程应用程序的方式。
远程委托增强功能通过允许远程通知来扩展库。该库通过使用简单的委托机制在远程系统之间传递数据,从而简化了应用程序开发。
参考文献
- C++ 中的异步多播委托 - 作者:David Lafreniere
历史
- 2020年3月15日
- 初始发布。
- 2020年3月18日
- 添加了 Eclipse/POSIX 示例。
- 2020年3月19日
- 修复了内置数据类型处理。