使用 C# 和 Scala 的简单 Kafka 生产者和消费者。





5.00/5 (5投票s)
在 C# 和 Scala 中编写的简单 Kafka 生产者和消费者,其中 Avro 模式从本地文件或简单的 Web 服务器读取。
引言
本文介绍了一个用 C# 和 Scala 编写的简易 Apache Kafka 生产者/消费者应用程序。这些应用程序在功能和结构上具有可互操作性。它们在 Kafka 中处理相同的数据。
必备组件
要运行代码,需要安装 Zookeeper 和 Kafka。有关本地 Windows 安装的相应指南可以在 这里 找到。Zookeeper 和 Kafka 安装为 Windows 服务,也可以作为控制台应用程序运行。为了简化我们的代码示例,我们将使用相应的命令文件将它们作为控制台应用程序运行。
- 1 - Start_Zookeeper.cmd
- 2 - Start_Kafka.cmd
文件名中的前导数字指示启动顺序——Zookeeper 应首先启动。我将 Zookeeper 版本 3.5.5 安装在目录 C:\zookeeper-3.5.5,将 Kafka 版本 2.12-2.3.0 安装在目录 C:\kafka_2.12-2.3.0。这些目录由启动 cmd 文件使用。因此,如果您的目录不同,则应在 cmd 文件中进行相应更改。在我安装的目录中,包含 Kafka 日志和 Zookeeper 数据的文件夹是 C:\kafka_2.12-2.3.0 下的 kafka_2.12-2.3.0kafka-logs 和 kafka_2.12-2.3.0zookeeper-data。在下次运行 Zookeeper-Kafka 之前,可以删除这些目录以清除先前的数据。
注意:更新计算机上的 Java 可能会导致 Zookeeper 和 Kafka 使用的环境变量 JAVA_HOME
发生更改,从而导致它们停止运行。
设计
专用类用于生成和消费消息,在我们的例子中,这些消息构成 string Key -> Value 对。Value 是一个根据 Avro schema 以 JSON 格式构建的对象。这是一种标准且推荐的方法,可确保不同 Kafka 生产者/消费者之间的互操作性。例如,在我们的例子中,C# 和 Scala 应用程序生成和消费相同类型的对象。通常,Avro schema 可从 Kafka Schema Registry 获取(有关其详细信息,例如,可以在 Sacha Barber 的 精彩文章 中找到)。此服务器应可供所有需要的对象的 Kafka 提供者和消费者使用。Kafka Schema Registry 通过 REST API 为其客户端提供服务。特别地,特定格式的 HTTP POST 请求会将 Avro schema 传输给客户端。Confluent.Kafka
库的相应类会在后台与 Kafka Schema Registry 通信,无需任何额外代码。Kafka Schema Registry 应已安装并维护。
在不同的 Kafka 客户端中使用统一的 Avro 对象 schema 总是非常有益的。但在一些相对简单的场景下,一个标准的完整 Kafka Schema Registry 会有点“杀鸡用牛刀”,schema 也可以从更简单的服务器或仅仅从文件中获取。本文代码提供了通过简单的 HTTP GET
请求或从本地 JSON 文件读取 Avro schema 的方法。
提供了 Confluent.Kafka
生产者和消费者的包装器。消费者包装器允许 Kafka 客户端订阅消息并使用给定的回调函数进行处理。生产者包装器提供发送消息到 Kafka 的方法。两个包装器的构造函数都以自定义方式读取 Avro schema(从某个 Web 服务器或文件)。
在 C# 和 Scala 环境中,应用程序的组织方式类似。首先,提供 Avro schema 访问和 Kafka 配置的数据(为简化起见已硬编码)。然后创建消费者并调用其异步方法来启动消息的消耗。此方法将消息处理回调作为参数。完成此准备后,创建生产者并定期调用其发送消息的方法。如前所述,消息以 Key -> Value 对的形式放入 Kafka。Key 的类型是 string
,Value 是标准的 Avro 类型 GenericRecord
。
访问 Schema
对于 C# 和 Scala 解决方案,Avro schema 文件 schema.json 位于解决方案根目录下的 wwwroot 目录中。它要么直接通过文件系统(默认)访问,要么通过一个简单的 Web 服务器访问。为了测试后者,我在我的 Windows 系统中使用了 IISExpress。应该将一个访问 schema 文件的站点添加到 IISExpress 配置文件 C:\Users\Igorl\Documents\IISExpress\config\applicationhost.config 中。相应的部分放在文件 SiteForIISExpressConfig.txt 中。此文件的内容应复制到 IISExpress 配置文件中相应的章节。然后应该为该站点启动 IISExpress。这可以通过运行 C# 和 Scala 解决方案主目录中放置的 startIIS.cmd 文件来完成。
注意:在 C# 和 Scala 解决方案中,要切换到使用 IISExpress 访问 schema,请将硬编码的布尔常量 isFromLocalFile
的值更改为 false
(代码中用注释 //1
标记了相应的行)。
C#
项目 KafkaHelperLib
提供了 Confluent.Kafka
生产者和消费者的包装器。该项目是用 .NET Standard 开发的,因此可以从 .NET Core 和 Framework 项目引用。该项目包含 KafkaConsumer
和 KafkaProducer
类。它们将一个已知的 GenericRecord
Avro 对象序列化/反序列化为 Kafka 数据对的 Value。类 RecordConfig
在其构造函数参数中接受 schema 访问 string schemaRegistryUrl
。其 public
方法 GetSchemaRegistryClient()
返回 ISchemaRegistryClient
接口的对象。通常 class Confluent.SchemaRegistry.CachedSchemaRegistryClient : ISchemaRegistryClient, ...
被用作 ISchemaRegistryClient
接口的实现。此类访问 Kafka Schema Registry。在我们的简化场景中,我们创建了一个“存根”类 SchemaRegistryClient
作为 ISchemaRegistryClient
接口的实现。KafkaConsumer
和 KafkaProducer
类的构造函数都使用 RecordConfig
类来分别创建 Avro 序列化器和反序列化器对象。
项目 HttpClient
也构建为 .NET Standard,可用于 .NET Core 和 Framework。它仅用于从旧解决方案中实现 HTTP GET
调用,以便从自定义 Web 服务器接收 Avro schema。
测试应用程序 KafkaApp
是一个 .NET Core 应用程序。它将 Kafka 生产者和消费者组合在一起。该应用程序为消耗的对象提供了一个回调。此回调函数只是将对象的内容打印到控制台。生产者根据 Avro schema 创建对象,并将它们发送到 Kafka。
Scala
Scala 解决方案与 C# 解决方案基本相似,但在这里 GenericRecord
对象被序列化为/反序列化为 Array[Byte]
,以便使用标准的 ByteArraySerializer
和 ByteArrayDeserializer
类。Scala 应用程序还将消耗的 Kafka 对打印到其控制台。为了区分 C# 和 Scala 生成的对象,Scala 生成的对象具有一个负数的 Id
字段。
结论
本文介绍了用 C# 和 Scala 编写的简单 Kafka 生产者和消费者代码。使用 Avro schema 创建的对象被生成和消耗。Avro schema 从本地文件或简单的 Web 服务器读取。
历史
- 2019年10月26日:初稿