65.9K
CodeProject 正在变化。 阅读更多。
Home

Akka : ‘hello world’

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.33/5 (3投票s)

2016年7月25日

CPOL

6分钟阅读

viewsIcon

17808

这一次,我们将看一个简单的 Akka 示例。

因此,上次,我概述了本系列 Akka 文章的路线图,并发布了来自 Akka 创作者的一些关于他们对 Akka 的看法的信息。

这一次,我们将看一个简单的 Akka 示例。

但在我们这样做之前,让我们多谈谈为什么我认为 Akka 很棒。

我多年来一直是 .NET 程序员,并且已经看到了异步编程的多种不同形式

  • 异步委托
  • BackgroundWorker
  • Task (TPL)
  • Async Await
  • RX
  • Concurrent collections
  • Critical sections (synchronized sections of code)

所有这些在 JVM 中几乎都有等价物,如果你使用过 .NET 或 JVM 的等价物,你可能会不时看到锁的使用。例如,在底层,并发集合仍然会使用一些锁定(monitor enter/exit)来实现临界区。

所有这些都很棒,并且多年来一直更容易使用,但可能有一种更好、更优雅的无锁方式来处理并发/并行编程。

对我来说,这就是 Akka 带来的。我们不必像编写多线程代码时那样处理必须保护的共享状态,而是根本不使用任何共享状态,而是创建专门的微型代码片段,只处理一件事,而且只处理一件事。这些被称为“Actors”。

Actors 不共享状态,而是独立工作,依赖于消息传递。消息的负载应该为 actor 提供完成工作所需的一切(或者至少提供足够的信息来查找事物,例如一个 `Id`,以便 `Actor` 可以查找由其 `Id` 字段所需的实体)。

我们不会在 actor 中使用锁。

好的,我的简短演讲/介绍结束了。现在让我们继续看看用 Scala 编写 Akka 代码需要什么。

我们需要什么库?

正如我在介绍帖中提到的,我将完全使用 Scala 来撰写这些帖子。因此,我将使用 SBT 来进行构建方面的工作。

所以对于这篇帖子,我们只展示如何使用简单的 actor,所以我们不必引入太多依赖项,我们可以保持简单并使用以下“build.sbt”文件,我从中拉入了以下 2 个依赖项

  • Basic Akka stuff
  • Joda time
name := "HelloWorld"
 
version := "1.0"
 
scalaVersion := "2.11.8"
 
libraryDependencies ++= Seq(
  "com.typesafe.akka" % "akka-actor_2.11" % "2.4.8",
  "joda-time" % "joda-time" % "2.9.4")

注意:SBT 的这些依赖项可能会在后续文章中增加,但如果需要引入更多的 Akka JAR,我会在到时候展示。

我们如何创建一个 Actor System?

要使用 Akka Actor System,我们必须首先创建一个 Akka 系统,这是所有 actor 运行的基础。这很容易实现,如下所示

object Demo extends App {
 
  //create the actor system
  val system = ActorSystem("HelloSystem")
 
  //---------------------------
  //   EXTRA STUFF WILL GO HERE
  //---------------------------
 
 
  //shutdown the actor system
  system.terminate()
 
  StdIn.readLine()
}

请注意,您应该确保 Akka 系统也已正确关闭。

我在这里展示的示例是一个简单的控制台类型的应用程序,所以我的启动/关闭逻辑都在同一个文件中,但在实际的生产应用程序中,事情可能会更复杂(好吧,我希望如此)。

我们如何创建一个 Actor?

现在我们有了 actor 系统,接下来我们需要做的是创建一些 actor,它们将生活在 actor 系统中。我使用“生活”这个词来表示一种所有权安排。

那么我们究竟如何创建一个 `actor` 呢?

嗯,幸运的是,这也非常简单。我们只需要继承 `Actor` 并为 `receive` 方法提供一个实现来处理可能发送给 `actor` 的不同消息。

请记住,`actor` 通过接收消息并对其进行操作来工作。

骨架代码可能看起来像这样

import akka.actor.Actor
 
class HelloActor extends Actor {
  def receive = {
    //DO THE MESSAGE HANDLING HERE
  }
}

稍后我们将详细讨论 `receive` 方法,目前,您只需要知道您必须实现此方法才能使 Akka `actor` 正确工作

Tell 和 Ask 之间的区别

在我们开始看示例之前,让我们先简单地讲一下 ask 和 tell 之间的区别。

当我们 **ask** (Scala 中的 **?** 方法) 一个 `actor` 时,我们期望通过 `Future[T]` 获得响应,这是一个异步操作。

当我们 **tell** (Scala 中的 **!** 方法) 一个 `actor` 某事时,这相当于“发送即忘”(当然,接收 `actor` 可以通过不同的消息将响应发送回发起 `actor`,但这又是另一回事了),这是一个立即返回的异步操作。

Sender

我们还没有涵盖这些内容,但我们将在后续的帖子中进行介绍,但现在您需要知道的是,当您向 `actor` 发送消息时,您是通过一个称为 `actorRef` 的构造来实现的,它有点像 `actor` 的句柄。

存在特殊类型的 `actorRef`,其中一种情况是“**sender**”,它是正在接收的消息的发起者 `actorRef`(如果我们从接收 `actor` 的角度来看)。您将在下面的示例中看到“`sender`”的使用。

我们如何向 Actor 发送消息?

这是我们向 actor 发送消息的方式。这是一个 tell(**!**),所以是“发送即忘”。

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn
import scala.util.{Success, Failure}
import ExecutionContext.Implicits.global
 
object Demo extends App {
 
  //create the actor system
  val system = ActorSystem("HelloSystem")
 
  // default Actor constructor
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
 
  //send some messages to the HelloActor (fire and forget)
  helloActor ! "hello"
  helloActor ! "tis a fine day for Akka"
 
  //shutdown the actor system
  system.terminate()
 
  StdIn.readLine()
}

我们有如下的 `HelloActor` 实现

import akka.actor.Actor
 
class HelloActor extends Actor {
  def receive = {
    case "hello" => println("world")
    case _       => println("unknown message")
  }
}

看看对于这个简单的 `actor`,我们在 `receive` 方法中只处理了 2 件事

  • “Hello”
  • Anything else

确保您处理正确的消息并处理未知消息被认为是一种良好的做法。

我们如何等待 Actor 的响应?

我们刚刚看到了一个 **tell**(发送即忘),那么 **ask** 呢?这稍微难一点,但也不是很难,我们只需要处理 **ask** 的结果将是 `Future[T]` 的事实。有许多方法可以处理它,假设我们有以下 `actor`

import akka.actor.Actor
 
class AskActor extends Actor {
  def receive = {
    case GetDateMessage => sender ! new org.joda.time.DateTime().toDate().toString()
    case _       => println("unknown message")
  }
}

以下是一些如何处理 ask 结果的示例

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn
import scala.util.{Success, Failure}
import ExecutionContext.Implicits.global
 
object Demo extends App {
 
  //create the actor system
  val system = ActorSystem("HelloSystem")
 
  // default Actor constructor
  val askActor = system.actorOf(Props[AskActor], name = "askactor")
 
  //send some messages to the AskActor, we want a response from it
 
  // (1) this is one way to "ask" another actor for information
  implicit val timeout = Timeout(5 seconds)
  val future1 = askActor ? GetDateMessage
  val result1 = Await.result(future1, timeout.duration).asInstanceOf[String]
  println(s"result1=$result1")
 
  // (2) a slightly different way to ask another actor for information
  val future2: Future[String] = ask(askActor, GetDateMessage).mapTo[String]
  val result2 = Await.result(future2, 5 second)
  println(s"result2=$result2")
 
  // (3) don't use blocking call at all, just use future callbacks
  val future3: Future[String] = ask(askActor, GetDateMessage).mapTo[String]
  future3 onComplete {
    case Success(result3) =>  println(s"result3=$result3")
    case Failure(t) => println("An error has occured: " + t.getMessage)
  }
 
 
  //shutdown the actor system
  system.terminate()
 
  StdIn.readLine()
}

Piping Futures

您可能遇到的另一个用例是,您可能想在 `actor` 代码内部使用 `Future[T]`,并将 `Future[T]` 从一个 `actor` 传递到另一个 `actor`。Akka 也通过使用 pipe 模式支持这一点,您可以使用它,其中我们将 `Future[List[Int]]` 传递回发送者。

import akka.actor._
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}
import ExecutionContext.Implicits.global
 
class FutureResultActor extends Actor {
  def receive = {
    case GetIdsFromDatabase => {
      Future(List(1,2,3)).pipeTo(sender)
    }
    case _       => println("unknown message")
  }
}

上面代码中,发送 GetIdsFromDatabase 消息的代码看起来像这样(上面的代码中的发送者)

import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.io.StdIn
import scala.util.{Success, Failure}
import ExecutionContext.Implicits.global
 
object Demo extends App {
 
  //create the actor system
  val system = ActorSystem("HelloSystem")
 
  // default Actor constructor
  val futureResultActor = system.actorOf(Props[FutureResultActor], name = "futureresultactor")
 
  //send some messages to the FutureResultActor, we expect a Future back from it
  val future4: Future[List[Int]] = ask(futureResultActor, GetIdsFromDatabase).mapTo[List[Int]]
  future4 onComplete {
    case Success(result4) =>  println(s"result4=$result4")
    case Failure(t) => println("An error has occured: " + t.getMessage)
  }
 
 
  //shutdown the actor system
  system.terminate()
 
  StdIn.readLine()
}

代码在哪里?

如前所述,本系列的所有代码都将最终放在这个 GitHub 仓库中

© . All rights reserved.