65.9K
CodeProject 正在变化。 阅读更多。
Home

AKKA STREAMS

starIconstarIconstarIconstarIconstarIcon

5.00/5 (5投票s)

2016年12月13日

CPOL

7分钟阅读

viewsIcon

7558

上次我们讨论了 Akka Http,这次我们将探讨 Akka Streams。Akka Streams 是一个庞大的主题,您肯定需要通过官方文档来补充这篇文章。

上次我们讨论了 Akka Http,这次我们将探讨 Akka Streams。

Akka Streams 是一个庞大的主题,您肯定需要通过官方文档来补充这篇文章。

Akka Streams 是 Reactive Streams 的创始成员之一,Akka Streams 是 Reactive Streams API 的一种实现(有许多种)。

Reactive Streams 是一项旨在为具有非阻塞反压的异步流处理提供标准的倡议。这包括针对运行时环境(JVM 和 JavaScript)以及网络协议的努力。

引言

可能有些读者像我一样来自 .NET,他们使用过 RX。

您甚至可能以前听说过 Reactive Streams。那么,Reactive Streams 与 Rx 有何不同?

Reactive Streams 相对于 Rx 的最大优势在于反压(back pressure)的概念。以下是 Akka 文档中关于反压的说法:

反压协议根据下游订阅者能够接收和缓冲的元素数量(称为“需求”)来定义。数据源(在 Reactive Streams 术语中称为 Publisher,在 Akka Streams 中实现为 Source)保证它永远不会发出超过任何给定订阅者接收到的总需求量的元素。

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html#back-pressure-explained

幸运的是,这一切都内置在 Akka Streams 中,作为 Akka Streams 的用户,您无需过多担心。

您可以使用 OverflowStrategy 枚举值来决定如何处理内置流管道(我们将在下面更详细地探讨),具体取决于反压。这是一个非常简单的示例:

Source(1 to 10).buffer(10, OverflowStrategy.backpressure)

其中,可用的 OverflowStrategy 值如下:

object OverflowStrategy {
  /**
   * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
   * the new element.
   */
  def dropHead: OverflowStrategy = DropHead
 
  /**
   * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
   * the new element.
   */
  def dropTail: OverflowStrategy = DropTail
 
  /**
   * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
   */
  def dropBuffer: OverflowStrategy = DropBuffer
 
  /**
   * If the buffer is full when a new element arrives, drops the new element.
   */
  def dropNew: OverflowStrategy = DropNew
 
  /**
   * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
   * space becomes available in the buffer.
   */
  def backpressure: OverflowStrategy = Backpressure
 
  /**
   * If the buffer is full when a new element is available this strategy completes the stream with failure.
   */
  def fail: OverflowStrategy = Fail
}

这就是基本思想,Akka Streams 提供了很多功能,例如:

  • 内置的阶段/形状
  • 一个图 API
  • 创建您自己的阶段/形状的能力

在本文的其余部分,我们将研究这三点的示例。

使用 Akka Streams API

如本文开头所述,Akka Streams 的实现非常庞大。有很多内容需要涵盖,远远超出了我能在一篇小博客文章中合理涵盖的范围。官方文档仍然是首选,但如果您从未听说过 Akka Streams,这篇文章可能足以让您入门。

官方文档(撰写时)在这里:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/index.html

使用内置阶段/形状

Akka 附带了大量我们可以使用的预构建阶段。然而,在我提及这些之前,让我们先花点时间讨论一下如何以最基本的形式使用 Akka Streams API。

我们的想法是,一个可用的管道由四个不同的部分组成。

来源
一个具有一个输出的处理阶段,当下游处理阶段准备好接收数据元素时,它会发出数据元素。

接收器(Sink)
一个具有一个输入的处理阶段,请求并接受数据元素,可能会减慢上游元素生产者的速度。

流程
一个具有一个输入和输出的处理阶段,通过转换流经它的数据元素来连接其上游和下游。

可运行图(RunnableGraph)
一个流(Flow),其两端分别“连接”到 Source 和 Sink,并已准备好 run()

正如我所说,Akka 提供了大量的内置阶段,使我们的生活更轻松。例如,以下是撰写时可用的阶段:

源阶段(Source Stages)

  • fromIterator
  • apply
  • single
  • 重复
  • tick
  • fromFuture
  • fromCompletionStage
  • unfold
  • unfoldAsync
  • empty
  • maybe
  • failed
  • actorPublisher
  • actorRef
  • combine
  • queue
  • asSubscriber
  • fromPublisher
  • fromFile

接收器阶段(Sink Stages)

  • head
  • headOption
  • last
  • lastOption
  • ignore
  • cancelled
  • seq
  • foreach
  • foreachParallel
  • onComplete
  • fold
  • reduce
  • combine
  • actorRef
  • actorRefWithAck
  • actorSubscriber
    asPublisher
  • fromSubscriber
  • toFile

现在我们将看一些使用这些的示例。

def simpleFlow() : Unit = {
  val source = Source(1 to 10)
  val sink = Sink.fold[Int, Int](0)(_ + _)
  // connect the Source to the Sink, obtaining a RunnableGraph
  val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
  // materialize the flow and get the value of the FoldSink
  implicit val timeout = Timeout(5 seconds)
  val sumFuture: Future[Int] = runnable.run()
  val sum = Await.result(sumFuture, timeout.duration)
  println(s"source.toMat(sink)(Keep.right) Sum = $sum")
 
  // Use the shorthand source.runWith(sink)
  val sumFuture2: Future[Int] = source.runWith(sink)
  val sum2 = Await.result(sumFuture2, timeout.duration)
  println(s"source.runWith(sink) Sum = $sum")
}

在这个简单的例子中,我们有一个 **Source(1 到 10)**,然后将其连接到一个 **Sink**,该 Sink 将传入的数字相加。

这个代码块演示了各种不同的 Source 和 Sink

def differentSourcesAndSinks() : Unit = {
  //various sources
  Source(List(1, 2, 3)).runWith(Sink.foreach(println))
  Source.single("only one element").runWith(Sink.foreach(println))
  //actor sink
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
  Source(List("hello", "hello"))
    .runWith(Sink.actorRef(helloActor,DoneMessage))
  //future source
  val futureString = Source.fromFuture(Future.successful("Hello Streams!"))
    .toMat(Sink.head)(Keep.right).run()
  implicit val timeout = Timeout(5 seconds)
  val theString = Await.result(futureString, timeout.duration)
  println(s"theString = $theString")
}

这个代码块演示了在 Source 上使用简单的 Map

def mapFlow() : Unit = {
  val source = Source(11 to 16)
  val doublerSource = source.map(x => x * 2)
  val sink = Sink.foreach(println)
  implicit val timeout = Timeout(5 seconds)
 
  // Use the shorthand source.runWith(sink)
  val printSinkFuture: Future[Done] = doublerSource.runWith(sink)
  Await.result(printSinkFuture, timeout.duration)
}

使用图 API

Akka Streams 还带有一个非常有趣的图形构建 DSL。当您想要创建相当复杂的流时,您会使用它。

关于图构建器 DSL 的另一个非常有趣的事情是,您可以在其中使用自定义形状,并且还可以让它部分连接。这样您就可以潜在地将其用作 Source/Sink。

假设您从使用图 DSL 构建的图中获得了一个输出,那么您可以将这个部分构建的图本身用作 Source。

同样,如果您在创建的图中有未连接的输入,您可以将其用作 Sink。

你可以在这里阅读更多相关信息:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-graphs.html#constructing-sources-sinks-and-flows-from-partial-graphs

我敦促大家阅读一下,因为图 DSL 能做的事情非常酷。

好的,现在是时候举个例子了,这个例子直接来自 TypeSafe activator 代码。

http://www.lightbend.com/activator/template/akka-stream-scala

package com.sas.graphs
 
import java.io.File
 
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.util.ByteString
 
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Failure, Success }
 
class WritePrimesDemo {
 
  def run(): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
 
    // generate random numbers
    val maxRandomNumberSize = 1000000
    val primeSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize))).
        // filter prime numbers
        filter(rnd => isPrime(rnd)).
        // and neighbor +2 is also prime
        filter(prime => isPrime(prime + 2))
 
    // write to file sink
    val fileSink = FileIO.toPath(new File("target/primes.txt").toPath)
    val slowSink = Flow[Int]
      // act as if processing is really slow
      .map(i => { Thread.sleep(1000); ByteString(i.toString) })
      .toMat(fileSink)((_, bytesWritten) => bytesWritten)
 
    // console output sink
    val consoleSink = Sink.foreach[Int](println)
 
    // send primes to both slow file sink and console sink using graph API
    val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
      (slow, console) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
        primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
        broadcast ~> console // connect other side of splitter to console
        ClosedShape
    }
    val materialized = RunnableGraph.fromGraph(graph).run()
 
    // ensure the output file is closed and the system shutdown upon completion
    materialized.onComplete {
      case Success(_) =>
        system.terminate()
      case Failure(e) =>
        println(s"Failure: ${e.getMessage}")
        system.terminate()
    }
 
  }
 
  def isPrime(n: Int): Boolean = {
    if (n <= 1) false
    else if (n == 2) true
    else !(2 to (n - 1)).exists(x => n % x == 0)
  }
}

这段代码最重要的部分是这里

// send primes to both slow file sink and console sink using graph API
val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
  (slow, console) =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
    primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
    broadcast ~> console // connect other side of splitter to console
    ClosedShape
}
val materialized = RunnableGraph.fromGraph(graph).run()

在使用图之前定义了两个 Sink:

  • 一个文件 Sink
  • 一个控制台 Sink

还有一个生成随机素数的 Source。

所以 Graph DSL 允许你创建图。它允许你接收输入,并使用提供的隐式 **builder** 创建其他形状。

然后,DSL 允许您将输入/其他构建器创建的阶段/形状连接到输入,甚至将已连接的阶段暴露到输出。

这是通过 ~> 语法完成的,它简单地表示连接。

如前所述,您可以创建部分连接的图,但如果所有输入和输出都已连接,则被视为 **ClosedShape**,可以作为独立组件使用。

以下是运行此图示例的输出示例:

创建自定义形状/阶段

不仅如此,我们还可以创建自己的可在流中使用的形状。这是一个相当复杂的主题,您肯定会从阅读此页面中受益匪意:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html

这篇小文章不可能涵盖足够多的内容,但以下是官方文档的一些亮点:

这是您创建自定义阶段时使用的基本模式。

import akka.stream.SourceShape
import akka.stream.stage.GraphStage
  
class NumbersSource extends GraphStage[SourceShape[Int]] {
  // Define the (sole) output port of this stage
  val out: Outlet[Int] = Outlet("NumbersSource")
  // Define the shape of this stage, which is SourceShape with the port we defined above
  override val shape: SourceShape[Int] = SourceShape(out)
  
  // This is where the actual (possibly stateful) logic will live
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}

大部分实际逻辑将在 createLogic 方法中。但要在其中做任何有用的事情,您需要使用处理器(handlers)。处理器是您用于处理输入/输出的工具。有 InHandlerOutHandler

每个都有自己的状态机流。例如,这是 OutHandler 的状态机:

而这是 InHandler 的状态机:

这是了解这些处理器的最佳页面:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html#Port_states__InHandler_and_OutHandler

状态唯一应该维护的地方是 createLogic 方法中。

让我们考虑一个小的例子。假设我们有这样的对象:

case class Element(id: Int, value: Int)

我们想要构建一个自定义阶段,允许我们从这种类型中选择一个值,并且只针对由属性选择器提供的唯一值发出输出值。

我们可以称之为 DistinctUntilChanged。让我们看看这个例子可能是什么样子:

package com.sas.customshapes
 
import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler, GraphStage}
import akka.stream.{Outlet, Attributes, Inlet, FlowShape}
 
import scala.collection.immutable
 
final class DistinctUntilChanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, E]] {
 
  val in = Inlet[E]("DistinctUntilChanged.in")
  val out = Outlet[E]("DistinctUntilChanged.out")
 
  override def shape = FlowShape.of(in, out)
 
  override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
 
    private var savedState : Option[E] = None
 
    setHandlers(in, out, new InHandler with OutHandler {
 
      override def onPush(): Unit = {
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)
 
        if (savedState.isEmpty  || propertyExtractor(savedState.get) != nextState) {
          savedState = Some(nextElement)
          push(out, savedState.get)
        }
        else {
          pull(in)
        }
        savedState = Some(nextElement)
      }
 
      override def onPull(): Unit = {
        pull(in)
      }
 
      override def onUpstreamFinish(): Unit = {
        completeStage()
      }
    })
 
    override def postStop(): Unit = {
      savedState = None
    }
  }
}

其中的亮点是:

  • 我们有一个单一的 **Inlet**
  • 我们有一个单一的 **Outlet**
  • 我们暴露了一个 **FlowShape**(只有输入/输出),有很多形状,但对于一个输入/一个输出,**FlowShape** 是我们想要的。
  • 我们使用 createLogic 来完成工作。
  • 我们使用 InHandler 处理输入。
  • 我们使用 OutHandler 处理输出。

另一个重要的事情(至少对于这个单一输入/输出示例而言)是,我们不能在 createLogic 中调用 pull/push 超过一次。

假设我们有这些元素:

package com.sas.customshapes
 
import scala.collection.immutable
 
object SampleElements {
 
  val E11 = Element(1, 1)
  val E21 = Element(2, 1)
  val E31 = Element(3, 1)
  val E42 = Element(4, 2)
  val E52 = Element(5, 2)
  val E63 = Element(6, 3)
 
  val Ones = immutable.Seq(E11, E21, E31)
  val Twos = immutable.Seq(E42, E52)
  val Threes = immutable.Seq(E63)
 
  val All = Ones ++ Twos ++ Threes
}

以及这段演示代码:

def runDistinctUntilChanged() : Unit = {
  Source(SampleElements.All)
    .via(new DistinctUntilChanged(_.value))
    .runWith(Sink.foreach(println))
}

我们将得到这个输出到 Sink:

这个例子很大程度上归功于我在这里找到的一篇很棒的博文:

https://www.softwaremill.com/implementing-a-custom-akka-streams-graph-stage/

就是这样

无论如何,本系列到此结束,希望您喜欢它,并在此过程中学到了一些 Akka 知识。

我现在要稍作休息,然后我想开始研究一些 Azure/Web 相关的东西。

我在哪里可以找到代码示例?

我将在此系列中不断用示例项目补充这个 GitHub 仓库。

https://github.com/sachabarber/SachaBarber.AkkaExamples

© . All rights reserved.