65.9K
CodeProject 正在变化。 阅读更多。
Home

用于间歇性输入数据的处理设计模式

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2020 年 6 月 29 日

CPOL

4分钟阅读

viewsIcon

14698

downloadIcon

159

使用 RAM 和 CPU 优化的数据处理

引言

如果您的数据是间歇性的(非连续的),那么我们可以利用时间间隔来优化 CPU 和 RAM 的利用率。 想法是在下一批数据到达之前处理数据。 假设您每 T 秒收到 N 个输入数据,每个数据的大小为 d,并且处理一个数据需要 P 秒。 使用单线程,所需的总输出时间将为 N x P 秒。 如果 N x P < T,那么无论您如何编程都没有问题。 但是,如果 N x P > T,那么您需要多个线程,即,当处理输入所需的时间大于两批连续数据之间的时间时。 如果我们为多个线程引入另一个变量,那么我们的问题简化为 [ (N x P) / c ] < T

下一个约束是您可以创建多少个线程? 这限制了因子 c。 如果 c 太高,那么它会消耗大量的 CPU。 在这里,我们引入 RAM 利用率。 当数据进入时,我们首先将其存储在内存中,然后使用 c 个线程来处理它。 因此,在任何时候,都会有 c 个活动线程和 N-c 个挂起的队列项。 假设 r 个批次可以存储在内存中,一次可以由 c 个线程处理一个批次。 一个批次的大小是 c x d。 现在我们可以将其归结为

  • [ (N x P) / (r x c) ] < T
  • r = 可承受的 RAM / (c x d)

背景

这种情况主要适用于基于轮询的系统,当您以特定频率收集数据时。 因此,假设数据流是间歇性的并且按间隔发生。 您可以利用数据收集之间的时间间隔来优化利用 CPU 和 RAM。

Using the Code

我们需要一种研究性的数据处理方法,因为一种尺寸不适合所有情况。 许多参数,如 N、d 和 P,事先都是未知的。 因此,我们需要设计也提供统计信息,以便我们可以了解 N、d 和 P,并相应地调整 CPU 和 RAM 需求。

作为粗略的指导,我们需要一种方法来摄取通过线程提交的所有数据。 然后,立即开始处理它们,或者将它们排队并在多个线程中处理它们。

C# 为线程安全集合提供阻塞和限制功能。 这是一个有趣的功能,可用于优化高工作负载应用程序的 CPU 和内存。 这种模式可以进一步堆叠和互连,以构建数据路由的有向图。 这种模式广泛用于 Apache Nifi 处理器中。

在进一步深入研究模式之前,让我们了解什么是限制和阻塞。 它们解决了什么问题?

当有多个线程尝试从容器中获取数据时,我们希望线程阻塞,直到有更多数据可用。 这被称为“阻塞”。

当多个线程正在写入数据时,我们希望它们在有可用内存来容纳新数据之前进行限制。 这被称为“限制”。

因此,我们可以使用阻塞集合作为底层数据容器。

BlockingCollection DataContainer = new BlockingCollection<string>(
new ConcurrentBag<string>(),
this.MaxContainerSize);

对于线程池,您可以使用 .NET 框架内置的线程池,但为了简单起见,我使用的是简单的线程数组。 事实上,我不倾向于让其他人“管理我的线程”😊。

Thread[] Workers = new Thread[this.MaxWorkerThreads];

for (int i = 0; i < Workers.Length; i++)
{

    Thread newThread = new Thread(new ParameterizedThreadStart(ThreadFunction));
    Workers[i] = newThread;
}

这些线程中的每一个都使用一个函数来阻塞,直到新数据到达。 这是此函数的基本骨架。

private void ThreadFunction(object threadContext)
{
       CancellationToken token = (CancellationToken)threadContext;
       while (!token.IsCancellationRequested)
       {
              string Data = DataContainer.Take();
              ProcessData(Data);           
       }
}

并且容器提供了阻塞传入线程以将新数据添加到容器的功能。

public void Add(string data)
{
    DataContainer.Add(data);
}

这就是简单的配方。

关注点

现在要优化和调整 RAM 和 CPU 利用率,您需要调整 MaxWorkerThreadsMaxContainerSize。 我们需要收集一些统计信息来了解数据流模式。

  1. 输入速率或每秒有多少数据传入?
  2. 输出速率或每秒处理多少数据?
  3. 平均活动线程
  4. 平均容器大小

这些指标以下列方式提供帮助

  1. 如果输入速率 > 输出速率,那么容器大小要么永远增长,要么在输入处会增加阻塞线程,但会使程序崩溃。
    • 所以输入速率 < 输出速率
  2. 平均活动线程,如果活动线程主要在最大限制,但容器大小接近零,那么您可以通过使用一些 RAM 来优化 CPU。
  3. 平均容器大小始终处于最大限制,那么将必须创建更多的 CPU 线程。

历史

  • 2020 年 6 月 29 日 - 发布第一个版本
  • 2020 年 6 月 30 日 - 应用格式更改
© . All rights reserved.