使用 IPC 进行数据日志记录(Windows)






4.50/5 (9投票s)
这是另一个 MMF IPC 数据传输类,但具有日志记录工具的特殊性:它有一个操作进程,该进程仅收集所有记录的数据,以及一个或多个被动进程,用于生成和写入日志信息。
当我们想了解一段代码的工作方式时,我们大多数人都会使用日志记录。 大多数人使用基于文件的日志记录解决方案。 但是,当您开发产品时,将日志记录到文件感觉像是一个两步操作
- 记录发生了什么
- 在日志中查看发生了什么
我从来不喜欢这样。 所以我实现了这个类,用于记录到第二个应用程序,该应用程序交互式地显示正在发生的事情。 这是另一个 MMF IPC 数据传输类,但具有日志记录工具的特殊性:它有一个操作进程,该进程仅收集所有记录的数据,以及一个或多个被动进程,用于生成和写入日志信息。
Application logging data 1 ---\
Application logging data 2 --------> MMF ---> Logging collector
Application logging data 3 ---/
我认为这种逻辑比您用于应用程序日志记录的大多数其他选项都快。 同时,它允许您合并来自同一计算机上运行的多个应用程序的日志数据。
这里最重要的是设置一套精心安排的事件和互斥锁,以了解何时可以读取以及何时可以写入,并且永远、永远不要覆盖尚未读取的内容。
我们仅使用两个事件来记录所有内容:一个 read 事件,标记何时发生了读取操作,以及一个 avail 事件,标记何时发生了写入操作。 我们还使用内存映射文件头结构来存储共享值,例如我们接下来要读取的位置,或者我们接下来要写入的位置等等。 最后,因为我们不能同时有多个线程写入数据,所以我们使用互斥锁来阻止任何其他线程进入写入方法,同时正在进行写入。
如果我们是读取应用程序,我们将运行一个线程,该线程将始终等待新数据。 这非常重要,因为到达 MMF 的任何数据,我们都必须尽可能快地处理它。 我们在这里使用两种数据,但它可以扩展到更多种类的数据。 第一个是 APPLICATION_LOGGING
,它是日志本身,另一个我们称之为 APPLICATION_EOF
。 我们有一块内存(内存映射文件),并且我们必须在有足够的空间这样做的情况下写入它。 我们不能将消息分成碎片。 如果我们要写入一条新的日志消息,但没有足够的可用空间,我们必须写入一个 EOF,然后回到内存区域的开头来写入我们的消息。 如果读取线程没有在共享内存的开头释放足够的空间,我们必须等待下一个读取事件。 一旦读取器读取,我们再次检查是否有足够的空间,如果有足够的空间,我们就继续写入新消息,否则我们等待下一个读取事件。
Let's say we use R as the next reading place, W as the next writing place,
X as data written and not read so far and Z as EOF.
We start with all memory available so we just write
|RXXXXXXXXXXW------------------------------------------------------------------------------|
We continue writing until we have no more free space available
|-----------RXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXW-------|
We can write an EOF (Z) and continue in the beginning
|XXXXXXXXW-----------------RXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXZ-------|
这种逻辑看起来足够好,但有一个问题。 一开始,我们拥有所有的共享内存,下一个读取位置与下一个写入位置相同。 如果写入器在读取器甚至开始读取之前就填满了所有内存怎么办? 最后的写入操作将写入 EOF 并转到共享内存的开头。 同样,我们将使下一个读取位置与下一个写入位置相同。 我们应该写吗? 当然不。 但是我们如何知道何时是这种情况? 我使用写入器和读取器的“循环掩码”来解决这个问题。 每次我写入 EOF 时,我都会切换循环掩码,因此如果下一个写入指针与下一个读取指针相同,并且两者具有相同的“循环掩码”,则内存是空闲的,我们可以写入。 如果下一个写入指针与下一个读取指针相同,并且它们具有不同的“循环掩码”,则表示内存尚未被读取,我们必须等待下一个读取事件。 您可以在日志数据函数中看到此逻辑
BOOLEAN data_logging::log_data(PLOG_PACKAGE_DATA data)
{
if (!m_running) return false;
UINT16 transfer_size = data->data_size + data->header_size;
if ((NULL == m_mapFile) || (NULL == m_memory)) {
error_handler(L"log_data > memory mapped file and/or memory pointer is NULL!");
goto log_error;
}
if (m_data_header->first_element_offset + transfer_size /* this packet */ +
sizeof(MMFILE_HEADER) /* the needed EOF packet */ > MMFILE_SIZE)
{
// the package is too big and can't be handled here
error_handler(L"Package too big. Can't handle packages greater than %u bytes.",
MMFILE_SIZE - sizeof(MMFILE_HEADER) - m_data_header->first_element_offset);
goto log_error;
}
if (WaitForSingleObject(m_log_writers_lock, INFINITE) != WAIT_OBJECT_0)
{
// the package is too big and can't be handled here
error_handler(L"Unable to obtain writer lock - aborting");
goto log_error;
}
UINT32 writeOffset = m_data_header->next_writing_pointer;
BOOLEAN needToWriteEOFPacket = writeOffset + transfer_size /* this packet */ +
sizeof(MMFILE_HEADER) /* the needed EOF packet */ > MMFILE_SIZE;
// in case the reader is in the previous round, we have to wait for it to catch up us
while (((m_data_header->flags[LOG_FLAG_TEXT] & WRITE_MMF_ROUND_MASK) != 0) !=
((m_data_header->flags[LOG_FLAG_TEXT] & READ_MMF_ROUND_MASK) != 0)) {
int waitResult = WaitForSingleObject(m_log_read_ev, MAX_WAIT_FOR_WRITE);
if ((waitResult != WAIT_OBJECT_0) && (((m_data_header->flags[LOG_FLAG_TEXT] &
WRITE_MMF_ROUND_MASK) != 0) != ((m_data_header->flags[LOG_FLAG_TEXT] &
READ_MMF_ROUND_MASK) != 0)))
{
error_handler(L"log_data > Write wait operation failed - waitResult = %u",
waitResult);
ReleaseMutex(m_log_writers_lock);
goto log_error;
}
}
// manage the EOF write as a full stand-alone package so in case we need to write more
// than MMFILE_SIZE / 2 we can wait for the reader to read up to the EOFPacket and then
// we will have the whole MMF for our huge package
if (needToWriteEOFPacket)
{
writeEOF();
m_data_header->next_writing_pointer = writeOffset =
m_data_header->first_element_offset;
char logTextFlags = m_data_header->flags[LOG_FLAG_TEXT];
while (logTextFlags != _InterlockedCompareExchange8
((char*)&m_data_header->flags[LOG_FLAG_TEXT],
logTextFlags ^ WRITE_MMF_ROUND_MASK, logTextFlags))
{
logTextFlags = m_data_header->flags[LOG_FLAG_TEXT];
}
if (!SetEvent(m_log_avail_ev))
{
// unexpected error!
error_handler(L"log_data > SetEvent(m_log_avail_ev) failed!");
}
}
while (wait_for_read_before_write(writeOffset, transfer_size)) {
int waitResult = WaitForSingleObject(m_log_read_ev, MAX_WAIT_FOR_WRITE);
if ((waitResult != WAIT_OBJECT_0) && wait_for_read_before_write
(writeOffset, transfer_size))
{
error_handler(L"log_data > Write wait operation failed - waitResult = %u",
waitResult);
ReleaseMutex(m_log_writers_lock);
goto log_error;
}
}
writeOffset += (UINT32)write(writeOffset, data->header, data->header_size);
writeOffset += (UINT32)write(writeOffset, data->data, data->data_size);
m_data_header->next_writing_pointer = writeOffset;
ReleaseMutex(m_log_writers_lock);
if (!SetEvent(m_log_avail_ev))
{
// unexpected error!
error_handler(L"log_data > SetEvent(m_log_avail_ev) (2) failed!");
}
return true;
log_error:
m_running = false;
return false;
}
函数 wait_for_read_before_write
检查我们是否有足够的可用空间来存储我们的数据,或者我们是否必须等待,同时考虑循环掩码。
BOOLEAN data_logging::wait_for_read_before_write(UINT32 write_offset, UINT16 data_size)
{
// The logic here is that it might happen that we are about to write back from where the other
// thread is reading, so we have to make sure that we don't override the data that has not being
// read yet. That explains the unequal sign on both sides. But what if we wrote all the data
// until the position where the reading pointer is (in it's previous round)?
// Using the current check we will miss the possibility that we are in different rounds and we
// are about to write over the unread data. In order to avoid that case, we include the
// next_reading_pointer as a position to avoid as well.
// >>>>>>>>>>>>>>>>W------------------------R>>>>>>>>>>>>>>>>>>>//
BOOLEAN differentRound = ((m_data_header->flags[LOG_FLAG_TEXT] & WRITE_MMF_ROUND_MASK)
!= 0) != ((m_data_header->flags[LOG_FLAG_TEXT] & READ_MMF_ROUND_MASK) != 0);
BOOLEAN res = ((write_offset < m_data_header->next_reading_pointer) &&
(write_offset + data_size >= m_data_header->next_reading_pointer)) ||
(differentRound && (write_offset == m_data_header->next_reading_pointer));
return res;
}
最后,读取线程执行 waitForLog
函数
DWORD WINAPI data_logging::waitForLog(LPVOID data)
{
data_logging * self = (data_logging *)data;
HANDLE waitOn[2];
waitOn[0] = self->m_log_avail_ev;
waitOn[1] = self->m_terminate;
while (true)
{
int wait_result = WaitForMultipleObjects(2, waitOn, FALSE, INFINITE);
if (wait_result == WAIT_OBJECT_0 + 1) // terminate has been triggered
{
self->m_running = false;
break;
}
if (wait_result != WAIT_OBJECT_0) // unexpected result! terminate
// as well - but report error!
{
self->error_handler(L"waitForLog > WaitForMultipleObjects failed -
waitResult = %u\n", wait_result);
goto error_case;
break;
}
// we have data in the MMF, so look for it. Have to finish when both pointers
// have the same value and both operations are in the same round.
while ((self->m_data_header->next_reading_pointer !=
self->m_data_header->next_writing_pointer) ||
(((self->m_data_header->flags[LOG_FLAG_TEXT] & WRITE_MMF_ROUND_MASK) != 0) !=
((self->m_data_header->flags[LOG_FLAG_TEXT] & READ_MMF_ROUND_MASK) != 0)))
{
switch (((PLOG_HEADER)((char*)self->m_memory + self->m_data_header->
next_reading_pointer))->application)
{
case APPLICATION_EOF:
{
self->m_data_header->next_reading_pointer = self->m_data_header->
first_element_offset;
char logTextFlag = self->m_data_header->flags[LOG_FLAG_TEXT];
while (logTextFlag != _InterlockedCompareExchange8((char *)
&self->m_data_header->flags[LOG_FLAG_TEXT],
logTextFlag ^ READ_MMF_ROUND_MASK, logTextFlag))
{
logTextFlag = self->m_data_header->flags[LOG_FLAG_TEXT];
}
break;
}
case APPLICATION_LOGGING:
{
PLOG_MESSAGES msg_header = (PLOG_MESSAGES)((char*)self->m_memory +
self->m_data_header->next_reading_pointer);
int message_length = (msg_header->header.size -
sizeof(LOG_MESSAGES))/2 - 1;
wchar_t * message = (wchar_t*)((char*)msg_header +
sizeof(LOG_MESSAGES));
if (self->m_kind_callbacks.find(APPLICATION_LOGGING) !=
self->m_kind_callbacks.end())
((logAvailableCallbackFunction)self->
m_kind_callbacks[APPLICATION_LOGGING])
(msg_header, message, message_length,
self->m_kind_callbacks_parameter[APPLICATION_LOGGING]);
self->m_data_header->next_reading_pointer += msg_header->header.size;
break;
}
default:
self->error_handler(L"waitForLog > Unexpected application code: %u\n",
((PLOG_HEADER)((char*)self->m_memory + self->m_data_header->
next_reading_pointer))->application);
goto error_case;
break;
}
if (!SetEvent(self->m_log_read_ev))
{
// unexpected error! log
self->error_handler(L"waitForLog > Unexpected error,
SetEvent of m_log_read_ev failed\n");
}
}
}
return 0;
error_case:
self->m_running = false;
return 0;
}
我每天都使用它,并且它运行良好。 我确信它可以稍微改进一下,但是现在的状态对于我想要达到的目的来说已经足够好了。 您可以通过添加更多数据种类来扩展它。 唯一的限制是它是一种单向通信逻辑:多个写入器和一个侦听器,侦听器无法向他们回复。 如果您想要进行双向通信,则无法使用此逻辑,因为写入器仅在读取器已经在侦听时才设置(读取器创建 MMF)。 您必须重新制定此逻辑以拥有具有两个内存映射文件(一个用于读取,另一个用于写入)的客户端-服务器逻辑,这是下一篇文章的主题,在下一篇文章中,我将介绍我用来将 JazzVPN 服务与 GUI 前端通信的双向 IPC。