Apache Kafka 0.9 Scala Producer/Consumer 结合 RxScala 的一些魔法





5.00/5 (12投票s)
一篇关于如何使用开源 Apache Kafka 消息框架的文章,并加入了一些 RxScala 以增加趣味性。
引言
在我目前的工作中,我大约 50% 的时间花在 .NET 上,另外 50% 的时间花在 Scala 上。因此,许多 Scala/JVM 的工具最近引起了我的兴趣。我最新的任务是尝试学习 Apache Kafka,至少理解其核心概念。我已经阅读了一两本关于 Apache Kafka 的书,现在我觉得自己至少在这篇文章中说得有些道理。
那么,Apache Kafka 到底是什么?
这是 Apache Kafka 团队对他们工具的描述。
Apache Kafka 是一个重新思考过的发布-订阅消息系统,它是一个分布式提交日志。
快速
一个 Kafka broker 每秒可以处理来自数千个客户端的数百万兆字节的读写请求。
可扩展性
Kafka 被设计成一个单一集群,可以作为大型组织的核心数据主干。它可以弹性地、透明地进行扩展,而无需停机。数据流被分区并分散在集群机器上,以便能够处理比任何单机能力更大的数据流,并允许集群协调消费者。
持久性
消息被持久化到磁盘并在集群内复制以防止数据丢失。每个 broker 在不影响性能的情况下可以处理 TB 级消息。
分布式设计
Kafka 具有现代化的集群中心设计,提供强大的持久性和容错保证。
摘自 http://kafka.apache.org/,日期为 2016/03/11
Apache Kafka 是由 LinkedIn 的工程师团队设计和构建的,我相信您会同意他们可能需要处理相当多的数据。
在本文中,我将向您介绍一些核心的 Apache Kafka 概念,并展示如何创建一个 Scala 的 Apache Kafka 生产者和一个 Scala 的 Apache Kafka 消费者。我还会为 Apache Kafka 消费者代码添加一些 RxScala 的魔法,以便将 RX 操作符应用于传入的 Apache Kafka 消息。
我曾尝试模仿我几年前写的一篇关于 SignalR 和 RX 与简单 WPF 用户界面集成的流行的 .NET 文章。在下面的部分中,如果合适,我会引用这篇 .NET 文章,因为它可能对您感兴趣。
代码在哪里?
我已将代码上传到我的 GitHub 账户。该 Git 仓库实际上有两个分支。
- 其中一个分支实现了基本的功能,就是一个简单的生产者/消费者。生产者将 JSON 发送给消费者。 您可以从这个分支获取代码
- 另一个分支则更加完善,并引入了一个更结构化/可重用/通用的消费者 API。如上所述,我还引入了 RxScala,以便 Rx 可以“搭便车”利用 Kafka 消费者接收的传入消息。 您可以从这个分支获取更完善的版本(本文基于此版本)
Kafka 概念
本节将讨论核心的 Apache Kafka 概念,这将帮助您更好地理解代码。
总体思路
如前所述,Apache Kafka 是一个先进的分布式/容错消息系统。它还有其他一些功能,例如分片、发布-订阅和事件溯源的能力。
让我们来谈谈简单的操作。
有一个生产者负责生产数据,它通常会连接到一个 Apache Kafka broker(很可能是一个集群)。生产者会将数据推送到一个称为“Topic”的东西上。“Topic”是生产者和消费者都同意的一种标识。生产者会在某个 Topic 上生产消息,消费者可以设置为从某个 Topic 读取。最基本的就是这样。但是,让我们谈谈上面提到的一些好东西。
分区 (Partitioning)
所以,Apache Kafka 有一个给定 Topic 的分区概念。这意味着您可以找到一种方法将消息分散到多个分区。例如,如果您按 IP 地址记录消息,您可能会决定所有以 127.192 开头的消息都进入一个分区,而其他消息则进入另一个分区。Apache Kafka 允许这样做。您需要为您的特定用例编写一些代码,但这是可能的。
发布-订阅 vs 队列
消息传递传统上有两种模式:队列和发布-订阅。在队列中,一组消费者可以从服务器读取,每条消息只发送给其中一个;在发布-订阅中,消息会广播给所有消费者。Apache Kafka 提供了一个单一的消费者抽象,它概括了这两种模式——消费者组。
消费者用一个消费者组名称来标识自己,发布到 Topic 的每条消息都会被发送给订阅的每个消费者组中的一个消费者实例。消费者实例可以位于不同的进程或不同的机器上。
如果所有消费者实例都具有相同的消费者组,那么这就像一个传统的队列,将负载均衡到各个消费者。
如果所有消费者实例都具有不同的消费者组,那么这就像发布-订阅,所有消息都会广播给所有消费者。
也许下面的图能更好地说明这一点。
事件溯源 (Event Sourcing)
现在,分布式集群容错消息系统可以用于事件溯源可能并不明显。然而,Apache Kafka 有一个提交日志(每个分区一个)的概念,它能够将消息存储(在磁盘上)一段时间。消费者可以选择提交到日志,这会移动它们的索引,但它们也可以回溯到某个时间点,并消费日志中从该索引开始的所有消息。
索引由消费者提交,但存储在 Apache Zookeeper 中。Zookeeper 是一个用于协调分布式应用程序的奇特软件。我可能会写另一篇文章介绍 ZooKeeper,但这属于另一回事。现在,您只需要知道 ZooKeeper 负责维护索引(每个分区一个)。
生产者(s)
生产者的职责是向 Topic 发布消息,而消费者基本上只需要关心这一点。如上所述,消费者决定使用哪个消费者组,这又决定了哪些消费者可以看到哪些消息。
消费者(s)
消费者需要提供几个信息。 namely
- Apache Kafka broker 连接详情
- Topic 详情
- 消费者组详情
通过提供这些详情,消息才能被正确地接收到消费者中。
分区偏移量 (Partition Offset)
如上所述,Apache Kafka 有提交日志的概念。实际上它更复杂一些,因为每个分区都有一个提交日志。分区策略是您的应用程序代码必须定义的。然而,对于 Topic 的每个分区,都有一个单独的提交日志/偏移量。这个偏移量由消费者调整,实际值存储在 Zookeeper 中(Apache Kafka 负责处理这部分)。
应该注意的是,即使您可以创建多线程消费者,也只有一个线程可以访问分区的偏移量,所以即使您有多个线程在运行做相同的事情,也只有一个线程能够调整提交日志。
消费者组 (Consumer Groups)
如上所述,有一个消费者使用的“消费者组”概念。如果多个消费者共享一个消费者组,那么消息将发送给其中一个消费者。如果所有消费者实例都具有不同的消费者组,那么这就像发布-订阅,所有消息都会广播给所有消费者。
消费者如何使用 Zookeeper
Apache Kafka 在内部使用了另一个 Apache 项目,即 Apache Zookeeper。Apache Zookeeper 用于存储消费者偏移量。让您熟悉 Apache Zookeeper 的内容不在本文的讨论范围之内,但简而言之,Apache Zookeeper 用于提供分布式应用程序同步。
一个演示应用
本节将向您介绍一个简单的基于 Scala 的演示应用程序,它执行以下操作:
- 拥有一个简单的 Scala 生产者,它将 JSON 序列化的消息推送到一个 Topic/消费者组。
- 拥有一个简单的 Scala 消费者,它能够读取多个 Topic。
- 拥有一个通用的抽象,用于一个通用的 Apache Kafka 客户端,该客户端还将传入的消息公开为一个 RxScala Observable[T] 流。
- 拥有一个通用的仓库抽象。这样,单个仓库就可以用于监控特定的 Apache Kafka Topic。
安装
如果您在生产环境中运行,本节将不适用。但是,如果您像我一样,喜欢在自己电脑的安全环境(我的电脑是 Windows 10)中进行隔离测试,请继续阅读。
本质上,生产环境会运行 Apache Kafka 集群以及很可能的 Zookeeper 集群,所有这些都运行在 Linux 服务器上。
然而,对于测试来说,这有点麻烦。所以,让我们继续尝试在 Windows 机器上作为单个本地实例运行 Apache Kafka/Zookeeper。我们该怎么做?
最好的起点是按照另一个 CodeProject 教程。
www.codeproject.com/Articles/1068998/Running-Apache-Kafka-on-Windows-without-Cygwin
如果您按照该指南一直操作到最后,您将对如何设置 Apache Kafka/Zookeeper 有一个相当好的理解。
每次重启机器时必须执行的任务
每次重启您的电脑时,您都需要执行一些任务。本质上,您必须在可以期望本文附带的生产者/消费者代码正常工作之前执行这两个步骤。
与 GitHub 代码相关的是一个名为“Kafka.txt”的文件,其中包含一些有用的笔记。
每次希望运行本文附带的生产者/消费者代码时,您都需要以管理员身份运行这两个命令行(作为管理员)。
注意:您可能需要根据安装位置更改路径。
1. 运行 Zookeeper
cd C:\Apache\Apache-zookeeper-3.4.6\bin
zkserver
2. 运行 Kafka
cd C:\Apache\Apache-Kafka-0.9.0.0
.\bin\windows\kafka-server-start.bat .\config\server.properties
创建 Topic (只需执行一次)
您还需要为附带的生产者/消费者代码创建 Topic。您只需执行一次此操作,并且还必须确保 Kafka/Zookeeper 正在运行,请参阅上面的 2 个步骤。
一旦知道它们都在运行,我们就可以简单地使用以下命令行:
cd C:\Apache\Apache-Kafka-0.9.0.0\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fast-messages
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic order-placed-messages
可以看到,我们为 Topic 设置了某些内容,例如要使用的 ZooKeeper broker、副本/分区数。我相信您会同意,这些都是不错的功能。
生产者
生产者是更容易解释的组件,它本质上由几个类和一个配置文件组成。我们现在将逐个介绍它们。
生产者属性
生产者需要填写某些属性才能运行。为此,我们仅使用 Google Guava 库,该库允许我们轻松读取配置值。
这是生产者的属性文件。
bootstrap.servers=localhost:9092 acks=all retries=0 batch.size=16384 auto.commit.interval.ms=1000 linger.ms=0 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer block.on.buffer.full=true
消息
消息本身是简单的 case classes,使用 Play JSON 库序列化传输。以下是生产者包含的消息。
package Messages
import play.api.libs.json.Json
//==============================
// serialize to json
//==============================
// val json = Json.toJson(FastMessage("test", 30))
//==============================
// deserialize
//==============================
// var otherFast = json.as[FastMessage]
// val name = otherFast.name
// println(s"The name of the other fast one is $name")
case class FastMessage(name: String, number: Int)
object FastMessageJsonImplicits {
implicit val fastMessageFmt = Json.format[FastMessage]
implicit val fastMessageWrites = Json.writes[FastMessage]
implicit val fastMessageReads = Json.reads[FastMessage]
}
package Messages
import play.api.libs.json.Json
//==============================
// serialize to json
//==============================
// val json = Json.toJson(OrderPlacedMessage(new java.util.Date()))
//==============================
// deserialize
//==============================
// var theOrder = json.as[OrderPlacedMessage]
// val timeStamp = theOrder.timeOfMessage
// println(s"The name of the other is $timeStamp")
case class OrderPlacedMessage(timeOfMessage: java.util.Date)
object OrderPlacedMessageJsonImplicits {
implicit val orderPlacedMessageFmt = Json.format[OrderPlacedMessage]
implicit val orderPlacedMessageWrites = Json.writes[OrderPlacedMessage]
implicit val orderPlacedMessageReads = Json.reads[OrderPlacedMessage]
}
这里没有什么更多的要说的了,这些对象实际上就是属性容器。
生产者循环
生产者需要生产消息。生产者如何生产消息取决于您的业务需求。对我而言,我只是想要一个循环,每隔一段时间就会发送新消息。为此,我有一个新的 Runnable
(线程抽象),它将以 x 时间间隔运行消息生产逻辑(针对给定消息类型)。由于演示代码支持两种消息,因此存在两个这样的 Runnable
类。我们一秒钟后就会看到它们。
现在,这是生产者的代码,可以看到此类基本上只是将两个消息运行器的执行交给标准的 Executors.newSingleThreadScheduledExecutor
JVM 执行器。
import org.apache.kafka.clients.producer.KafkaProducer
import java.util.Properties
import com.google.common.io.Resources
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
object ScalaProducer {
def main(args: Array[String]): Unit = {
val scalaProducer = new ScalaProducer()
scalaProducer.run(args)
}
}
/**
* This producer will send a bunch of messages to topic "fast-messages". Every so often,
* it will send a message to "heartBeat-messages". This shows how messages can be sent to
* multiple topics. On the receiving end, we will see both kinds of messages but will
* also see how the two topics aren't really synchronized.
*/
class ScalaProducer {
def run(args: Array[String]) : Unit = {
println("Press enter to start producer")
scala.io.StdIn.readLine()
var producer : KafkaProducer[String, String] = null
var closeableKafkaProducer : CloseableKafkaProducer = null
try {
val props = Resources.getResource("producer.props").openStream()
val properties = new Properties()
properties.load(props)
producer = new KafkaProducer[String,String](properties)
closeableKafkaProducer = new CloseableKafkaProducer(producer)
//"fast-messages"
val fastMessageRunnable =
new FastMessageRunnable("fast-messages",closeableKafkaProducer)
val fastMessageRunnerScheduler =
Executors.newSingleThreadScheduledExecutor()
fastMessageRunnerScheduler.scheduleAtFixedRate(fastMessageRunnable, 0, 3, TimeUnit.SECONDS);
//"order-placed-messages"
val orderPlacedMessageRunnable =
new OrderPlacedMessageRunnable("order-placed-messages",closeableKafkaProducer)
val orderPlacedMessageScheduler =
Executors.newSingleThreadScheduledExecutor()
orderPlacedMessageScheduler.scheduleAtFixedRate(orderPlacedMessageRunnable, 0, 1, TimeUnit.SECONDS);
println("producing messages")
scala.io.StdIn.readLine()
}
catch {
case throwable : Throwable =>
val st = throwable.getStackTrace()
println(s"Got exception : $st")
}
finally {
if(closeableKafkaProducer != null) {
closeableKafkaProducer.closeProducer()
}
}
}
}
用于消息生产的可运行类 (Runnable)
如上所述,我决定将每条消息的生产放在自己的 Runnable
中,该 Runnable
将被调度以每 x 时间间隔生产一种所需类型的新消息。那么这些 Runnable
类看起来像什么呢?嗯,它们以一个简单的基类开始,该基类使用 KafkaProducer[String,String]
类实际发布消息,我将其包装起来,以便随时安全地关闭它。
import org.apache.kafka.clients.producer.KafkaProducer
class CloseableKafkaProducer(val producer : KafkaProducer[String, String]) {
var isClosed : Boolean = false
def closeProducer() : Unit = {
if(!isClosed) {
producer.close()
}
}
}
实际的 Runnable
代码看起来像这样,我们简单地使用 KafkaProducer[String,String]
的 send(..)
方法。
import org.apache.kafka.clients.producer.ProducerRecord
abstract class MessageRunnable extends Runnable {
val topic : String
val closeableKafkaProducer : CloseableKafkaProducer
override def run() : Unit = {
try {
val message = produceMessage()
println("running > " + message)
closeableKafkaProducer.producer.send(new ProducerRecord[String, String](topic, message))
closeableKafkaProducer.producer.flush()
println(s"Sent message on topic '$topic'")
}
catch {
case throwable: Throwable => {
val errMessage = throwable.getMessage
println(s"Got exception : $errMessage")
closeableKafkaProducer.closeProducer()
}
}
}
def produceMessage() : String
}
继承此类用于创建要发送的正确 JSON 格式,如下所示,“fast-messages” Topic,我们使用 FastMessage
scala case class,它作为 JSON payload 发送。
import java.util.Calendar
import play.api.libs.json.Json
import Messages.{FastMessage}
import Messages.FastMessageJsonImplicits._
class FastMessageRunnable(
val topic : String,
val closeableKafkaProducer : CloseableKafkaProducer)
extends MessageRunnable {
override def produceMessage(): String = {
Json.toJson(FastMessage("FastMessage_" +
Calendar.getInstance().getTime().toString(),1)).toString()
}
}
消费者
注意:对于消费者,我只设置了“fast-messages” Topic 的代码,但您只需按照“order-placed-messages” Topic 的工作方式进行即可。设置应该完全相同。
消费者比生产者稍微复杂一些,但这仅仅是因为我想要实现一些特定的目标,例如:
- 我想使用 Rx。
- 我想出一种通用的方法,使其更像一个仓库,您只需使用它即可收听某种消息类型。本质上,消息将被公开为一个
Observable[T]
,这样您就可以使用所有炫酷的 Rx(用于 Scala)功能。
我将逐步介绍构成此功能的每个部分。
消费者属性
消费者需要填写某些属性才能运行。为此,我们仅使用 Google Guava 库,该库允许我们轻松读取配置值。
这是生产者的属性文件。
bootstrap.servers=localhost:9092 group.id=test enable.auto.commit=true key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer # fast session timeout makes it more fun to play with failover session.timeout.ms=10000 # These buffer sizes seem to be needed to avoid consumer switching to # a mode where it processes one bufferful every 5 seconds with multiple # timeouts along the way. No idea why this happens. fetch.min.bytes=50000 receive.buffer.bytes=262144 max.partition.fetch.bytes=2097152
底层的 Kafka 消费者部分
正如我上面所说,我希望能够使用 Rx 并创建可重用的架构部分。因此,我戴上思考帽,决定我需要一个通用的 Kafka 消费者,以及一个对其进行特化,该特化仅处理正确的 JSON 消息反序列化(就像我们为生产者做的那样)。为此,有一个通用的基类(将永远只有这一个,请参见 GenericKafkaConsumer
)和一个消费者类(请参见 FastMessageKafkaConsumer
)每个消息类型。
让我们从通用的 Kafka 消费者基类开始,以下是代码的主要要点:
- 这个通用类接收一个 Topic,并且只处理与请求 Topic 匹配的传入记录 Topic 的消息。
- 有一个消息读取循环,它使用
KafkaConsumer[string, string]
从网络读取消息。 - 有一个抽象方法
readTopicJson(..)
,它期望由该类的继承者实现。 - 正在使用一个 RxScala Subject 将传入的消息公开为一个
Observable[T]
。
GenericKafkaConsumer
import java.io.Closeable
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
import Messages.FastMessage
import com.google.common.io.Resources
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Arrays
import java.util.Properties
import java.util.Random
import play.api.libs.json.{Reads, Json}
import rx.lang.scala.Observable
import rx.lang.scala.subjects.PublishSubject
///**
// * This program reads messages from a single topic.
// * Whenever a message is received it is pumped out using RX
// */
abstract class GenericKafkaConsumer[T](topic : String) extends Closeable with Runnable {
val topicSubject = PublishSubject.apply[T]()
var consumer : KafkaConsumer[String, String] = null
val pool : ExecutorService = Executors.newFixedThreadPool(1)
var shouldRun : Boolean = true
def startConsuming() : Unit = {
pool.execute(this)
}
def run() : Unit = {
try {
val props = Resources.getResource("consumer.props").openStream()
val properties = new Properties()
properties.load(props)
if (properties.getProperty("group.id") == null) {
properties.setProperty("group.id", "group-" + new Random().nextInt(100000))
}
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(Arrays.asList(topic))
var timeouts = 0
println(s"THE TOPIC IS : $topic")
while (shouldRun) {
println("consumer loop running, wait for messages")
// read records with a short timeout. If we time out, we don't really care.
val records : ConsumerRecords[String, String] = consumer.poll(200)
val recordCount = records.count()
if (recordCount == 0) {
timeouts = timeouts + 1
} else {
println(s"Got $recordCount records after $timeouts timeouts\n")
timeouts = 0
}
val it = records.iterator()
while(it.hasNext()) {
val record : ConsumerRecord[String,String] = it.next()
val recordTopic = record.topic()
if(recordTopic == topic) {
val message = readTopicJson(record,topic)
message.map(x => {
println(s"Message about to be RX published is $x")
topicSubject.onNext(x)
consumer.commitSync()
})
}
else {
println(s"Unknown message seen for topic '$recordTopic' .....crazy stuff")
}
}
}
}
catch {
case throwable : Throwable =>
shouldRun = false
val st = throwable.getStackTrace()
println(s"Got exception : $st")
topicSubject.onError(throwable)
}
finally {
shutdownAndAwaitTermination(pool)
}
}
protected def readJsonResponse[T](
record: ConsumerRecord[String,String],
topicDescription : String)(implicit reader: Reads[T]) : Option[T] = {
try {
println(s"$topicDescription >")
Some(Json.parse(record.value()).as[T])
}
catch {
case throwable: Throwable =>
val st = throwable.getStackTrace()
println(s"Got exception : $st")
None
}
}
def getMessageStream() : Observable[T] = {
topicSubject.asInstanceOf[Observable[T]]
}
override def close() : Unit = {
println(s"GneericKafkaConsumer closed")
shouldRun = false
shutdownAndAwaitTermination(pool)
}
def shutdownAndAwaitTermination(pool : ExecutorService) : Unit = {
// Disable new tasks from being submitted
pool.shutdown()
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
// Cancel currently executing tasks
pool.shutdownNow()
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
println("Pool did not terminate")
}
}
catch {
case throwable : Throwable =>
val st = throwable.getStackTrace()
println(s"Got exception : $st")
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow()
// Preserve interrupt status
Thread.currentThread().interrupt()
}
}
//implemented by inheritors
def readTopicJson(record : ConsumerRecord[String,String], topic : String) : Option[T]
}
现在我们已经了解了这个通用基类做了什么,让我们看看特化。
FastMessageKafkaClient
重要提示:您将需要每种消息类型一个这样的类,并且您需要将其映射到 MessageClient
类。
所以,我们有了一个很好的基类,它完成了大部分工作。它唯一不做的事情是反序列化。为此,我决定使用一个特化基类来完成这项工作。以下是 FastMessage
的示例。与生产者端一样,我使用了 Play JSON 库。
import Messages.FastMessage
import Messages.FastMessageJsonImplicits._
import org.apache.kafka.clients.consumer.ConsumerRecord
class FastMessageKafkaConsumer
extends GenericKafkaConsumer[FastMessage](Consumers.fastMessageTopic) {
def pushOneOut(m : FastMessage) : Unit = {
topicSubject.onNext(m)
}
override def readTopicJson(
record : ConsumerRecord[String,String],
topic : String) : Option[FastMessage] = {
readJsonResponse[FastMessage](record,topic)
}
}
创建一个通用的可重用 RxScala 消息客户端
到目前为止,我们有一个通用的 Kafka 消费者,它公开了一个 RxScala Subject 作为 Observable[T]
,每当读取到与消费者 Topic 匹配的传入消息时,它就会调用 OnNext
。我们还看到该通用基类有一些特化,它们提供了正确的 JSON 反序列化。
那么,我们还想做什么呢?
嗯,如果您还记得,我说过我想要实现以下目标:
- 我想使用 Rx。
- 我想出一种通用的方法,使其更像一个仓库,您只需使用它即可收听某种消息类型。本质上,消息将被公开为一个
Observable[T]
,这样您就可以使用所有炫酷的 Rx(用于 Scala)功能。
那么我们该如何实现呢?嗯,简单来说,我们想创建一个更好、更可靠、可重试的 Observable[T]
流。在演示代码中,这是 MessageClient 类的任务,它看起来像这样:
import rx.lang.scala.subscriptions.CompositeSubscription
import rx.lang.scala.{Subscription, Observable}
class MessageClient() {
val consumerMap = setupMap()
def setupMap() : Map[String, (() => GenericKafkaConsumer[AnyRef])] = {
val map = Map[String, () => GenericKafkaConsumer[AnyRef]]()
//TODO : you would need to add other consumers to the map here
//TODO : you would need to add other consumers to the map here
//TODO : you would need to add other consumers to the map here
//TODO : you would need to add other consumers to the map here
val updatedMap = map + (Consumers.fastMessageTopic, () =>
{
new FastMessageKafkaConsumer().asInstanceOf[GenericKafkaConsumer[AnyRef]]}
)
updatedMap
}
def getMessageStreamForTopic[T](topic : String) : Observable[T] = {
Observable.create[T](observer => {
consumerMap.get(topic) match {
case Some(messageFactory) => {
try {
val streamSource = messageFactory().asInstanceOf[GenericKafkaConsumer[T]]
streamSource.startConsuming()
val sub = streamSource.getMessageStream().subscribe(observer)
CompositeSubscription(sub, Subscription(streamSource.close()))
}
catch {
case throwable : Throwable =>
val st = throwable.getStackTrace()
println(s"Got exception : $st")
Subscription()
}
}
case _ => {
println("OH NO THATS BAD")
observer.onCompleted()
Subscription()
}
}
}).publish.refCount
}
}
这里本质上发生的是,我们使用 Observable.Create(..)
为正在请求的 Topic 创建一个 Observable[T]
。我们还确保使用 publish
来确保流被共享(供晚到的订阅者使用),并使用 refCount
来在没有活动订阅时自动处理。
通用仓库 (Generic Repository)
现在我们有了一个很好的 MessageClient
类,它完成了大部分工作,我们只需要在通用仓库代码中使用它。有人可能会说 MessageClient
和下面的代码可以合并成一个类,但对我来说,将它们分开感觉更好。
以下是主要几点:
- 我们实例化
MessageClient
来创建映射,并处理流创建(它使用前面讨论过的消费者类)。 - 我们使用几个 Rx 操作符来在失败时重试流,还将发布它以共享流,并使用
refCount
的自动处理语义。 - 我们使用
Observable.defer(..)
作为工厂,仅在第一次调用时返回Observable[T]
。
import rx.lang.scala.Observable
object GenericRepository {
private lazy val messageClient = new MessageClient()
def GetMessageStream[T](topic: String): Observable[T] = {
Observable.defer[T](messageClient.getMessageStreamForTopic[T](topic))
.repeat
.publish
.refCount
}
}
最终用法
有了所有这些,最终的用法看起来像这样,我相信您会同意这非常简单。
import Messages.FastMessage
object ScalaConsumer {
def main(args: Array[String]): Unit = {
var subs = GenericRepository.GetMessageStream[FastMessage](Consumers.fastMessageTopic)
.subscribe(x => {
println(s"RX SUBJECT stuff working, got this FAST MESSAGE : $x")
})
}
}
本文的 Rx 部分与我在 .NET 世界中使用的技术非常相似,所以我对它们的使用很熟悉。如果您对 .NET 文章感兴趣,这里是链接:
证明所有这些东西确实有效
所以,我们已经涵盖了很多内容,那么我们来看一个漂亮的截图来证明所有这些东西确实有效。
下面的截图显示了生产者和一个消费者(请记住,GitHub 上提供的演示代码中的消费者只消费“fast-messages” Topic,这就是为什么您永远不会看到“order-placed-messages”的任何消费者输出)。
点击查看大图
就这些
这就是我这次要说的全部内容。希望您喜欢。如果您有评论/投票,总是受欢迎的,所以不要害羞 ;-)