65.9K
CodeProject 正在变化。 阅读更多。
Home

图交叉 - 使用 Akka 的 MapReduce 方法

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2016年11月22日

CPOL

8分钟阅读

viewsIcon

15474

downloadIcon

74

使用 MapReduce 和 Akka 进行图交叉

引言

本文介绍了 Akka 框架 [1] 在图交叉问题中的应用。我们将讨论 actor、集群以及该问题的一种 MapReduce 方法。

背景

问题

给定一组图 I = {G1,...,Gn},图交叉是 I 中所有图中存在的公共边 (ai,ak) 的集合 O。我们希望找到这样的集合 O

MapReduce 方法

ob

MapReduce 模型背后的思想是将对输入 I 的计算分为两个阶段:

  1. Map 阶段:在每个并发计算单元中执行的一个函数。每个单元将函数应用于 I 的一个子集。
  2. Reduce 阶段:后处理 Map 阶段的输出并返回最终结果。

这种方法在海量数据结构计算、流处理和大数据查询中非常常见。Kafka 和 Apache Spark 等解决方案在底层实现了 MapReduce。

我们希望使用这种方法解决图交叉问题。步骤如下:

  1. 给定输入 I = {G1,...,Gn},我们说一个通用的 Gi 具有一组边 A= {(al,a2),...,(am,am+1)}
  2. 我们通过计算每对 Gi 的划分 A= {Ai', Ai"} 来准备 Map 输入,该划分基于边的第一个节点值的奇偶性。
  3. Map 阶段:两个 Map 函数中的每个函数并发计算分配给它们的分区的交集。因此,第一个 Map 的输入是 {A1',...,An'} ,第二个 Map 的输入是  {A1",...,An"}
  4. Reduce 阶段:它将部分交集收集为 Map 结果的并集,并返回 O

示例:

使用的 Akka 框架和库

Akka 是一个用于编写响应式系统 [2] 的框架,因此非常适合生产级别的分布式计算。它分别用 Java 和 Scala 开发,在本文中我们将只介绍其功能的一个子集。

  • Actor:是一种异步、非阻塞、高性能消息驱动的编程模型的构建块。它们是位置透明的。

  • Akka Cluster:提供一种基于点对点集群的成员服务,并具备实现容错的功能。

  • Spray 框架:允许在 Akka 框架之上构建基于 REST-HTTP 的功能。

技术说明

集群由一台主机器 M 组成,它公开 REST API,运行 Akka 种子节点以及 Reducer。此外,还有 4 台从属机器 Si,它们运行 Map 函数。当然,以下配置并非适用于所有场景的最佳配置,它应被视为一种简单、低成本且通用的设置,以便在分布式场景中开发和运行代码。

  • 机器M
    • 操作系统:Ubuntu 15.10,内核 4.2.0-42-generic
    • JDK:4.2.0-42-generic
    • SBT/Scala:0.13.12/2.10.6
  • 从属机器 Si
    • 操作系统:Ubuntu 16.04 LTS,内核 4.4.0-21-generic
    • JDK:4.2.0-42-generic
    • SBT/Scala:0.13.12/2.10.6
  • 网络说明
    • M 通过 TCP/IP 与每个 Si 通信。

代码概述

亮点

application.conf

  • remote.netty.tcp.hostname (运行 actor 系统的机器)
  • cluster.seed-nodes (集群的初始联系点,供其他节点连接)

worker.conf

  • remote.netty.tcp.hostname (工作节点机器)
  • contact-points (集群的初始联系点)

Main.scala

  • 应用程序的入口点。通过命令行参数,我们将根据当前机器的集群角色启动 WebServer、Master 或 Worker 节点。
object Main {
 
def main(args: Array[<span>String</span>]): Unit = {

WebServer.scala

  • 这个 actor 暴露两个服务:
    • /random (生成 5 个随机图,将工作提交给 Master,等待结果,然后回复 HttpRequest)

    • /submit (将 HttpRequest 的有效负载解组为工作实例,将其提交给 Master,等待结果,然后回复 HttpRequest)

Master.scala

  • 它维护当前已注册到集群的 Worker 列表。
  • 它接受来自 WebServer 的工作,将其拆分为子工作并重新分发给 Worker。一旦所有 Worker 都返回了结果(Map 函数),它就执行 Reduce 阶段并将结果返回给 WebServer。
  • 它通过 WorkState 实例维护计算的当前状态。例如,它可以由于 Worker 计算失败而重新提交之前的子工作。

Worker.scala

  • 每台节点机器运行此 actor 的一个实例。初始化后,它会加入集群并通知 Master。
  • 它等待 Master 发送的子工作,将其发送给 WorkExecutor,并返回 Map 函数的结果。

WorkExecutor.scala

  • 此 actor 与 Worker 通信并执行 Map 函数。

MasterWorkerProtocol.scala

  • 定义 Master 和 Worker 之间的通信协议。

Work.scala

  • 它描述并标识图交叉问题的输入以及相关的子工作(分区)。

WorkState.scala

  • 描述计算的状态,并由 Master(作为单例)维护。
  • 它定义了一些数据结构。
    • worksPending:包含 Master 接收到但尚未缩减的工作。
    • worksAccepted:包含工作及其仅相关的未处理子工作。
    • worksInProgress:包含工作及其仅相关的已提交且仍由 Worker 执行的子工作。
    • worksDone:包含工作及其仅相关的已成功由 Worker 执行的子工作。
    • worksReduced:包含工作及其缩减后的结果。一旦工作被缩减,它就会从 worksPending 中移除。

运行代码

在本节中,我们将在不同场景下执行集群中的代码。下图显示了涉及的机器以及上面介绍的 actor 的执行位置。免责声明:请记住,根据您运行代码的机器地址/主机名,相应地修改 application.config 和 worker.config

实测 1 - 简单执行

概述

在集群设置完成后,我们使用 上面的图 作为有效负载调用 /submit。然后我们将等待响应(希望是边 (2,3) :-P)。

运行集群

  1. 在 192.168.1.30 上启动 Master:将源代码复制到本地,然后在 bash 中运行 sbt 'run M1'。
  2. 在 192.168.1.30 上启动 WebServer:将源代码复制到本地,然后在 bash 中运行 sbt 'run WS'。
  3. 在 192.168.1.53 上启动 Worker/WorkExecutor:将源代码复制到本地,然后在 bash 中运行 sbt run。
  4. 执行以下 POST 调用:http://192.168.1.30:8080/submit,包含以下有效负载:
[["g1",[[1,2],[2,3]]],["g2",[[2,4],[2,3]]],["g3",[[2,3],[2,4],[4,2]]]
  1. 渲染服务调用返回的 HTML。最后一行显示了结果,正如我们所预期的,三个图 g1, g2g3 的交集是 [2,3]!

结论

在此测试中,我们熟悉了问题以及如何提交问题实例。这也是一个很好的机会来查看 Spray 的实现,以便通过 HTTP 消费/提供 HTML/JSON 内容。

实测 2 - 展示节点间的负载均衡

运行集群

  1. 在 192.168.1.30 上启动 Master:将源代码复制到本地,然后在 bash 中运行 sbt 'run M1'

  2. 在 192.168.1.30 上启动 WebServer:将源代码复制到本地,然后在 bash 中运行 sbt 'run WS'

  3. 在 192.168.1.53 上启动 Worker/WorkExecutor:将源代码复制到本地,然后在 bash 中运行 sbt run。为了举例说明,请取消注释该行以在 Map 阶段添加工作负载(几秒钟)。

    package worker
    ...
    class WorkExecutor extends Actor {
    def receive = {
    ...
    simulateWorkload(80000)
    
  4. 场景 1:执行 15 次 /random 服务调用。这模拟了向配备一个从属节点的集群提交 15 个并发问题(每个问题是 5 个包含 10 条边的随机图)。

  5. 向集群添加 Worker/WorkExecutor:在 192.168.1.54 上,将源代码复制到本地,然后在 bash 中运行 sbt run

  6. 场景 2:执行 15 次 /random 服务调用。这模拟了向配备两个从属节点的集群提交 15 个并发问题(每个问题是 5 个包含 10 条边的随机图)。

  7. 向集群添加 Worker/WorkExecutor:在 192.168.1.55 上,将源代码复制到本地,然后在 bash 中运行 sbt run

  8. 场景 3:执行 15 次 /random 服务调用。这模拟了向配备三个从属节点的集群提交 15 个并发问题(每个问题是 5 个包含 10 条边的随机图)。

  9. 向集群添加 Worker/WorkExecutor:在 192.168.1.56 上,将源代码复制到本地,然后在 bash 中运行 sbt run

  10. 场景 4:执行 15 次 /random 服务调用。这模拟了向配备四个从属节点的集群提交 15 个并发问题(每个问题是 5 个包含 10 条边的随机图)。

以下图表总结了结果。

结论

此测试显示了向集群添加(和删除)节点是多么容易。请注意,Master 不知道任何节点的物理位置细节。很酷,不是吗?!

想发现其他已实现的功能吗?尝试在计算过程中关闭一个节点,或取消注释此项以模拟 RuntimeException。Master 将能够通过调用其他节点来重新分发失败的计算!

package worker
...
class WorkExecutor extends Actor {
def receive = {
...
simulateRandomException()

关注点

这次编程经历让我既发现了新兴的框架和语言,又面临了开发分布式算法的挑战。以下是一些考虑因素:

  • 选择 Scala,与 Java 相比,代码更简洁,这要归功于函数式概念,例如模式匹配(例如,Actor 中的命令接收代码)。
    override def receiveCommand: Receive = {
    case MasterWorkerProtocol.RegisterWorker(workerId) =>
    
    以及通过匿名函数对数据结构进行操作(例如)。
val (mainW, rest) = worksAccepted.filter(w => w._2.contains(work)).head
  • 用于并行计算的 MapReduce 方法易于理解和建模。此外,我们还可以通过在 Map 之前的分区阶段(例如,基于其他标准而不是奇偶性)来调整并行度。
  • 在开发和调试分布式系统时面临的一个大问题是如何在代码修改后保持其正确性。例如,“我修复了那个,Map 阶段是否仍在计算交集?”。我发现使用断言代码非常有用,原因有两个:您在专注于该代码块时编写断言,也就是说,在开发过程中,并且您被迫思考您应该期望的前置条件和后置条件,从而将职责分配给代码。例如。
case WorkCompleted(work, result) ⇒
...
require(subWCompleted.workId == work.workId, s"<span>WorkStarted expected workId ${work.workId} == ${subWCompleted.workId}"</span>)
require(!worksPending(mainW).filter(w => w == work).isEmpty, s<span>"</span>WorkId ${work.workId} was not expected to be ¬
in workPending")
require(!worksAccepted.contains(mainW) || worksAccepted(mainW).filter(w => w == work).isEmpty, "WorkId ${work.workId} was not expected ¬
to be in workIdsAccepted")
require(!worksDone.contains(mainW) || worksDone(mainW).filter(w => w == work).isEmpty, s<span>"</span>WorkId ${work.workId} was not expected to be ¬
in workIdsDone")

未来改进:仅是一些想法

Akka 是一个庞大的框架,包含许多本文未涵盖的功能。在本节中,我提供几个关于如何提高可伸缩性和健壮性的想法。

  • 使用 Akka Persistence,我们可以将 Actor 的状态存储在非易失性内存中。例如,持久化 WebServer Actor 的状态,我们可以恢复未发送给 Master 的消息,从而避免在临时故障转移时丢失客户端请求。
  • 使用位于不同机器上的多个 Master 和 WebServer 实例,我们可以实现更具响应性、弹性且弹性的系统 [2]。

祝您编码愉快!

下载代码

下载 akka-distributed-workers.tar.zip

参考文献

[1] - http://doc.akka.io/docs/akka/2.4/scala.html

[2] - http://www.reactivemanifesto.org/

© . All rights reserved.