65.9K
CodeProject 正在变化。 阅读更多。
Home

Apache Kafka 0.9 Scala Producer/Consumer 结合 RxScala 的一些魔法

starIconstarIconstarIconstarIconstarIcon

5.00/5 (12投票s)

2016年3月16日

CPOL

17分钟阅读

viewsIcon

39814

一篇关于如何使用开源 Apache Kafka 消息框架的文章,并加入了一些 RxScala 以增加趣味性。

引言

在我目前的工作中,我大约 50% 的时间花在 .NET 上,另外 50% 的时间花在 Scala 上。因此,许多 Scala/JVM 的工具最近引起了我的兴趣。我最新的任务是尝试学习 Apache Kafka,至少理解其核心概念。我已经阅读了一两本关于 Apache Kafka 的书,现在我觉得自己至少在这篇文章中说得有些道理。

那么,Apache Kafka 到底是什么?

这是 Apache Kafka 团队对他们工具的描述。

Apache Kafka 是一个重新思考过的发布-订阅消息系统,它是一个分布式提交日志。


快速
一个 Kafka broker 每秒可以处理来自数千个客户端的数百万兆字节的读写请求。

可扩展性
Kafka 被设计成一个单一集群,可以作为大型组织的核心数据主干。它可以弹性地、透明地进行扩展,而无需停机。数据流被分区并分散在集群机器上,以便能够处理比任何单机能力更大的数据流,并允许集群协调消费者。

持久性
消息被持久化到磁盘并在集群内复制以防止数据丢失。每个 broker 在不影响性能的情况下可以处理 TB 级消息。

分布式设计
Kafka 具有现代化的集群中心设计,提供强大的持久性和容错保证。

摘自 http://kafka.apache.org/,日期为 2016/03/11

Apache Kafka 是由 LinkedIn 的工程师团队设计和构建的,我相信您会同意他们可能需要处理相当多的数据。

在本文中,我将向您介绍一些核心的 Apache Kafka 概念,并展示如何创建一个 Scala 的 Apache Kafka 生产者和一个 Scala 的 Apache Kafka 消费者。我还会为 Apache Kafka 消费者代码添加一些 RxScala 的魔法,以便将 RX 操作符应用于传入的 Apache Kafka 消息。

我曾尝试模仿我几年前写的一篇关于 SignalR 和 RX 与简单 WPF 用户界面集成的流行的 .NET 文章。在下面的部分中,如果合适,我会引用这篇 .NET 文章,因为它可能对您感兴趣。

 

代码在哪里?

我已将代码上传到我的 GitHub 账户。该 Git 仓库实际上有两个分支。

  1. 其中一个分支实现了基本的功能,就是一个简单的生产者/消费者。生产者将 JSON 发送给消费者。 您可以从这个分支获取代码
  2. 另一个分支则更加完善,并引入了一个更结构化/可重用/通用的消费者 API。如上所述,我还引入了 RxScala,以便 Rx 可以“搭便车”利用 Kafka 消费者接收的传入消息。 您可以从这个分支获取更完善的版本(本文基于此版本)

 

Kafka 概念

本节将讨论核心的 Apache Kafka 概念,这将帮助您更好地理解代码。

 

总体思路

如前所述,Apache Kafka 是一个先进的分布式/容错消息系统。它还有其他一些功能,例如分片、发布-订阅和事件溯源的能力。

让我们来谈谈简单的操作。

有一个生产者负责生产数据,它通常会连接到一个 Apache Kafka broker(很可能是一个集群)。生产者会将数据推送到一个称为“Topic”的东西上。“Topic”是生产者和消费者都同意的一种标识。生产者会在某个 Topic 上生产消息,消费者可以设置为从某个 Topic 读取。最基本的就是这样。但是,让我们谈谈上面提到的一些好东西。

 

分区 (Partitioning)

所以,Apache Kafka 有一个给定 Topic 的分区概念。这意味着您可以找到一种方法将消息分散到多个分区。例如,如果您按 IP 地址记录消息,您可能会决定所有以 127.192 开头的消息都进入一个分区,而其他消息则进入另一个分区。Apache Kafka 允许这样做。您需要为您的特定用例编写一些代码,但这是可能的。

 

发布-订阅 vs 队列

消息传递传统上有两种模式:队列和发布-订阅。在队列中,一组消费者可以从服务器读取,每条消息只发送给其中一个;在发布-订阅中,消息会广播给所有消费者。Apache Kafka 提供了一个单一的消费者抽象,它概括了这两种模式——消费者组。


消费者用一个消费者组名称来标识自己,发布到 Topic 的每条消息都会被发送给订阅的每个消费者组中的一个消费者实例。消费者实例可以位于不同的进程或不同的机器上。

如果所有消费者实例都具有相同的消费者组,那么这就像一个传统的队列,将负载均衡到各个消费者。

如果所有消费者实例都具有不同的消费者组,那么这就像发布-订阅,所有消息都会广播给所有消费者。

也许下面的图能更好地说明这一点。

 

 

事件溯源 (Event Sourcing)

现在,分布式集群容错消息系统可以用于事件溯源可能并不明显。然而,Apache Kafka 有一个提交日志(每个分区一个)的概念,它能够将消息存储(在磁盘上)一段时间。消费者可以选择提交到日志,这会移动它们的索引,但它们也可以回溯到某个时间点,并消费日志中从该索引开始的所有消息。

索引由消费者提交,但存储在 Apache Zookeeper 中。Zookeeper 是一个用于协调分布式应用程序的奇特软件。我可能会写另一篇文章介绍 ZooKeeper,但这属于另一回事。现在,您只需要知道 ZooKeeper 负责维护索引(每个分区一个)。

 

生产者(s)

生产者的职责是向 Topic 发布消息,而消费者基本上只需要关心这一点。如上所述,消费者决定使用哪个消费者组,这又决定了哪些消费者可以看到哪些消息。

 

消费者(s)

消费者需要提供几个信息。 namely

  • Apache Kafka broker 连接详情
  • Topic 详情
  • 消费者组详情

通过提供这些详情,消息才能被正确地接收到消费者中。

 

分区偏移量 (Partition Offset)

如上所述,Apache Kafka 有提交日志的概念。实际上它更复杂一些,因为每个分区都有一个提交日志。分区策略是您的应用程序代码必须定义的。然而,对于 Topic 的每个分区,都有一个单独的提交日志/偏移量。这个偏移量由消费者调整,实际值存储在 Zookeeper 中(Apache Kafka 负责处理这部分)。

应该注意的是,即使您可以创建多线程消费者,也只有一个线程可以访问分区的偏移量,所以即使您有多个线程在运行做相同的事情,也只有一个线程能够调整提交日志。

 

消费者组 (Consumer Groups)

如上所述,有一个消费者使用的“消费者组”概念。如果多个消费者共享一个消费者组,那么消息将发送给其中一个消费者。如果所有消费者实例都具有不同的消费者组,那么这就像发布-订阅,所有消息都会广播给所有消费者。

 

消费者如何使用 Zookeeper

Apache Kafka 在内部使用了另一个 Apache 项目,即 Apache ZookeeperApache Zookeeper 用于存储消费者偏移量。让您熟悉 Apache Zookeeper 的内容不在本文的讨论范围之内,但简而言之,Apache Zookeeper 用于提供分布式应用程序同步。

 

一个演示应用

本节将向您介绍一个简单的基于 Scala 的演示应用程序,它执行以下操作:

  • 拥有一个简单的 Scala 生产者,它将 JSON 序列化的消息推送到一个 Topic/消费者组。
  • 拥有一个简单的 Scala 消费者,它能够读取多个 Topic。
  • 拥有一个通用的抽象,用于一个通用的 Apache Kafka 客户端,该客户端还将传入的消息公开为一个 RxScala Observable[T] 流。
  • 拥有一个通用的仓库抽象。这样,单个仓库就可以用于监控特定的 Apache Kafka Topic。

 

安装

如果您在生产环境中运行,本节将不适用。但是,如果您像我一样,喜欢在自己电脑的安全环境(我的电脑是 Windows 10)中进行隔离测试,请继续阅读。

本质上,生产环境会运行 Apache Kafka 集群以及很可能的 Zookeeper 集群,所有这些都运行在 Linux 服务器上。

然而,对于测试来说,这有点麻烦。所以,让我们继续尝试在 Windows 机器上作为单个本地实例运行 Apache Kafka/Zookeeper。我们该怎么做?

最好的起点是按照另一个 CodeProject 教程。

www.codeproject.com/Articles/1068998/Running-Apache-Kafka-on-Windows-without-Cygwin

如果您按照该指南一直操作到最后,您将对如何设置 Apache Kafka/Zookeeper 有一个相当好的理解。

 

 

每次重启机器时必须执行的任务

每次重启您的电脑时,您都需要执行一些任务。本质上,您必须在可以期望本文附带的生产者/消费者代码正常工作之前执行这两个步骤。

与 GitHub 代码相关的是一个名为“Kafka.txt”的文件,其中包含一些有用的笔记。

每次希望运行本文附带的生产者/消费者代码时,您都需要以管理员身份运行这两个命令行(作为管理员)。

注意:您可能需要根据安装位置更改路径。

 

1. 运行 Zookeeper
cd C:\Apache\Apache-zookeeper-3.4.6\bin
zkserver


2. 运行 Kafka
cd C:\Apache\Apache-Kafka-0.9.0.0
.\bin\windows\kafka-server-start.bat .\config\server.properties

 

 

创建 Topic (只需执行一次)

您还需要为附带的生产者/消费者代码创建 Topic。您只需执行一次此操作,并且还必须确保 Kafka/Zookeeper 正在运行,请参阅上面的 2 个步骤。

一旦知道它们都在运行,我们就可以简单地使用以下命令行:


cd C:\Apache\Apache-Kafka-0.9.0.0\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fast-messages
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic order-placed-messages

 

可以看到,我们为 Topic 设置了某些内容,例如要使用的 ZooKeeper broker、副本/分区数。我相信您会同意,这些都是不错的功能。

 

 

 

生产者

生产者是更容易解释的组件,它本质上由几个类和一个配置文件组成。我们现在将逐个介绍它们。

 

生产者属性

生产者需要填写某些属性才能运行。为此,我们仅使用 Google Guava 库,该库允许我们轻松读取配置值。

这是生产者的属性文件。

bootstrap.servers=localhost:9092
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true

 

消息

消息本身是简单的 case classes,使用 Play JSON 库序列化传输。以下是生产者包含的消息。

package Messages

import play.api.libs.json.Json

//==============================
// serialize to json
//==============================
// val json = Json.toJson(FastMessage("test", 30))
//==============================
// deserialize
//==============================
// var otherFast = json.as[FastMessage]
// val name = otherFast.name
// println(s"The name of the other fast one is $name")
case class FastMessage(name: String, number: Int)

object FastMessageJsonImplicits {
  implicit val fastMessageFmt = Json.format[FastMessage]
  implicit val fastMessageWrites = Json.writes[FastMessage]
  implicit val fastMessageReads = Json.reads[FastMessage]
}



package Messages

import play.api.libs.json.Json

//==============================
// serialize to json
//==============================
// val json = Json.toJson(OrderPlacedMessage(new java.util.Date()))
//==============================
// deserialize
//==============================
// var theOrder = json.as[OrderPlacedMessage]
// val timeStamp = theOrder.timeOfMessage
// println(s"The name of the other is $timeStamp")
case class OrderPlacedMessage(timeOfMessage: java.util.Date)

object OrderPlacedMessageJsonImplicits {
  implicit val orderPlacedMessageFmt = Json.format[OrderPlacedMessage]
  implicit val orderPlacedMessageWrites = Json.writes[OrderPlacedMessage]
  implicit val orderPlacedMessageReads = Json.reads[OrderPlacedMessage]
}

这里没有什么更多的要说的了,这些对象实际上就是属性容器。

 

生产者循环

生产者需要生产消息。生产者如何生产消息取决于您的业务需求。对我而言,我只是想要一个循环,每隔一段时间就会发送新消息。为此,我有一个新的 Runnable(线程抽象),它将以 x 时间间隔运行消息生产逻辑(针对给定消息类型)。由于演示代码支持两种消息,因此存在两个这样的 Runnable 类。我们一秒钟后就会看到它们。

现在,这是生产者的代码,可以看到此类基本上只是将两个消息运行器的执行交给标准的 Executors.newSingleThreadScheduledExecutor JVM 执行器。

import org.apache.kafka.clients.producer.KafkaProducer
import java.util.Properties
import com.google.common.io.Resources
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

object ScalaProducer {
  def main(args: Array[String]): Unit = {
    val scalaProducer = new ScalaProducer()
    scalaProducer.run(args)
  }
}

/**
  * This producer will send a bunch of messages to topic "fast-messages". Every so often,
  * it will send a message to "heartBeat-messages". This shows how messages can be sent to
  * multiple topics. On the receiving end, we will see both kinds of messages but will
  * also see how the two topics aren't really synchronized.
  */
class ScalaProducer {

  def run(args: Array[String]) : Unit = {

    println("Press enter to start producer")
    scala.io.StdIn.readLine()

    var producer : KafkaProducer[String, String] = null
    var closeableKafkaProducer : CloseableKafkaProducer = null

    try {
      val props = Resources.getResource("producer.props").openStream()
      val properties = new Properties()
      properties.load(props)
      producer = new KafkaProducer[String,String](properties)
      closeableKafkaProducer = new CloseableKafkaProducer(producer)

      //"fast-messages"
      val fastMessageRunnable = 
		new FastMessageRunnable("fast-messages",closeableKafkaProducer)
      val fastMessageRunnerScheduler = 
		Executors.newSingleThreadScheduledExecutor()
      fastMessageRunnerScheduler.scheduleAtFixedRate(fastMessageRunnable, 0, 3, TimeUnit.SECONDS);

      //"order-placed-messages"
      val orderPlacedMessageRunnable = 
		new OrderPlacedMessageRunnable("order-placed-messages",closeableKafkaProducer)
      val orderPlacedMessageScheduler = 
		Executors.newSingleThreadScheduledExecutor()
      orderPlacedMessageScheduler.scheduleAtFixedRate(orderPlacedMessageRunnable, 0, 1, TimeUnit.SECONDS);

      println("producing messages")
      scala.io.StdIn.readLine()

    }
    catch {
        case throwable : Throwable =>
          val st = throwable.getStackTrace()
          println(s"Got exception : $st")
    }
    finally {
      if(closeableKafkaProducer != null) {
        closeableKafkaProducer.closeProducer()
      }
    }
  }
}

 

用于消息生产的可运行类 (Runnable)

如上所述,我决定将每条消息的生产放在自己的 Runnable 中,该 Runnable 将被调度以每 x 时间间隔生产一种所需类型的新消息。那么这些 Runnable 类看起来像什么呢?嗯,它们以一个简单的基类开始,该基类使用 KafkaProducer[String,String] 类实际发布消息,我将其包装起来,以便随时安全地关闭它。

import org.apache.kafka.clients.producer.KafkaProducer

class CloseableKafkaProducer(val producer : KafkaProducer[String, String]) {
  var isClosed : Boolean = false

  def closeProducer() : Unit = {
    if(!isClosed) {
      producer.close()
    }
  }
}

 

实际的 Runnable 代码看起来像这样,我们简单地使用 KafkaProducer[String,String]send(..) 方法。

import org.apache.kafka.clients.producer.ProducerRecord

abstract class MessageRunnable extends Runnable {

  val topic : String
  val closeableKafkaProducer : CloseableKafkaProducer

  override def run() : Unit = {
    try {
      val message = produceMessage()
      println("running > " + message)
      closeableKafkaProducer.producer.send(new ProducerRecord[String, String](topic, message))
      closeableKafkaProducer.producer.flush()
      println(s"Sent message on topic '$topic'")
    }
    catch {
      case throwable: Throwable => {
        val errMessage = throwable.getMessage
        println(s"Got exception : $errMessage")
        closeableKafkaProducer.closeProducer()
      }
    }
  }

  def produceMessage() : String
}

继承此类用于创建要发送的正确 JSON 格式,如下所示,“fast-messages” Topic,我们使用 FastMessage scala case class,它作为 JSON payload 发送。

import java.util.Calendar
import play.api.libs.json.Json
import Messages.{FastMessage}
import Messages.FastMessageJsonImplicits._

class FastMessageRunnable(
	val topic : String, 
	val closeableKafkaProducer : CloseableKafkaProducer) 
  extends MessageRunnable {
	
  override def produceMessage(): String = {
    Json.toJson(FastMessage("FastMessage_" + 
    	Calendar.getInstance().getTime().toString(),1)).toString()
  }
}

 

消费者

注意:对于消费者,我只设置了“fast-messages” Topic 的代码,但您只需按照“order-placed-messages” Topic 的工作方式进行即可。设置应该完全相同。

消费者比生产者稍微复杂一些,但这仅仅是因为我想要实现一些特定的目标,例如:

  • 我想使用 Rx。
  • 我想出一种通用的方法,使其更像一个仓库,您只需使用它即可收听某种消息类型。本质上,消息将被公开为一个 Observable[T],这样您就可以使用所有炫酷的 Rx(用于 Scala)功能。

我将逐步介绍构成此功能的每个部分。

 

消费者属性

消费者需要填写某些属性才能运行。为此,我们仅使用 Google Guava 库,该库允许我们轻松读取配置值。

这是生产者的属性文件。

bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000

# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way.  No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152

 

 

底层的 Kafka 消费者部分

正如我上面所说,我希望能够使用 Rx 并创建可重用的架构部分。因此,我戴上思考帽,决定我需要一个通用的 Kafka 消费者,以及一个对其进行特化,该特化仅处理正确的 JSON 消息反序列化(就像我们为生产者做的那样)。为此,有一个通用的基类(将永远只有这一个,请参见 GenericKafkaConsumer)和一个消费者类(请参见 FastMessageKafkaConsumer)每个消息类型。

让我们从通用的 Kafka 消费者基类开始,以下是代码的主要要点:

  • 这个通用类接收一个 Topic,并且只处理与请求 Topic 匹配的传入记录 Topic 的消息。
  • 有一个消息读取循环,它使用 KafkaConsumer[string, string] 从网络读取消息。
  • 有一个抽象方法 readTopicJson(..),它期望由该类的继承者实现。
  • 正在使用一个 RxScala Subject 将传入的消息公开为一个 Observable[T]

 

GenericKafkaConsumer

import java.io.Closeable
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
import Messages.FastMessage
import com.google.common.io.Resources
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Arrays
import java.util.Properties
import java.util.Random
import play.api.libs.json.{Reads, Json}
import rx.lang.scala.Observable
import rx.lang.scala.subjects.PublishSubject

///**
//  * This program reads messages from a single topic.
//  * Whenever a message is received it is pumped out using RX
//  */
abstract class GenericKafkaConsumer[T](topic : String) extends Closeable with Runnable {
  val topicSubject = PublishSubject.apply[T]()
  var consumer : KafkaConsumer[String, String] = null
  val pool : ExecutorService = Executors.newFixedThreadPool(1)
  var shouldRun : Boolean = true

  def startConsuming() : Unit = {
    pool.execute(this)
  }


  def run() : Unit = {

    try {

      val props = Resources.getResource("consumer.props").openStream()
      val properties = new Properties()
      properties.load(props)
      if (properties.getProperty("group.id") == null) {
        properties.setProperty("group.id", "group-" + new Random().nextInt(100000))
      }
      consumer = new KafkaConsumer[String, String](properties)
      consumer.subscribe(Arrays.asList(topic))
      var timeouts = 0

      println(s"THE TOPIC IS : $topic")

      while (shouldRun) {
        println("consumer loop running, wait for messages")
        // read records with a short timeout. If we time out, we don't really care.
        val records : ConsumerRecords[String, String] = consumer.poll(200)
        val recordCount = records.count()
        if (recordCount == 0) {
          timeouts = timeouts + 1
        } else {
          println(s"Got $recordCount records after $timeouts timeouts\n")
          timeouts = 0
        }
        val it = records.iterator()
        while(it.hasNext()) {
          val record : ConsumerRecord[String,String] = it.next()
          val recordTopic = record.topic()
          if(recordTopic == topic) {
            val message = readTopicJson(record,topic)
            message.map(x =>  {
              println(s"Message about to be RX published is $x")
              topicSubject.onNext(x)
              consumer.commitSync()
            })
          }
          else {
            println(s"Unknown message seen for topic '$recordTopic' .....crazy stuff")
          }
        }
      }
    }
    catch {
      case throwable : Throwable =>
        shouldRun = false
        val st = throwable.getStackTrace()
        println(s"Got exception : $st")
        topicSubject.onError(throwable)
    }
    finally {
      shutdownAndAwaitTermination(pool)
    }
  }

  protected def readJsonResponse[T](
         record: ConsumerRecord[String,String],
         topicDescription : String)(implicit reader: Reads[T]) : Option[T] = {
    try {
      println(s"$topicDescription >")
      Some(Json.parse(record.value()).as[T])
    }
    catch {
      case throwable: Throwable =>
        val st = throwable.getStackTrace()
        println(s"Got exception : $st")
        None
    }
  }

  def getMessageStream() : Observable[T]  = {
    topicSubject.asInstanceOf[Observable[T]]
  }

  override def close() : Unit = {
    println(s"GneericKafkaConsumer closed")
    shouldRun = false
    shutdownAndAwaitTermination(pool)
  }

  def shutdownAndAwaitTermination(pool : ExecutorService) : Unit = {
    // Disable new tasks from being submitted
    pool.shutdown()
    try {
      // Wait a while for existing tasks to terminate
      if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
        // Cancel currently executing tasks
        pool.shutdownNow()
        // Wait a while for tasks to respond to being cancelled
        if (!pool.awaitTermination(60, TimeUnit.SECONDS))
          println("Pool did not terminate")
      }
    }
    catch {
      case throwable : Throwable =>
        val st = throwable.getStackTrace()
        println(s"Got exception : $st")
        // (Re-)Cancel if current thread also interrupted
        pool.shutdownNow()
        // Preserve interrupt status
        Thread.currentThread().interrupt()
    }
  }

  //implemented by inheritors
  def readTopicJson(record : ConsumerRecord[String,String], topic : String) : Option[T]
}

 

现在我们已经了解了这个通用基类做了什么,让我们看看特化。

 

FastMessageKafkaClient

重要提示:您将需要每种消息类型一个这样的类,并且您需要将其映射到 MessageClient 类。

所以,我们有了一个很好的基类,它完成了大部分工作。它唯一不做的事情是反序列化。为此,我决定使用一个特化基类来完成这项工作。以下是 FastMessage 的示例。与生产者端一样,我使用了 Play JSON 库。

import Messages.FastMessage
import Messages.FastMessageJsonImplicits._
import org.apache.kafka.clients.consumer.ConsumerRecord


class FastMessageKafkaConsumer
  extends GenericKafkaConsumer[FastMessage](Consumers.fastMessageTopic) {

  def pushOneOut(m : FastMessage) : Unit = {
    topicSubject.onNext(m)
  }

  override def readTopicJson(
          record : ConsumerRecord[String,String],
          topic : String) : Option[FastMessage] = {
    readJsonResponse[FastMessage](record,topic)
  }
}

 

创建一个通用的可重用 RxScala 消息客户端

到目前为止,我们有一个通用的 Kafka 消费者,它公开了一个 RxScala Subject 作为 Observable[T],每当读取到与消费者 Topic 匹配的传入消息时,它就会调用 OnNext 。我们还看到该通用基类有一些特化,它们提供了正确的 JSON 反序列化。

那么,我们还想做什么呢?

嗯,如果您还记得,我说过我想要实现以下目标:

  • 我想使用 Rx。
  • 我想出一种通用的方法,使其更像一个仓库,您只需使用它即可收听某种消息类型。本质上,消息将被公开为一个 Observable[T],这样您就可以使用所有炫酷的 Rx(用于 Scala)功能。

那么我们该如何实现呢?嗯,简单来说,我们想创建一个更好、更可靠、可重试的 Observable[T] 流。在演示代码中,这是 MessageClient 类的任务,它看起来像这样:

import rx.lang.scala.subscriptions.CompositeSubscription
import rx.lang.scala.{Subscription, Observable}

class MessageClient() {
  val consumerMap = setupMap()

  def setupMap() : Map[String, (() => GenericKafkaConsumer[AnyRef])] = {
    val map = Map[String, () => GenericKafkaConsumer[AnyRef]]()
    //TODO : you would need to add other consumers to the map here
    //TODO : you would need to add other consumers to the map here
    //TODO : you would need to add other consumers to the map here
    //TODO : you would need to add other consumers to the map here
    val updatedMap = map + (Consumers.fastMessageTopic, () =>
    {
      new FastMessageKafkaConsumer().asInstanceOf[GenericKafkaConsumer[AnyRef]]}
    )
    updatedMap
  }

  def getMessageStreamForTopic[T](topic : String) : Observable[T] = {
    Observable.create[T](observer => {
      consumerMap.get(topic) match {
        case Some(messageFactory) => {
          try {
            val streamSource = messageFactory().asInstanceOf[GenericKafkaConsumer[T]]
            streamSource.startConsuming()
            val sub = streamSource.getMessageStream().subscribe(observer)
            CompositeSubscription(sub, Subscription(streamSource.close()))
          }
          catch {
            case throwable : Throwable =>
              val st = throwable.getStackTrace()
              println(s"Got exception : $st")
              Subscription()
          }
        }
        case _ => {
            println("OH NO THATS BAD")
            observer.onCompleted()
            Subscription()
        }
      }
    }).publish.refCount
  }
}

这里本质上发生的是,我们使用 Observable.Create(..) 为正在请求的 Topic 创建一个 Observable[T]。我们还确保使用 publish 来确保流被共享(供晚到的订阅者使用),并使用 refCount 来在没有活动订阅时自动处理。

 

 

通用仓库 (Generic Repository)

现在我们有了一个很好的 MessageClient 类,它完成了大部分工作,我们只需要在通用仓库代码中使用它。有人可能会说 MessageClient 和下面的代码可以合并成一个类,但对我来说,将它们分开感觉更好。

以下是主要几点:

  • 我们实例化 MessageClient 来创建映射,并处理流创建(它使用前面讨论过的消费者类)。
  • 我们使用几个 Rx 操作符来在失败时重试流,还将发布它以共享流,并使用 refCount 的自动处理语义。
  • 我们使用 Observable.defer(..) 作为工厂,仅在第一次调用时返回 Observable[T]
import rx.lang.scala.Observable

object GenericRepository {

  private lazy val messageClient = new MessageClient()

  def GetMessageStream[T](topic: String): Observable[T] = {
    Observable.defer[T](messageClient.getMessageStreamForTopic[T](topic))
      .repeat
      .publish
      .refCount
  }
}

 

最终用法

有了所有这些,最终的用法看起来像这样,我相信您会同意这非常简单。

import Messages.FastMessage

object ScalaConsumer {

  def main(args: Array[String]): Unit = {

    var subs = GenericRepository.GetMessageStream[FastMessage](Consumers.fastMessageTopic)
      .subscribe(x => {
        println(s"RX SUBJECT stuff working, got this FAST MESSAGE : $x")
      })
  }
}

 

本文的 Rx 部分与我在 .NET 世界中使用的技术非常相似,所以我对它们的使用很熟悉。如果您对 .NET 文章感兴趣,这里是链接:

 

证明所有这些东西确实有效

所以,我们已经涵盖了很多内容,那么我们来看一个漂亮的截图来证明所有这些东西确实有效。

下面的截图显示了生产者和一个消费者(请记住,GitHub 上提供的演示代码中的消费者只消费“fast-messages” Topic,这就是为什么您永远不会看到“order-placed-messages”的任何消费者输出)。

点击查看大图

 

就这些

这就是我这次要说的全部内容。希望您喜欢。如果您有评论/投票,总是受欢迎的,所以不要害羞 ;-)

© . All rights reserved.