65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 IPC 进行数据日志记录(Windows)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.50/5 (9投票s)

2017 年 12 月 1 日

MIT

4分钟阅读

viewsIcon

14215

downloadIcon

342

这是另一个 MMF IPC 数据传输类,但具有日志记录工具的特殊性:它有一个操作进程,该进程仅收集所有记录的数据,以及一个或多个被动进程,用于生成和写入日志信息。

当我们想了解一段代码的工作方式时,我们大多数人都会使用日志记录。 大多数人使用基于文件的日志记录解决方案。 但是,当您开发产品时,将日志记录到文件感觉像是一个两步操作

  1. 记录发生了什么
  2. 在日志中查看发生了什么

我从来不喜欢这样。 所以我实现了这个类,用于记录到第二个应用程序,该应用程序交互式地显示正在发生的事情。 这是另一个 MMF IPC 数据传输类,但具有日志记录工具的特殊性:它有一个操作进程,该进程仅收集所有记录的数据,以及一个或多个被动进程,用于生成和写入日志信息。

      Application logging data 1 ---\
      Application logging data 2 --------> MMF ---> Logging collector 
      Application logging data 3 ---/

我认为这种逻辑比您用于应用程序日志记录的大多数其他选项都快。 同时,它允许您合并来自同一计算机上运行的多个应用程序的日志数据。

这里最重要的是设置一套精心安排的事件和互斥锁,以了解何时可以读取以及何时可以写入,并且永远、永远不要覆盖尚未读取的内容。

我们仅使用两个事件来记录所有内容:一个 read 事件,标记何时发生了读取操作,以及一个 avail 事件,标记何时发生了写入操作。 我们还使用内存映射文件头结构来存储共享值,例如我们接下来要读取的位置,或者我们接下来要写入的位置等等。 最后,因为我们不能同时有多个线程写入数据,所以我们使用互斥锁来阻止任何其他线程进入写入方法,同时正在进行写入。

如果我们是读取应用程序,我们将运行一个线程,该线程将始终等待新数据。 这非常重要,因为到达 MMF 的任何数据,我们都必须尽可能快地处理它。 我们在这里使用两种数据,但它可以扩展到更多种类的数据。 第一个是 APPLICATION_LOGGING,它是日志本身,另一个我们称之为 APPLICATION_EOF。 我们有一块内存(内存映射文件),并且我们必须在有足够的空间这样做的情况下写入它。 我们不能将消息分成碎片。 如果我们要写入一条新的日志消息,但没有足够的可用空间,我们必须写入一个 EOF,然后回到内存区域的开头来写入我们的消息。 如果读取线程没有在共享内存的开头释放足够的空间,我们必须等待下一个读取事件。 一旦读取器读取,我们再次检查是否有足够的空间,如果有足够的空间,我们就继续写入新消息,否则我们等待下一个读取事件。

      Let's say we use R as the next reading place, W as the next writing place, 
      X as data written and not read so far and Z as EOF.
        
      We start with all memory available so we just write
      |RXXXXXXXXXXW------------------------------------------------------------------------------|

      We continue writing until we have no more free space available
      |-----------RXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXW-------|

      We can write an EOF (Z) and continue in the beginning
      |XXXXXXXXW-----------------RXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXZ-------|

这种逻辑看起来足够好,但有一个问题。 一开始,我们拥有所有的共享内存,下一个读取位置与下一个写入位置相同。 如果写入器在读取器甚至开始读取之前就填满了所有内存怎么办? 最后的写入操作将写入 EOF 并转到共享内存的开头。 同样,我们将使下一个读取位置与下一个写入位置相同。 我们应该写吗? 当然不。 但是我们如何知道何时是这种情况? 我使用写入器和读取器的“循环掩码”来解决这个问题。 每次我写入 EOF 时,我都会切换循环掩码,因此如果下一个写入指针与下一个读取指针相同,并且两者具有相同的“循环掩码”,则内存是空闲的,我们可以写入。 如果下一个写入指针与下一个读取指针相同,并且它们具有不同的“循环掩码”,则表示内存尚未被读取,我们必须等待下一个读取事件。 您可以在日志数据函数中看到此逻辑

BOOLEAN data_logging::log_data(PLOG_PACKAGE_DATA data)
{
	if (!m_running) return false;
	UINT16 transfer_size = data->data_size + data->header_size;
	if ((NULL == m_mapFile) || (NULL == m_memory)) {
		error_handler(L"log_data > memory mapped file and/or memory pointer is NULL!");
		goto log_error;
	}

	if (m_data_header->first_element_offset + transfer_size /* this packet */ + 
                           sizeof(MMFILE_HEADER) /* the needed EOF packet */ > MMFILE_SIZE)
	{
		// the package is too big and can't be handled here
		error_handler(L"Package too big. Can't handle packages greater than %u bytes.", 
                      MMFILE_SIZE - sizeof(MMFILE_HEADER) - m_data_header->first_element_offset);
		goto log_error;
	}

	if (WaitForSingleObject(m_log_writers_lock, INFINITE) != WAIT_OBJECT_0)
	{
		// the package is too big and can't be handled here
		error_handler(L"Unable to obtain writer lock - aborting");
		goto log_error;
	}
	UINT32 writeOffset = m_data_header->next_writing_pointer;
	BOOLEAN needToWriteEOFPacket = writeOffset + transfer_size /* this packet */ + 
                    sizeof(MMFILE_HEADER) /* the needed EOF packet */ > MMFILE_SIZE;

	// in case the reader is in the previous round, we have to wait for it to catch up us 
	while (((m_data_header->flags[LOG_FLAG_TEXT] & WRITE_MMF_ROUND_MASK) != 0) != 
                        ((m_data_header->flags[LOG_FLAG_TEXT] & READ_MMF_ROUND_MASK) != 0)) {
		int waitResult = WaitForSingleObject(m_log_read_ev, MAX_WAIT_FOR_WRITE);
		if ((waitResult != WAIT_OBJECT_0) && (((m_data_header->flags[LOG_FLAG_TEXT] & 
                      WRITE_MMF_ROUND_MASK) != 0) != ((m_data_header->flags[LOG_FLAG_TEXT] & 
                      READ_MMF_ROUND_MASK) != 0)))
		{
			error_handler(L"log_data > Write wait operation failed - waitResult = %u", 
                                      waitResult);
			ReleaseMutex(m_log_writers_lock);
			goto log_error;
		}
	}

	// manage the EOF write as a full stand-alone package so in case we need to write more 
    // than MMFILE_SIZE / 2 we can wait for the reader to read up to the EOFPacket and then 
    // we will have the whole MMF for our huge package
	if (needToWriteEOFPacket)
	{
		writeEOF();
		m_data_header->next_writing_pointer = writeOffset = 
                                        m_data_header->first_element_offset;
		char logTextFlags = m_data_header->flags[LOG_FLAG_TEXT];
		while (logTextFlags != _InterlockedCompareExchange8
                         ((char*)&m_data_header->flags[LOG_FLAG_TEXT], 
                          logTextFlags ^ WRITE_MMF_ROUND_MASK, logTextFlags))
		{
			logTextFlags = m_data_header->flags[LOG_FLAG_TEXT];
		}
		if (!SetEvent(m_log_avail_ev))
		{
			// unexpected error!
			error_handler(L"log_data > SetEvent(m_log_avail_ev) failed!");
		}
	}

	while (wait_for_read_before_write(writeOffset, transfer_size)) {
		int waitResult = WaitForSingleObject(m_log_read_ev, MAX_WAIT_FOR_WRITE);
		if ((waitResult != WAIT_OBJECT_0) && wait_for_read_before_write
                            (writeOffset, transfer_size))
		{
			error_handler(L"log_data > Write wait operation failed - waitResult = %u", 
                                      waitResult);
			ReleaseMutex(m_log_writers_lock);
			goto log_error;
		}
	}
	writeOffset += (UINT32)write(writeOffset, data->header, data->header_size);
	writeOffset += (UINT32)write(writeOffset, data->data, data->data_size);
	m_data_header->next_writing_pointer = writeOffset;
	ReleaseMutex(m_log_writers_lock);
	if (!SetEvent(m_log_avail_ev))
	{
		// unexpected error!
		error_handler(L"log_data > SetEvent(m_log_avail_ev) (2) failed!");
	}

	return true;
log_error:
	m_running = false;
	return false;
}

函数 wait_for_read_before_write 检查我们是否有足够的可用空间来存储我们的数据,或者我们是否必须等待,同时考虑循环掩码。

BOOLEAN data_logging::wait_for_read_before_write(UINT32 write_offset, UINT16 data_size) 
{
	// The logic here is that it might happen that we are about to write back from where the other 
    // thread is reading, so we have to make sure that we don't override the data that has not being
    // read yet. That explains the unequal sign on both sides. But what if we wrote all the data 
    // until the position where the reading pointer is (in it's previous round)?
	// Using the current check we will miss the possibility that we are in different rounds and we 
    // are about to write over the unread data. In order to avoid that case, we include the 
    // next_reading_pointer as a position to avoid as well.
	// >>>>>>>>>>>>>>>>W------------------------R>>>>>>>>>>>>>>>>>>>//
	BOOLEAN differentRound = ((m_data_header->flags[LOG_FLAG_TEXT] & WRITE_MMF_ROUND_MASK) 
              != 0) != ((m_data_header->flags[LOG_FLAG_TEXT] & READ_MMF_ROUND_MASK) != 0);
	BOOLEAN res = ((write_offset < m_data_header->next_reading_pointer) && 
               (write_offset + data_size >= m_data_header->next_reading_pointer)) || 
		(differentRound && (write_offset == m_data_header->next_reading_pointer));
	return res;
}

最后,读取线程执行 waitForLog 函数

DWORD WINAPI data_logging::waitForLog(LPVOID data)
{
	data_logging * self = (data_logging *)data;
	HANDLE waitOn[2];
	waitOn[0] = self->m_log_avail_ev;
	waitOn[1] = self->m_terminate;

	while (true)
	{
		int wait_result = WaitForMultipleObjects(2, waitOn, FALSE, INFINITE);
		if (wait_result == WAIT_OBJECT_0 + 1) // terminate has been triggered
		{
			self->m_running = false;
			break;
		}
		if (wait_result != WAIT_OBJECT_0) // unexpected result! terminate 
                                                  // as well - but report error!
		{
			self->error_handler(L"waitForLog > WaitForMultipleObjects failed - 
                                                           waitResult = %u\n", wait_result);
			goto error_case;
			break;
		}
		// we have data in the MMF, so look for it. Have to finish when both pointers 
                // have the same value and both operations are in the same round.
		while ((self->m_data_header->next_reading_pointer != 
                                   self->m_data_header->next_writing_pointer) || 
			(((self->m_data_header->flags[LOG_FLAG_TEXT] & WRITE_MMF_ROUND_MASK) != 0) != 
                           ((self->m_data_header->flags[LOG_FLAG_TEXT] & READ_MMF_ROUND_MASK) != 0)))
		{
			switch (((PLOG_HEADER)((char*)self->m_memory + self->m_data_header->
                                          next_reading_pointer))->application)
			{
			case APPLICATION_EOF:
			{
				self->m_data_header->next_reading_pointer = self->m_data_header->
                                         first_element_offset;
				char logTextFlag = self->m_data_header->flags[LOG_FLAG_TEXT];
				while (logTextFlag != _InterlockedCompareExchange8((char *)
                                       &self->m_data_header->flags[LOG_FLAG_TEXT], 
                                       logTextFlag ^ READ_MMF_ROUND_MASK, logTextFlag))
				{
					logTextFlag = self->m_data_header->flags[LOG_FLAG_TEXT];
				}
				break;
			}
			case APPLICATION_LOGGING:
			{
				PLOG_MESSAGES msg_header = (PLOG_MESSAGES)((char*)self->m_memory + 
                                         self->m_data_header->next_reading_pointer);
				int message_length = (msg_header->header.size - 
                                                        sizeof(LOG_MESSAGES))/2 - 1;
				wchar_t * message = (wchar_t*)((char*)msg_header + 
                                                                     sizeof(LOG_MESSAGES));
				if (self->m_kind_callbacks.find(APPLICATION_LOGGING) != 
                                                            self->m_kind_callbacks.end())
					((logAvailableCallbackFunction)self->
                                             m_kind_callbacks[APPLICATION_LOGGING])
                                            (msg_header, message, message_length, 
                                            self->m_kind_callbacks_parameter[APPLICATION_LOGGING]);
				self->m_data_header->next_reading_pointer += msg_header->header.size;
				break;
			}
			default:
				self->error_handler(L"waitForLog > Unexpected application code: %u\n", 
                                     ((PLOG_HEADER)((char*)self->m_memory + self->m_data_header->
                                      next_reading_pointer))->application);
				goto error_case;
				break;
			}
			if (!SetEvent(self->m_log_read_ev))
			{
				// unexpected error! log
				self->error_handler(L"waitForLog > Unexpected error, 
                                                SetEvent of m_log_read_ev failed\n");
			}
		}
	}
	return 0;
error_case:
	self->m_running = false;
	return 0;
}

我每天都使用它,并且它运行良好。 我确信它可以稍微改进一下,但是现在的状态对于我想要达到的目的来说已经足够好了。 您可以通过添加更多数据种类来扩展它。 唯一的限制是它是一种单向通信逻辑:多个写入器和一个侦听器,侦听器无法向他们回复。 如果您想要进行双向通信,则无法使用此逻辑,因为写入器仅在读取器已经在侦听时才设置(读取器创建 MMF)。 您必须重新制定此逻辑以拥有具有两个内存映射文件(一个用于读取,另一个用于写入)的客户端-服务器逻辑,这是下一篇文章的主题,在下一篇文章中,我将介绍我用来将 JazzVPN 服务与 GUI 前端通信的双向 IPC。

© . All rights reserved.