65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 ZooKeeper-Kafka 和 Redis 的简单基于服务的海量数据处理系统

starIconstarIconstarIconstarIconstarIcon

5.00/5 (16投票s)

2018年2月15日

CPOL

14分钟阅读

viewsIcon

23005

downloadIcon

356

海量连续数据采集和处理的基本基础设施,使用 ZooKeeper-Kafka 和 Redis。

引言

本文介绍了一个紧凑型软件,为大规模连续数据采集和处理提供了基本基础设施。该系统基于业界广泛使用的知名产品。Kafka 和 Zookeeper 负责数据流,Redis 作为内存数据存储。下面称为 ZKR 的产品栈。这些是来自 Wikipedia 关于 ZKR 堆栈元素的简短摘录。

  • Apache ZooKeeper 是 Apache 软件基金会的项目。它本质上是一个分布式分层键值存储,用于为大型分布式系统提供分布式配置服务、同步服务和命名注册表。

  • Apache Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,用 Scala 和 Java 编写。该项目旨在为处理实时数据馈送提供一个统一的、高吞吐量、低延迟的平台。其存储层本质上是一个“大规模可扩展的发布/订阅消息队列,设计为一个分布式事务日志……ZooKeeper 被 Kafka 用于消费者之间的协调。”

  • Redis 是一个开源的内存数据库项目,实现了一个可选持久化的分布式内存键值存储。Redis 支持不同类型的抽象数据结构,如字符串、列表、映射、集合、有序集合、hyperloglogs、位图和空间索引。Redis 的意思是 REmote DIctionary Server(远程字典服务器)。

尽管文章的代码示例采用了 ZKR 堆栈,但演示的方法相当通用。通过使用统一的接口,这些产品可以相对轻松地替换。整体系统结构如图 1 所示。

图 1. 整体系统结构。

数据源 (DS) 定期生成要存储和处理的指标和文本数据。DS 将数据写入数据流组件(在本例中是 Zookeeper/Kafka)。数据处理服务集合负责从数据流读取数据,将其存储到数据存储(在本例中是 Redis),并处理数据。处理结果也写入数据存储。用于 UI 和客户应用程序的 API 构成了读取数据存储数据的附加服务层。它为系统提供了查看功能。这种布局非常灵活且方便。使用标准的高性能数据流组件可以节省开发人员编写大量繁琐的通信代码。第三方内存数据存储支持服务快速的读/写操作。数据流和数据存储通过 Web API 和专用 DLL 适配器(后者在文章示例中使用)提供数据访问。数据处理服务独立运行,并通过数据存储进行数据交换。这种布局允许用户拥有一个相对简单的服务集合,这些服务可以用不同的编程语言开发。

Data Model

本系统的主要目的是高效的数据采集、存储和处理。为了简单起见,数据从 DS 通过数据流打包到数据存储,对象定义为 `ObjectsLib` 程序集中的 `DSData` 类。该对象由一个 `timestamp`、一个 `float` 数组(一个表示数组长度的整数不是 `DSData` 的一部分,而是在序列化时插入到数组本身之前)和一个 `string` 组成。每个 DS 定期生成一个这样的对象。它被序列化并写入数据流,然后由数据处理服务从中读取。后者如果需要,可以反序列化数据对象进行预处理,然后将其写入数据存储。服务可以决定不将数据对象写入数据存储(也许对象太旧或无效),或者只存储处理后的数据。在我们的系统中,服务执行一个非常简单的检查,看数据是否不太旧,如果不旧,则按原样存储。

每个 `DSData` 对象都带有其标识符 *key* 放置到数据流中。在我们的例子中,这是生成对象的 DS 的整数序列号。如上所述,我们简单的系统在从数据流读取数据时将其存储到数据存储,而不进行任何重新排序。图 2 显示了存储在数据存储中的数据结构的示例。可以看出,一些 `float` 可能被省略。`String` 可能包含有关指标数据的数据和澄清(例如,在我们的示例中,值 `0` 和 `3` 实际上已被写入)。

<

图 2. 数据对象

正如你所见,使用相同 *key* 存储的数据以反向时间顺序存储,并具有一定的历史深度(如果需要)。存储的数据结构不一定需要复制接收对象的结构。通常,通过一次从数据存储的读取操作,可以获得单个 *key* 的数据。但数据存储布局应根据后续读取进行优化。

设计

本系统的设计考虑了以下目标:

  • 提供统一的基础设施,适用于小型单进程解决方案(带进程内数据存储)和大型多服务系统。
  • 可以轻松替换特定的数据流和/或数据存储提供商。
  • 为水平扩展(scale-out)到多台机器提供便捷的起点。
  • 开发开放系统,能够无缝添加/删除服务。

解决方案在下表中描述。

表 1. 解决方案中文件夹和项目的描述

文件夹 描述
测试 两个测试应用程序。
FullFlowTest 允许开发人员在一个调试友好的环境中,在单个应用程序中测试完整的数据流:数据模拟 --> 写入数据流 --> 从数据流读取 --> 写入数据存储 --> 从数据存储读取
RedisEventTest 测试 Redis 事件的发布和订阅。
应用 该文件夹包含数据源模拟器 DataSourcesSimulator,用于从数据流读取数据到数据存储的服务 DataReceiver,用于数据处理的服务 DataProcessor 和展示 Dashboard,以及 NodeJsDataProcessor。所有这些应用程序都是 C# 控制台应用程序,但最后一个是用 Node.js 编写的。 _RunMe 应用程序启动整个工作流程。
Common 整个解决方案引用的基本通用程序集。
序列化器 提供标准序列化和反序列化过程的程序集。
数据存储 通用数据访问层,向用户隐藏特定数据存储的细节。
工厂 用于根据统一接口生成具体数据流和数据存储的通用工厂。
Kafka 带有 Kafka 数据生产者和消费者的具体数据流实现。使用 Confluent.Kafka 库(版本 0.11.0.0)。
Redis 使用 Redis 的具体内存数据存储实现。使用 StackExchange.Redis(版本 1.2.6.0)。

该系统提供以下数据流:

  • 将数据生成到数据流
  • 从数据流消费数据
  • 将数据写入数据存储
  • 从数据存储读取数据,以及
  • 数据处理

与数据流和存储相关的活动通过实现 `CommonInterfacesLib` 命名空间中定义的某些接口的类型来执行。

流生产者接口有一个方法,用于将 `(string key -> T value)` 对的数组发送到数据流组件。

public interface IStreamingProducer<T> : IDisposable
{
    void SendDataPairs(params KeyValuePair<string, T>[] arr);
}

这可以同步或异步完成,具体取决于具体实现。在我们的系统中,该接口由 `KafkaProducer` 类型在 `KafkaProducerLib` 命名空间中实现。其构造函数接收连接到数据流、主题和分区的参数以及错误处理委托。构造函数启动周期性地出队 `KeyValuePair` 对,并使用 `ThreadProcessing` 类型将它们写入数据流。`SendDataPairs()` 方法将对入队并触发写入过程。

流使用者接口也有一个方法来启动异步数据消费。

public interface IStreamingConsumer : IDisposable
{
    void StartConsuming();
}

类型 `KafkaConsumer` 实现该接口。

写入和读取数据存储的机制由单个接口 `IDataStore : IDisposable` 定义。

public interface IDataStore<T> : IDisposable
{
    // Add value to a key and trim historical data up to given trim length 
    void Add(string key, T t, int trimLength = -1);

    // Get values for the keys within given historical limits
    Dictionary<string, IList<T>> 
       Get(ICollection<string> keys, long start = 0, long stop = -1);

    // Get values for a single key within given historical limits
    IList<T> Get(string key, long start = 0, long stop = -1);
	
    // Trim historical data for the keys up to given trim length
    void Trim(ICollection<string> keys, int length);

    // Delete all values for the keys
    void Delete(ICollection<string> keys);

    // Publish events asynchronously
    Task PublishAsync(string channelName, string message);
	
    // Subscribe for event
    void Subscribe(string channelPattern, Action<string> callback);

    // Get average duration of the above operations for diagnostic purposes
    Dictionary<string, TimeSpan> OperationsAverageDuration { get; }
}

上述接口支持向数据存储添加数据、修剪历史数据、按单个键或一组键获取数据。它还允许发布和订阅数据存储事件(Redis 提供的一个有用功能)。类型 `RedisAdapter` 实现该接口。

如你所见,数据流和存储的接口非常基本且要求不高。这里,它们各自的具体实现处理 Kafka 和 Redis 这两个特定产品。工厂类型 `StreamingProducer`、`StreamingConsumer` 和 `DataStore` 将通用接口与其具体实现分离。这意味着可以使用适当的工厂类型实例化接口的不同实现。

从 `FullFlowTest` 应用程序开始熟悉系统很方便。该应用程序在调试友好的环境中演示了数据从 DS 到数据处理的所有阶段。`StreamingProducerFactoryLib.StreamingProducer` 类型的静态方法 `StreamingProducer.Create()` 创建一个 `DSData` 生产者对象 `streamingDataProducer` 来模拟 DS 的数据生成。其 `SendDataPairs()` 方法实际上异步生成 `KeyValuePair[]` 数组作为键->值对(在此测试用例中只有一个对)。然后实例化 `StorageWriter` 类,创建一个 `dataStorageWriter` 对象。该对象负责将数据插入数据存储。其构造函数启动适当的周期性进程。构造函数接收所有必需的参数,包括存储的地址和端口、写入周期、数据过滤器和错误处理方法。类型参数 `T` 定义了存储数据的格式,在示例中可以是 `byte[]` 或 `string`。后者更容易被各种读取器(特别是 Node.js 读取器)读取。数据存储写入器 `StorageWriter` 的工作方式如下:其构造函数启动一个周期性进程,该进程将数据从内部队列出队,并在另一个线程中将它们写入数据存储。数据流使用者(下文介绍)会将从数据流读取的数据入队到队列中。数据流使用者对象 `streamingDataConsumer` 由 `StreamingConsumer.Create()` 静态方法创建,该方法提供一个消息处理器作为参数,其 `StartConsuming()` 方法启动从数据流读取。消息处理器只是将消息入队到数据存储写入器队列,但也可以分配一些过滤和预处理任务。

代码示例

必备组件

第一步是安装 Zookeeper、Kafka 和 Redis。在 Windows 上安装 Zookeeper 和 Kafka 的简单但全面的指南可以在例如 此处(Zookeeper 和 Kafka)找到。Redis 可作为 msizip 文件从 此处(Redis)下载。使用了版本 Redis-x64-3.2.100。

ZKR 堆栈的元素作为 Windows 服务安装,但也可以作为控制台应用程序运行。为了在我们代码示例中简化,我们将使用相应的命令文件 `1 - Start_Zookeeper.cmd`、`2 - Start_Kafka.cmd` 和 `Start_Redis.cmd` 将它们作为控制台应用程序运行(文件名中的数字表示它们的启动顺序)。

关于运行 Redis 的注意事项。默认情况下,Redis 以所谓的 `protected` 模式运行,只允许本地调用。要允许网络调用,Redis 服务器不应在 `protected` 模式下启动。

redis-server.exe --protected-mode no

这是 Redis 从我们的命令文件中启动的方式。

NodeJsDataProcessor 项目是用 Node.js 开发的。因此,如果你想运行它,你需要安装 Node.js,例如按照 此指南。从 此处安装 Visual Studio 的 Node.js 开发工具也会很有用。

一些我在玩这个示例时凭经验发现的更方便的技巧。更新计算机中的 Java 可能会导致 ZKR 堆栈元素使用的环境变量 `JAVA_HOME` 发生变化,并且它们可能停止工作。Zookeeper 应该在 Kafka 之前启动。在每次启动 Kafka(至少作为控制台应用程序)之前,应该从 Kafka 安装文件夹(在我的安装中是 `C:\kafka_2.11-0.11.0.1`)删除包含 Kafka 日志和 Zookeeper 数据(在我安装中是 `kafka_2.11-0.11.0.1kafka-logs` 和 `kafka_2.11-0.11.0.1zookeeper-data`)的文件夹。
注意。当你按照下面的说明运行示例应用程序时,你无需担心这些复杂性,因为这些操作由样本应用程序本身执行。

成功安装 ZKR 堆栈元素后,必须安装相应的 ZKR 引用程序集。这可以通过 NuGet 来完成。所需包包含在相应项目的 `packages.config` 文件中。它们是以下组件:

  • 对于 KafkaConfluent.Kafka.0.11.0librdkafka.redist.0.11.0
  • 对于 RedisStackExchange.Redis.1.2.6

运行软件

安装了所需的包并构建了解决方案后,你就可以运行应用程序了。建议从测试应用程序 `FullFlowTest` 开始。该应用程序首先作为三个控制台应用程序启动了 ZKR 堆栈的所有元素(你将看到屏幕上出现三个额外的控制台窗口),然后执行数据流链的所有步骤:

数据模拟 --> 写入数据流 --> 从数据流读取 --> 写入数据存储 --> 从数据存储读取

上述操作(除了最后一次)只执行一次。从数据存储读取是周期性执行的。因此,应用程序读取并显示与分隔字符串相同的 `DSData` 对象。从第二次开始,`string` 对象前面会有一个“?”,表示数据是旧的(事实上,读取的是具有固定 `timestamp` 的同一个对象)。要退出应用程序,按任意键并关闭所有 ZKR 窗口。

你可以继续进行下一个小型测试应用程序 `RedisEventTest`。它演示了 Redis 事件的使用。这些事件是非常方便的进程间同步工具。该应用程序首先启动 Redis,然后邀请用户发布事件。结果显示在屏幕上。

最后,可以通过从 *Applications* 文件夹启动 `_RunMe` 应用程序来测试主要数据流。它启动所有 ZKR 应用程序,然后启动 `DataProcessor`、两个 `DataReceiver`、`Dashboard`、`NodeJsDataProcessor`,最后是 `DataSourcesSimulator`。`_RunMe` 通过数据存储(Redis)配置所有启动的应用程序(但 `DataSourcesSimulator` 除外,它自行配置):它将配置数据写入应用程序知道的数据存储,并且应用程序读取它们。根据默认配置,`DataSourcesSimulator` 每秒生成并写入 1000 个带有 4 个 float 值数组的 `DSData` 对象到数据流(Kafka)中。这些数据均匀地分布在两个 `DataReceiver` 之间,它们从数据流读取数据并写入数据存储。第一个 `DataReceiver` 将数据存储为字节数组,第二个将数据存储为 `string`。`DataProcessor`、`Dashboard` 和 `NodeJsDataProcessor` 从数据存储读取数据。`Dashboard` 可视化来自一个 DS 的数据(在本例中,这是由 `DataSourcesSimulator` 模拟的正弦函数)。

图 3. 使用 *Dashboard* 应用程序进行数据可视化。

用 Node.js 编写的非常简单的(几行代码)应用程序 `NodeJsDataProcessor`,说明了使用不同供应商开发的用不同语言编写的服务来扩展系统的可能性。

一些测量和讨论

运行示例时,进行了一些时间测量。使用 Dell 笔记本电脑(CPU i7-7600U、RAM 16 GB、2.90 GHz;Windows 10 Enterprise)进行本地测试。一台较旧的笔记本电脑(CPU i7-3610QM、RAM 16 GB、2.30 GHz;Windows 10 Pro)在远程测试中运行 Redis。使用了相对较慢的家庭 WiFi 连接。测量单个记录的读写 Redis 的平均持续时间很有趣。下表呈现了一些结果。

表 2. Redis 读写操作的平均持续时间。

操作 (Operation) 写入 byte[]
mcs
写入 string
mcs
读取 byte[]
mcs
读取 string
mcs
本地 140 140 120 320

写入 Redis 的测试是在同步模式下进行的(`RedisAdapter` 类中的 `CommandFlags.None`)。直接远程读写花费的时间要长得多(毫秒)。在弱网络下,大量远程读取和同步写入是不可能的(尽管对于此示例,使用 `CommandFlags.FireAndForget` 的异步写入是成功的)。

这些粗略的测量为系统布局设计提供了一些线索。当 Redis 作为本地应用程序安装时,运行在不同机器上的服务可以异步写入数据,而本地读取是首选。对于大量读取,最好将数据存储为字节数组,但要记住使用不同编程语言的读取器的可能性。在可能的情况下,在写入数据存储之前预处理数据是一个好主意。不要低估明智地分组存储数据以优化其后续读取。如果绝对有必要进行远程数据读取,那么最好先在本地读取数据,然后再通过其他通信方式将其传输到客户端机器。应该检查 Redis 集群以进行远程操作。

因此,很明显,数据存储的性能,特别是对于远程读取,对于实际应用程序来说应该得到改进。这些增强可能包括,例如,使用 Redis 管道技术,该技术允许一次发送多个命令。可以使用 Redis 集群来缓解远程调用问题。可以使用 `redis-benchmark` 工具进行更精确的测量。很有可能,熟练的 Redis 专家会立即提出如何提高性能。

结论

文章中介绍的简单数据采集和处理基础设施,允许开发人员构建从单进程应用程序到复杂的分布式多服务系统的各种软件。文章中的示例处理 Zookeeper - Kafka - Redis 堆栈。但这些产品可以替换为其他产品,而不会改变基本基础设施(例如,可以考虑 RabbitMQ 用于数据流,Apache Ignite 是数据存储的一个有前途的候选者)。此外,在简单的单进程系统中,数据流和/或数据存储可以使用“内部”开发,利用相同的通用接口。下一步是使用分布式版本的数据流和数据存储。例如,使用 Redis 的分布式集群安装将非常有趣。文章中提供的示例可以轻松地迁移到 .NET Core。需要检查使用相同的第三方库或查找与 .NET Core 兼容的其他库的可能性。在云中安装该系统也将非常有趣。

历史

  • 2018 年 2 月 15 日:初始版本
© . All rights reserved.