AKKA STREAMS





5.00/5 (5投票s)
上次我们讨论了 Akka Http,这次我们将探讨 Akka Streams。Akka Streams 是一个庞大的主题,您肯定需要通过官方文档来补充这篇文章。
上次我们讨论了 Akka Http,这次我们将探讨 Akka Streams。
Akka Streams 是一个庞大的主题,您肯定需要通过官方文档来补充这篇文章。
Akka Streams 是 Reactive Streams 的创始成员之一,Akka Streams 是 Reactive Streams API 的一种实现(有许多种)。
Reactive Streams 是一项旨在为具有非阻塞反压的异步流处理提供标准的倡议。这包括针对运行时环境(JVM 和 JavaScript)以及网络协议的努力。
引言
可能有些读者像我一样来自 .NET,他们使用过 RX。
您甚至可能以前听说过 Reactive Streams。那么,Reactive Streams 与 Rx 有何不同?
Reactive Streams 相对于 Rx 的最大优势在于反压(back pressure)的概念。以下是 Akka 文档中关于反压的说法:
反压协议根据下游订阅者能够接收和缓冲的元素数量(称为“需求”)来定义。数据源(在 Reactive Streams 术语中称为 Publisher,在 Akka Streams 中实现为 Source)保证它永远不会发出超过任何给定订阅者接收到的总需求量的元素。
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html#back-pressure-explained
幸运的是,这一切都内置在 Akka Streams 中,作为 Akka Streams 的用户,您无需过多担心。
您可以使用 OverflowStrategy
枚举值来决定如何处理内置流管道(我们将在下面更详细地探讨),具体取决于反压。这是一个非常简单的示例:
Source(1 to 10).buffer(10, OverflowStrategy.backpressure)
其中,可用的 OverflowStrategy
值如下:
object OverflowStrategy { /** * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for * the new element. */ def dropHead: OverflowStrategy = DropHead /** * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for * the new element. */ def dropTail: OverflowStrategy = DropTail /** * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element. */ def dropBuffer: OverflowStrategy = DropBuffer /** * If the buffer is full when a new element arrives, drops the new element. */ def dropNew: OverflowStrategy = DropNew /** * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until * space becomes available in the buffer. */ def backpressure: OverflowStrategy = Backpressure /** * If the buffer is full when a new element is available this strategy completes the stream with failure. */ def fail: OverflowStrategy = Fail }
这就是基本思想,Akka Streams 提供了很多功能,例如:
- 内置的阶段/形状
- 一个图 API
- 创建您自己的阶段/形状的能力
在本文的其余部分,我们将研究这三点的示例。
使用 Akka Streams API
如本文开头所述,Akka Streams 的实现非常庞大。有很多内容需要涵盖,远远超出了我能在一篇小博客文章中合理涵盖的范围。官方文档仍然是首选,但如果您从未听说过 Akka Streams,这篇文章可能足以让您入门。
官方文档(撰写时)在这里:
http://doc.akka.io/docs/akka/2.4.2/scala/stream/index.html
使用内置阶段/形状
Akka 附带了大量我们可以使用的预构建阶段。然而,在我提及这些之前,让我们先花点时间讨论一下如何以最基本的形式使用 Akka Streams API。
我们的想法是,一个可用的管道由四个不同的部分组成。
来源
一个具有一个输出的处理阶段,当下游处理阶段准备好接收数据元素时,它会发出数据元素。
接收器(Sink)
一个具有一个输入的处理阶段,请求并接受数据元素,可能会减慢上游元素生产者的速度。
流程
一个具有一个输入和输出的处理阶段,通过转换流经它的数据元素来连接其上游和下游。
可运行图(RunnableGraph)
一个流(Flow),其两端分别“连接”到 Source 和 Sink,并已准备好 run()
。
正如我所说,Akka 提供了大量的内置阶段,使我们的生活更轻松。例如,以下是撰写时可用的阶段:
源阶段(Source Stages)
- fromIterator
- apply
- single
- 重复
- tick
- fromFuture
- fromCompletionStage
- unfold
- unfoldAsync
- empty
- maybe
- failed
- actorPublisher
- actorRef
- combine
- queue
- asSubscriber
- fromPublisher
- fromFile
接收器阶段(Sink Stages)
- head
- headOption
- last
- lastOption
- ignore
- cancelled
- seq
- foreach
- foreachParallel
- onComplete
- fold
- reduce
- combine
- actorRef
- actorRefWithAck
- actorSubscriber
asPublisher - fromSubscriber
- toFile
现在我们将看一些使用这些的示例。
def simpleFlow() : Unit = { val source = Source(1 to 10) val sink = Sink.fold[Int, Int](0)(_ + _) // connect the Source to the Sink, obtaining a RunnableGraph val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right) // materialize the flow and get the value of the FoldSink implicit val timeout = Timeout(5 seconds) val sumFuture: Future[Int] = runnable.run() val sum = Await.result(sumFuture, timeout.duration) println(s"source.toMat(sink)(Keep.right) Sum = $sum") // Use the shorthand source.runWith(sink) val sumFuture2: Future[Int] = source.runWith(sink) val sum2 = Await.result(sumFuture2, timeout.duration) println(s"source.runWith(sink) Sum = $sum") }
在这个简单的例子中,我们有一个 **Source(1 到 10)**,然后将其连接到一个 **Sink**,该 Sink 将传入的数字相加。
这个代码块演示了各种不同的 Source 和 Sink
def differentSourcesAndSinks() : Unit = { //various sources Source(List(1, 2, 3)).runWith(Sink.foreach(println)) Source.single("only one element").runWith(Sink.foreach(println)) //actor sink val helloActor = system.actorOf(Props[HelloActor], name = "helloactor") Source(List("hello", "hello")) .runWith(Sink.actorRef(helloActor,DoneMessage)) //future source val futureString = Source.fromFuture(Future.successful("Hello Streams!")) .toMat(Sink.head)(Keep.right).run() implicit val timeout = Timeout(5 seconds) val theString = Await.result(futureString, timeout.duration) println(s"theString = $theString") }
这个代码块演示了在 Source 上使用简单的 Map
def mapFlow() : Unit = { val source = Source(11 to 16) val doublerSource = source.map(x => x * 2) val sink = Sink.foreach(println) implicit val timeout = Timeout(5 seconds) // Use the shorthand source.runWith(sink) val printSinkFuture: Future[Done] = doublerSource.runWith(sink) Await.result(printSinkFuture, timeout.duration) }
使用图 API
Akka Streams 还带有一个非常有趣的图形构建 DSL。当您想要创建相当复杂的流时,您会使用它。
关于图构建器 DSL 的另一个非常有趣的事情是,您可以在其中使用自定义形状,并且还可以让它部分连接。这样您就可以潜在地将其用作 Source/Sink。
假设您从使用图 DSL 构建的图中获得了一个输出,那么您可以将这个部分构建的图本身用作 Source。
同样,如果您在创建的图中有未连接的输入,您可以将其用作 Sink。
你可以在这里阅读更多相关信息:
我敦促大家阅读一下,因为图 DSL 能做的事情非常酷。
好的,现在是时候举个例子了,这个例子直接来自 TypeSafe activator 代码。
http://www.lightbend.com/activator/template/akka-stream-scala
package com.sas.graphs import java.io.File import akka.NotUsed import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.ClosedShape import akka.stream.scaladsl._ import akka.util.ByteString import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.{ Failure, Success } class WritePrimesDemo { def run(): Unit = { implicit val system = ActorSystem("Sys") import system.dispatcher implicit val materializer = ActorMaterializer() // generate random numbers val maxRandomNumberSize = 1000000 val primeSource: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize))). // filter prime numbers filter(rnd => isPrime(rnd)). // and neighbor +2 is also prime filter(prime => isPrime(prime + 2)) // write to file sink val fileSink = FileIO.toPath(new File("target/primes.txt").toPath) val slowSink = Flow[Int] // act as if processing is really slow .map(i => { Thread.sleep(1000); ByteString(i.toString) }) .toMat(fileSink)((_, bytesWritten) => bytesWritten) // console output sink val consoleSink = Sink.foreach[Int](println) // send primes to both slow file sink and console sink using graph API val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder => (slow, console) => import GraphDSL.Implicits._ val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file broadcast ~> console // connect other side of splitter to console ClosedShape } val materialized = RunnableGraph.fromGraph(graph).run() // ensure the output file is closed and the system shutdown upon completion materialized.onComplete { case Success(_) => system.terminate() case Failure(e) => println(s"Failure: ${e.getMessage}") system.terminate() } } def isPrime(n: Int): Boolean = { if (n <= 1) false else if (n == 2) true else !(2 to (n - 1)).exists(x => n % x == 0) } }
这段代码最重要的部分是这里
// send primes to both slow file sink and console sink using graph API val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder => (slow, console) => import GraphDSL.Implicits._ val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file broadcast ~> console // connect other side of splitter to console ClosedShape } val materialized = RunnableGraph.fromGraph(graph).run()
在使用图之前定义了两个 Sink:
- 一个文件 Sink
- 一个控制台 Sink
还有一个生成随机素数的 Source。
所以 Graph DSL 允许你创建图。它允许你接收输入,并使用提供的隐式 **builder** 创建其他形状。
然后,DSL 允许您将输入/其他构建器创建的阶段/形状连接到输入,甚至将已连接的阶段暴露到输出。
这是通过 ~> 语法完成的,它简单地表示连接。
如前所述,您可以创建部分连接的图,但如果所有输入和输出都已连接,则被视为 **ClosedShape**,可以作为独立组件使用。
以下是运行此图示例的输出示例:
创建自定义形状/阶段
不仅如此,我们还可以创建自己的可在流中使用的形状。这是一个相当复杂的主题,您肯定会从阅读此页面中受益匪意:
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html
这篇小文章不可能涵盖足够多的内容,但以下是官方文档的一些亮点:
这是您创建自定义阶段时使用的基本模式。
import akka.stream.SourceShape import akka.stream.stage.GraphStage class NumbersSource extends GraphStage[SourceShape[Int]] { // Define the (sole) output port of this stage val out: Outlet[Int] = Outlet("NumbersSource") // Define the shape of this stage, which is SourceShape with the port we defined above override val shape: SourceShape[Int] = SourceShape(out) // This is where the actual (possibly stateful) logic will live override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ??? }
大部分实际逻辑将在 createLogic
方法中。但要在其中做任何有用的事情,您需要使用处理器(handlers)。处理器是您用于处理输入/输出的工具。有 InHandler
和 OutHandler
。
每个都有自己的状态机流。例如,这是 OutHandler
的状态机:
而这是 InHandler
的状态机:
这是了解这些处理器的最佳页面:
状态唯一应该维护的地方是 createLogic
方法中。
让我们考虑一个小的例子。假设我们有这样的对象:
case class Element(id: Int, value: Int)
我们想要构建一个自定义阶段,允许我们从这种类型中选择一个值,并且只针对由属性选择器提供的唯一值发出输出值。
我们可以称之为 DistinctUntilChanged
。让我们看看这个例子可能是什么样子:
package com.sas.customshapes import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler, GraphStage} import akka.stream.{Outlet, Attributes, Inlet, FlowShape} import scala.collection.immutable final class DistinctUntilChanged[E, P](propertyExtractor: E => P) extends GraphStage[FlowShape[E, E]] { val in = Inlet[E]("DistinctUntilChanged.in") val out = Outlet[E]("DistinctUntilChanged.out") override def shape = FlowShape.of(in, out) override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) { private var savedState : Option[E] = None setHandlers(in, out, new InHandler with OutHandler { override def onPush(): Unit = { val nextElement = grab(in) val nextState = propertyExtractor(nextElement) if (savedState.isEmpty || propertyExtractor(savedState.get) != nextState) { savedState = Some(nextElement) push(out, savedState.get) } else { pull(in) } savedState = Some(nextElement) } override def onPull(): Unit = { pull(in) } override def onUpstreamFinish(): Unit = { completeStage() } }) override def postStop(): Unit = { savedState = None } } }
其中的亮点是:
- 我们有一个单一的 **Inlet**
- 我们有一个单一的 **Outlet**
- 我们暴露了一个 **FlowShape**(只有输入/输出),有很多形状,但对于一个输入/一个输出,**FlowShape** 是我们想要的。
- 我们使用
createLogic
来完成工作。 - 我们使用
InHandler
处理输入。 - 我们使用
OutHandler
处理输出。
另一个重要的事情(至少对于这个单一输入/输出示例而言)是,我们不能在 createLogic
中调用 pull/push 超过一次。
假设我们有这些元素:
package com.sas.customshapes import scala.collection.immutable object SampleElements { val E11 = Element(1, 1) val E21 = Element(2, 1) val E31 = Element(3, 1) val E42 = Element(4, 2) val E52 = Element(5, 2) val E63 = Element(6, 3) val Ones = immutable.Seq(E11, E21, E31) val Twos = immutable.Seq(E42, E52) val Threes = immutable.Seq(E63) val All = Ones ++ Twos ++ Threes }
以及这段演示代码:
def runDistinctUntilChanged() : Unit = { Source(SampleElements.All) .via(new DistinctUntilChanged(_.value)) .runWith(Sink.foreach(println)) }
我们将得到这个输出到 Sink:
这个例子很大程度上归功于我在这里找到的一篇很棒的博文:
https://www.softwaremill.com/implementing-a-custom-akka-streams-graph-stage/
就是这样
无论如何,本系列到此结束,希望您喜欢它,并在此过程中学到了一些 Akka 知识。
我现在要稍作休息,然后我想开始研究一些 Azure/Web 相关的东西。
我在哪里可以找到代码示例?
我将在此系列中不断用示例项目补充这个 GitHub 仓库。