packetcache:探索开发 memcache 类网络缓存





5.00/5 (2投票s)
本文探讨了 UDP 编程、LRU 缓存开发和 .NET 数据包处理。
引言
在本文中,我将向您展示如何进行 UDP 网络编程、如何编写最近最少使用 (LRU) 缓存,以及如何在 .NET 中进行数据包处理。这些编程技能将应用于编写一个类似 memcache
的网络内存缓存。它以 UDP 为中心,使用 LRU 缓存,并且是单线程的。然后,我将展示如何从 .NET 客户端访问此缓存。
UDP 编程
用户数据报协议 (UDP) 编程是一种在网络上的计算机之间发送少量信息的方式。与传输控制协议 (TCP) 不同,UDP 不保证数据能够送达,甚至不保证数据按发送顺序接收,也不保证数据在传输过程中没有损坏。
在 UDP 编程中,存在服务器和客户端。服务器绑定到一个端口,监听客户端发送的数据报(网络数据包),并(通常)以某种数据报响应回复客户端。客户端通过地址和服务器的端口连接到服务器,向服务器发送请求,并接收来自服务器的响应。与 TCP 的全双工持久流不同,UDP 更像是玩扔接棒球。
最流行的 UDP 编程系统是域名服务 (DNS),它负责将 michaelballoni.com
等域名解析为网络地址。在 DNS 中,客户端将域名发送给服务器,服务器使用基本的数据报将网络地址发送回客户端,而无需像 TCP 那样建立持久连接。这使得协议轻量级,没有持久连接的处理或资源开销。
除了不可靠之外,UDP 的另一个主要限制是,通过互联网发送或从 UDP 服务器接收的数据量大约只有 500 字节。如果您愿意在这些限制下工作,如果您基本上愿意来回发送小数据块,并且愿意容忍或补偿不可靠的网络,那么 UDP 就适合您。
UDP 服务器实现
在 UDP 编程中,服务器代码通常会创建一个套接字,绑定到一个端口,然后进入接收请求 / 处理请求 / 发送响应循环。
创建 UDP 套接字
my_socket = ::socket(AF_INET, SOCK_DGRAM, 0)
如果 my_socket
返回 < 0
,您将需要进行一些错误处理。
将 UDP 套接字绑定到端口
sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = INADDR_ANY; // listen on any address
servaddr.sin_port = ::htons(my_port); // listen on this 16-bit port number
result = ::bind(my_socket, (const sockaddr*)&servaddr, sizeof(servaddr))
如果结果返回非零值,您将需要进行一些错误处理,并清理上面的 my_socket
。
接收请求数据报并发送响应
这是一个基本的请求/响应 UDP 服务器循环。
// https://stackoverflow.com/questions/1098897/
// what-is-the-largest-safe-udp-packet-size-on-the-internet
const size_t max_packet_size = 508;
while (true)
{
// clientaddr is the data structure that holds the client's address
// so we know where to send the response back to
sockaddr_in clientaddr;
int addr_len = sizeof(clientaddr);
// read the request data and client address using the recvfrom function
uint8_t input_buffer[max_packet_size];
int read_amount =
::recvfrom(my_socket, (char*)input_buffer,
(int)max_packet_size, 0, (sockaddr*)&clientaddr, &addr_len);
if (read_amount <= 0)
// error handling
// process the input_buffer and read_amount yielding data in
// output_buffer and output_len
uint8_t output_buffer[max_packet_size];
size_t output_len = 0;
// ...
// send the response back to the client using the sendto function
if (::sendto(my_socket, (const char*)output_buffer,
(int)output_len, 0, (sockaddr*)&clientaddr, addr_len) < 0)
// error handling
}
UDP 客户端编程
UDP 客户端编程与 UDP 服务器编程非常相似,发送请求并接收响应,而且更简单一些。我这里使用的风格是模仿 TCP 的 connect / send / recv 函数,因为这提供了持久连接的有用假象。
创建 UDP 套接字,与服务器一样
my_socket = ::socket(AF_INET, SOCK_DGRAM, 0)
如果 my_socket 返回 < 0,您将需要进行一些错误处理。
连接到 UDP 服务器
sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = ::htons(port); // the server's 16-bit port number
servaddr.sin_addr.s_addr = ::inet_addr(server); // inet_addr works with dotted
// IPv4 addresses
// use getaddrinfo to support more
result = ::connect(my_socket, (sockaddr*)&servaddr, sizeof(servaddr))
如果 result != 0
,您将需要处理错误并关闭 my_socket
。
与 UDP 服务器交互
// populate request_buffer / request_len with your request data
uint8_t request_buffer[max_packet_size];
size_t request_len;
// ...
// send the request
result = ::send(my_socket, (const char*)request_buffer, (int)request_len, 0);
// if result < 0, do error handling, clean up my_socket
// read the response
uint8_t response_buffer[max_packet_size];
int response_len = ::recv(my_socket, (char*)response_buffer, (int)max_packet_size, 0);
// if response_len <= 0, do error handling, clean up my_socket
// process the response_buffer / response_len response
这就是 UDP 客户端和服务器编程的内容。嗯……您应该真的为那个 recv
调用设置一个超时。照目前这样写,如果由于服务器错误或网络错误等任何原因没有将响应传递给客户端,客户端应用程序将会挂起。哦,您还想验证响应,以防它在网络上传输过程中损坏。是的,数据包丢失、数据包损坏,没错,就这些。
网络缓存实现
附带的源代码包含一个“packetcache
”解决方案。该解决方案的任务是双重的:首先,开发一个类似 memcache
的缓存服务器;其次,开发缓存客户端以证明 packetcache
和 memcache
之间的性能差异。
为此,我们提供了一个 C# 类库和两个 C# 应用程序:一个用于快速测试的 POC packetapp 客户端,以及一个性能基准测试 packetstress
应用程序;还有一个 C++ 代码,用于一个 static
库,包含 C++ UDP 服务器的所有核心功能,以及一个仅用于开发的 C++ POC UDP 客户端。
您可以深入研究所有这些 C# 和 C++ 代码,并得出关于 packetcache
与 memcache
性能的结论。在这里,我想概述一下 C++ 中的 LRU 缓存可能是什么样子。
LRU 缓存概述
缓存将键映射到值。键的值根据需要添加到缓存中,然后由缓存的其他用户“免费”访问。最近最少使用 (LRU) 缓存会维护最近使用最多的缓存条目,并在缓存占用过多内存时,您猜对了,会修剪掉最近最少使用的缓存条目。
我创建的 LRU 缓存由两个数据结构组成:首先,一个按最近使用到最少使用排序的缓存条目列表;其次,一个将键映射到缓存条目的哈希表。窍门在于让缓存条目对象链接在一起形成列表,而哈希表包含指向缓存条目的指针。
LRU 缓存类
缓存类包含链表(cache_list
)和哈希表(std::unordered_map
)。cache_list
具有指向 cache_entry
类型对象的头指针和尾指针。哈希表将 std::vector<uint8_t>
映射到 cache_entry*
。将 std::vector<uint8_t>
用作哈希表键与使用 std::string
作为键没有什么不同。我不想处理字符编码,甚至不假设客户端想要按字符串进行缓存。键可以是任何长度的二进制值,同时考虑到 UDP 数据包的大小。
这是允许 std::vector<uint8_t>
作为哈希表键的技巧
#include "MurmurHash2.h" // https://github.com/aappleby/smhasher
struct byte_array_hasher
{
std::size_t operator()(const std::vector<uint8_t>& v) const noexcept
{
return MurmurHash2(v.data(), int(v.size()), 0);
}
};
然后,您可以在缓存类中这样声明哈希表
std::unordered_map<std::vector<uint8_t>, cache_entry*, byte_array_hasher> m_cache
缓存条目类
每个缓存条目都是 cache_entry
类的对象。
time_t expiration;
time_t last_access;
cache_entry* prev;
cache_entry* next;
std::vector<uint8_t> data;
缓存条目用于存储数据值和最后访问时间戳,并且还是一个带有前向和后向指针的链表节点。
缓存条目构造函数会将其提升到列表的头部
cache_entry(time_t _expiration, const std::vector<uint8_t>& _data, cache_entry*& head)
: expiration(_expiration)
, last_access(::time(nullptr))
, prev(nullptr)
, next(nullptr)
, data(_data)
{
if (head != nullptr)
{
head->prev = this;
next = head;
prev = nullptr;
}
head = this;
}
缓存列表类
cache_list
类管理 cache_entry
的列表。它只有头指针、尾指针和大小成员变量,但它通过这个 API 来管理整个流程:
cache_entry* cache_list::add(time_t expiration, const std::vector<uint8_t>& data)
创建一个新的缓存条目,将其设为列表的新头部,并且可能是尾部。
cache_entry* new_entry =
new cache_entry(expiration, data, m_head); // new_entry becomes head
if (m_tail == nullptr)
m_tail = new_entry;
m_size += new_entry->size();
return new_entry;
const std::vector<uint8_t>& cache_list::get(cache_entry* entry)
当命中一个缓存条目时,将其移动到列表的前面并返回其数据。
entry->last_access = ::time(nullptr);
//
// move the entry to the front of the list
//
// bail if already at the head
if (m_head == entry)
return entry->data;
// adjust neighbor pointers
auto prev_neighbor = entry->prev;
auto next_neighbor = entry->next;
if (prev_neighbor != nullptr)
prev_neighbor->next = next_neighbor;
if (next_neighbor != nullptr)
next_neighbor->prev = prev_neighbor;
// make the new entry head of the list
if (m_head != nullptr)
{
m_head->prev = entry;
entry->next = m_head;
entry->prev = nullptr;
}
m_head = entry;
// set the tail
if (next_neighbor != nullptr && next_neighbor->next == nullptr)
m_tail = next_neighbor;
else if (prev_neighbor != nullptr && prev_neighbor->next == nullptr)
m_tail = prev_neighbor;
return entry->data;
cache_list::set(cache_entry* entry, time_t expiration, const std::vector<uint8_t>& data)
当缓存条目获得新值时,更新该条目并更新缓存的大小。
注意:我在这里没有将条目移动到列表的前面,因为它尚未被最近使用。
entry->last_access = ::time(nullptr);
size_t old_size = entry->data.size();
entry->expiration = expiration;
entry->data = data;
// adjust the new size carefully
int64_t size_change = int64_t(data.size()) - old_size;
if (-size_change > int64_t(m_size))
m_size = 0;
else
m_size += size_change;
cache_list::remove(cache_entry* old_entry)
从缓存中删除一个条目。
// update head and tail
if (m_head == old_entry)
m_head = old_entry->next;
if (m_tail == old_entry)
m_tail = old_entry->prev;
// update neighbors
if (old_entry->prev != nullptr)
old_entry->prev->next = old_entry->next;
if (old_entry->next != nullptr)
old_entry->next->prev = old_entry->prev;
// adjust our size
if (old_entry->size() > m_size)
m_size = 0;
else
m_size -= old_entry->size();
// clean it up
delete old_entry;
cache_list::trim(size_t target_size)
删除条目,直到缓存大小低于目标大小。
while (m_size > target_size && m_tail != nullptr)
remove(m_tail);
另一边… C# 数据包处理
在 C# 方面,我们希望实现一个类似 memcache
的接口,如下所示:
public interface ICache
{
Task<string?> GetAsync(string key);
Task<bool> SetAsync(string key, string value, int cacheSeconds);
Task<bool> DelAsync(string key);
}
为此,我们编写了一个用于处理请求和响应数据的核心类 Packet
。Packet
具有以下成员变量:
public CacheOp Op;
public long TtlSeconds;
public MemoryStream Key = new MemoryStream(MaxPacketSize);
public MemoryStream Value = new MemoryStream(MaxPacketSize);
Packet
类的核心目的是将 Packet
成员打包到缓冲区中,并将 Packet
成员从缓冲区解析出来。良好的 .NET 性能的关键之一是避免内存分配,因此处理网络数据的 C# 客户端代码会维护一套 Packet
对象和缓冲区,以应对对象的整个生命周期。
这是 Pack
函数,用于将 Packet
对象打包到输出缓冲区中,包括写入的字节数。
public static void Pack(Packet p, byte[] output, out int idx)
{
// Make sure it will all fit
if
(
1 + // op
4 + // ttl
2 + p.Key.Length +
2 + p.Value.Length +
4 // crc
> MaxPacketSize
)
{
throw new PacketException("Too much data");
}
// Add the op
idx = 0;
output[idx++] = (byte)p.Op;
// Add the TTL
var ttl_bytes =
BitConverter.GetBytes(IPAddress.HostToNetworkOrder((int)p.TtlSeconds));
output[idx++] = ttl_bytes[0];
output[idx++] = ttl_bytes[1];
output[idx++] = ttl_bytes[2];
output[idx++] = ttl_bytes[3];
// Add the key len and data
short key_len = (short)p.Key.Length;
var key_len_bytes = BitConverter.GetBytes(IPAddress.HostToNetworkOrder(key_len));
output[idx++] = key_len_bytes[0];
output[idx++] = key_len_bytes[1];
Buffer.BlockCopy(p.Key.GetBuffer(), 0, output, idx, key_len);
idx += key_len;
// Add the value len and data
short value_len = (short)p.Value.Length;
var value_len_bytes =
BitConverter.GetBytes(IPAddress.HostToNetworkOrder(value_len));
output[idx++] = value_len_bytes[0];
output[idx++] = value_len_bytes[1];
Buffer.BlockCopy(p.Value.GetBuffer(), 0, output, idx, value_len);
idx += value_len;
// CRC what we've got so far
var crc_bytes = Utils.Crc32(output, idx);
output[idx++] = crc_bytes[3];
output[idx++] = crc_bytes[2];
output[idx++] = crc_bytes[1];
output[idx++] = crc_bytes[0];
}
Parse
函数接收输入缓冲区并填充 Packet
的成员。
public static void Parse(byte[] input, Packet p)
{
// Reset the output
p.Reset();
// Validate data length
if (input.Length < MinPacketSize)
throw new PacketException("Not enough data");
else if (input.Length > MaxPacketSize)
throw new PacketException("Too much data");
short input_len = (short)input.Length;
// Validate the CRC
byte[] crc_computed_bytes = Utils.Crc32(input, input_len - 4);
if
(
crc_computed_bytes[0] != input[input_len - 1]
||
crc_computed_bytes[1] != input[input_len - 2]
||
crc_computed_bytes[2] != input[input_len - 3]
||
crc_computed_bytes[3] != input[input_len - 4]
)
{
throw new PacketException("Checksum mismath");
}
// Extract op
int idx = 0;
p.Op = (CacheOp)input[idx++];
// Extract ttl
p.TtlSeconds = IPAddress.HostToNetworkOrder(BitConverter.ToInt32(input, idx));
idx += 4;
// Extract key
short key_len = IPAddress.HostToNetworkOrder(BitConverter.ToInt16(input, idx));
idx += 2;
if (key_len > 0)
{
if (idx + key_len >= input_len)
throw new PacketException("Invalid key length");
p.Key.Write(input, idx, key_len);
p.Key.Seek(0, SeekOrigin.Begin);
idx += key_len;
}
// Extract value
short value_len = IPAddress.HostToNetworkOrder(BitConverter.ToInt16(input, idx));
idx += 2;
if (value_len > 0)
{
if (idx + value_len >= input_len)
throw new PacketException("Invalid value length");
p.Value.Write(input, idx, value_len);
p.Value.Seek(0, SeekOrigin.Begin);
idx += value_len;
}
// Validate input size
if (idx != input_len - 4)
throw new PacketException("Invalid packet format");
}
在 Parse
和 Pack
函数承担繁重工作的情况下,数据包处理代码的核心“执行缓存操作”函数只是:
// Take the data like the key and value from the request packet and
// populate the request buffer
int request_len;
Packet.Pack(m_requestPacket, m_requestBuffer, out request_len);
// Send the request
// m_client is .NET's UdpClient object, the raw network "connection" with the server
if (await m_client.SendAsync(m_requestBuffer, request_len) != request_len)
throw new PacketException("Not all data sent");
// Recv the response
byte[] response = (await m_client.ReceiveAsync()).Buffer;
// Parse the buffer into a response packet with the result of the operation
Packet.Parse(response, m_responsePacket);
苹果和橘子
我很有兴趣将我的小型 packetcache
与久经考验的 memcache
进行比较。我是一个 Windows 用户,所以我寻找了一个适用于 Windows 的 memcache
端口/安装程序。我找到了 这个网站,并在我的初步测试中使用了它。
StackOverflow 在 这个链接 上有其他建议。
在我所有的本地 PC 测试中,packetcache
的每秒操作数约为 60K,而 memcache
为 40K。快了 50%,这很不错。但是,当我模拟更多具有更多客户端线程的客户端时,memcache
服务器似乎不稳定,我想进行更公平的比较,所以我去了 AWS,启动了一个 ElastiCache memcache
节点和一个基本的 Windows EC2 实例,价格相同,将 packetcache
加载到 EC2 上,并从同一区域的通用 Windows EC2 实例访问两者。观察到的性能几乎相同,均为每秒 20K 次操作。我可以接受这个结果:packetcache
在一个性能较弱的 Windows EC2 实例上,其性能至少与非 Windows 且专用的硬件上的 memcache
相当。仍然存在苹果/橘子对比问题,例如 memcache
可以配置为使用 UDP。我的观点是,这种方法并不比行业冠军差很多。
结论和关注点
我已经向您展示了 UDP 编程、LRU 缓存实现以及 .NET 数据包打包和解析。我邀请您获取代码并在 Linux 上使用专用硬件进行构建,并得出您自己的结论。
历史
- 2022 年 11 月 28 日:初始版本