Apache Spark 入门






4.98/5 (19投票s)
一篇关于 Apache Spark 的介绍性文章,附带一个演示应用
系列链接
- 第 1 部分:Apache Spark 简介(通用网格计算引擎)(本文)
- 第 2 部分:安装 Apache Cassandra
- 第 3 部分:将数据从 Cassandra 存入/取回到 Apache Spark
目录
引言
我已经有一段时间没有写博客文章或类似的文章了。这有一个很好的理由,那就是我开始了一份新工作,这对我来说是一个全新的领域(我主要在金融外汇领域工作),这份工作是在一家对冲基金/再保险公司。所以我一直相当累。我之所以接受这份工作,是因为他们在面试阶段问我,如果我们不将 .NET 用于所有事情(他们之前一直是这样做的),我会有什么感觉,以及我对使用由以下这些技术组成的堆栈有什么想法:
- Scala
- Apache Spark
- Apache Zookeeper
- Cassandra DB
坦白说,我高兴极了,我刚读完一本关于 Apache Spark 的书,并且非常希望能有机会使用 Scala / Cassandra,所以我接受了。
这并不意味着我不再使用或热爱 .NET,天哪,我认为 .NET 是最好的。然而,在非 .NET / 微软领域也有一些很棒的事情正在发生。Apache Spark 对我来说尤其有趣。
本文将作为 Apache Spark 的入门指南。公平地说,Apache Spark 有很棒的文档,本文只是对其的一种补充。然而,如果你从未听说过 Apache Spark,你可能会喜欢你所读到的内容,我第一次读到它时确实如此。
关于本文的说明
本文中的所有示例和演练以及演示应用都将使用 Scala,因为它是一门优秀的现代面向对象/函数式语言。
代码在哪里
你可以从我的 GitHub 仓库获取一个小型的 IntelliJ IDEA Scala 项目: https://github.com/sachabarber/SparkDemo
什么是 Spark
这是 Apache Spark 的创造者对他们自己工作的评价。
Apache Spark 是一个快速且通用的集群计算系统。它提供了 Java、Scala、Python 和 R 的高级 API,以及一个支持通用执行图的优化引擎。它还支持一套丰富的高级工具,包括用于 SQL 和结构化数据处理的 Spark SQL、用于机器学习的 MLlib、用于图处理的 GraphX,以及 Spark Streaming。
https://spark.apache.ac.cn/docs/latest/index.html 更新于 24/08/15
嗯,想要比这更多的信息,试试这个
在较高层面上,每个 Spark 应用程序都包含一个驱动程序,它运行用户的主函数并在集群上执行各种并行操作。Spark 提供的主要抽象是弹性分布式数据集(RDD),它是一个跨集群节点分区的元素集合,可以并行操作。RDD 的创建方式是,从 Hadoop 文件系统(或任何其他 Hadoop 支持的文件系统)中的文件开始,或者从驱动程序中现有的 Scala 集合开始,然后对其进行转换。用户也可以要求 Spark 将 RDD 持久化在内存中,从而使其可以在并行操作之间高效地重用。最后,RDD 会自动从节点故障中恢复。
Spark 中的第二个抽象是共享变量,它可以在并行操作中使用。默认情况下,当 Spark 在不同节点上以一组任务的形式并行运行一个函数时,它会把函数中使用的每个变量的副本发送给每个任务。有时,一个变量需要在任务之间或任务与驱动程序之间共享。Spark 支持两种类型的共享变量:广播变量,可用于在所有节点上缓存一个值;以及累加器,它们是只进行“累加”操作的变量,例如计数器和求和。
Spark 编程指南 更新于 24/08/15
如果你是那种喜欢图表的人,这里有一张可能会有帮助的图
以上就是他们对 Spark 的看法。这里是我自己的一些要点,可能也会有所帮助
- Spark 提供了一个通用的抽象:弹性分布式数据集(RDD)
- Spark 提供了类似 LINQ 的语法,你可以将其真正地分布到多个工作节点上(类似 LINQ 的语法跨多个节点,想一想,这简直太棒了)
- Spark 提供容错性
- Spark 在本地运行和在集群中运行使用相同的 API,这使得试验变得非常容易
- Spark 提供了相当简单的 API
- 非常活跃的社区
- 直截了当地说,它是一个非常棒的产品
下图展示了当前 Apache Spark 提供的功能。图中每个深蓝色方块都是一个略有不同的产品,例如:
- Spark SQL:允许查询结构化数据,例如将 JSON 转换为 RDD(这意味着你可以有效地对 JSON 运行 RDD 操作)
- Spark Streaming:允许对随时间推移的数据进行微批处理(窗口缓冲、大小缓冲、滑动缓冲等),缓冲区中接收到的数据以列表形式呈现。这使得处理非常容易。Spark Streaming 有许多数据源,例如:
- 套接字 (Sockets)
- Kafka
- Flume
- Zero MQ
所有这些的基础是通用的抽象——弹性分布式数据集(RDD)。那么让我们继续,多谈谈 RDD。
附注:在本文中,我将只涵盖 Apache Spark 的基础知识,而不是图中深蓝色区域所示的每个额外产品。
它与 Map/Reduce 的比较
Map Reduce
Map Reduce 是一种编程模型和相关的实现,用于在集群上通过并行分布式算法处理和生成大型数据集。概念上类似的方法自 1995 年以来就已广为人知,消息传递接口(MPI)标准就具有 reduce 和 scatter 操作。
一个 Map Reduce 程序由一个执行过滤和排序的 Map() 过程(例如按名字将学生排队,每个名字一个队列)和一个执行汇总操作的 Reduce() 过程(例如计算每个队列中的学生数量,得出名字的频率)组成。“Map Reduce 系统”(也称为“基础设施”或“框架”)通过调度分布式服务器、并行运行各种任务、管理系统各部分之间的所有通信和数据传输,并提供冗余和容错来协调处理过程。
https://en.wikipedia.org/wiki/MapReduce 更新于 24/08/2015
Map Reduce 通常在这些 Map/Reduce 阶段上运行,每个阶段通常都需要硬件支持。
Spark
在较高层面上,每个 Spark 应用程序都包含一个驱动程序,它运行用户的主函数并在集群上执行各种并行操作。Spark 提供的主要抽象是弹性分布式数据集(RDD),它是一个跨集群节点分区的元素集合,可以并行操作。RDD 的创建方式是,从 Hadoop 文件系统(或任何其他 Hadoop 支持的文件系统)中的文件开始,或者从驱动程序中现有的 Scala 集合开始,然后对其进行转换。用户也可以要求 Spark 将 RDD 持久化在内存中,从而使其可以在并行操作之间高效地重用。最后,RDD 会自动从节点故障中恢复。
https://spark.apache.ac.cn/docs/latest/programming-guide.html#overview 更新于 24/08/2015
Spark 完全在商用级硬件的内存中运行,如果数据集能够装入内存,它的速度快如闪电。Spark 声称在这种情况下,它可以比 Map/Reduce 快 100 倍。如果你将 RDD 持久化到磁盘,Spark 声称比 Map/Reduce 快 10 倍。
Spark 胜出的另一个领域是 RDD API,它比 Map/Reduce 的操作丰富得多。
让我们看一个试图说明 Spark 和 Map/Reduce 之间差异的图表。可以看出,在 Map/Reduce 中,采用的是分阶段的方法,即 Map/Reduce,然后重复。
在 Spark 中,工作被分发给工作节点,每个节点可以完成一部分工作,然后结果被合并并返回给驱动程序,或者可能被持久化到磁盘。
想了解更多关于差异的讨论,Google 是一个不错的选择。
RDD 转换/操作
如前所述,使用 Apache Spark 的核心概念之一就是尝试使用 RDD。 (相当无耻地)从 Apache Spark 文档中摘取一些信息:
RDD 支持两种类型的操作:转换(transformations),从现有数据集中创建一个新数据集;以及动作(actions),在数据集上运行计算后向驱动程序返回一个值。例如,map 是一个转换操作,它将数据集的每个元素通过一个函数处理,并返回一个代表结果的新 RDD。另一方面,reduce 是一个动作操作,它使用某个函数聚合 RDD 的所有元素,并将最终结果返回给驱动程序(尽管也有一个并行的 reduceByKey,它返回一个分布式数据集)。
Spark 中所有的转换都是惰性的,也就是说它们不会立即计算结果。相反,它们只是记住应用于某个基础数据集(例如一个文件)的转换。只有当某个动作需要一个结果返回给驱动程序时,这些转换才会被计算。这种设计使 Spark 能够更高效地运行——例如,我们可以意识到通过 map 创建的数据集将在 reduce 中使用,因此只将 reduce 的结果返回给驱动程序,而不是返回更大的映射后数据集。
默认情况下,每次对一个转换后的 RDD 运行动作时,它都可能被重新计算。但是,你也可以使用 persist(或 cache)方法将 RDD 持久化到内存中,这样 Spark 会将元素保留在集群上,以便下次查询时能更快地访问。还支持将 RDD 持久化到磁盘上,或在多个节点间复制。
RDD 操作 更新于 24/08/15
让我们来研究一下其中的几个
创建 RDD
有两种方法可以创建 RDD,一种是通过并行化驱动程序中的一些现有数据,另一种是使用 SparkContext
对象上的一些工厂方法。我们现在将看到这两种方法的示例。
并行化
如果我们想从现有数据(比如来自 Cassandra)创建一个 RDD,我们可以这样做:
al data = Array(1, 2, 3, 4, 5)
val dataRdd = sc.parallelize(data)
这创建了一个现在可以并行操作的 RDD。你可以通过 Spark 文档阅读更多相关信息。
外部数据集
Spark 还支持从外部数据源创建 RDD,例如 Hadoop 文件系统、Amazon S3 文件系统以及 Cassandra 等。如果你只是想在本地进行尝试,甚至可以使用文件系统路径。但请记住,如果你选择部署到集群,这将不起作用,除非你选择的文件系统对象在集群中的所有节点上都位于相同的位置。
我在本文中使用的设置是,我在 Windows 上运行一个独立的 Scala 应用程序,并且只在本地使用 Spark。也就是说,我没有连接到 Spark 集群,我只在我笔记本电脑的内核上进行并行化,对我来说是 2 个。因此,我能够使用本地文件系统的文件来创建 RDD,但如果我尝试在集群中运行我的代码,这很可能行不通,因为 Spark 集群需要运行在 Linux 上,而我的 Scala 应用程序运行在 Windows 上,因此文件系统路径的格式是不同的。我们将在文章后面讨论如何创建 Spark 集群。虽然不会非常详细,但我会引导你入门,从那里你可以自行探索建立 Spark 集群的乐趣。
好了,闲聊够了,如何从这些外部源创建一个 RDD 呢?
val txtFileRdd = sc.textFile("data.txt")
这个例子简单地从一个文本文件创建了一个 RDD。
现在你已经了解了如何创建 RDD,我们可以继续看看如何使用 RDD 来执行转换和操作。
RDD 转换
RDD 有很多转换操作,要获取完整的列表,您应该查阅 spark 文档的以下页面:https://spark.apache.ac.cn/docs/latest/programming-guide.html#transformations
我们在这里看几个例子,但基本上名称就暗示了当你使用转换时发生了什么,你本质上是在以某种方式转换数据。
Filter
描述:返回一个新的数据集,该数据集由源数据中 func 返回 true 的元素组成
签名:filter(func)
//create an RDD using external data (i.e. the text file)
val textFileRDD = sc.textFile(someTextFilePath, 2).cache()
//filter example
val numAs = textFileRDD.filter(line => line.contains("a")).count()
映射
描述:返回一个新的数据集,该数据集由源数据中 func 返回 true 的元素组成
签名:map(func)
//MAP example
val mapHoHo = textFileRDD.map(line => line + "HO HO")
println("HoHoHo line : %s".format(mapHoHo.first().toString()))
更多示例请参见此链接:Spark 转换
RDD 操作
RDD 有很多操作,要获取完整的列表,您应该查阅 spark 文档的以下页面
https://spark.apache.ac.cn/docs/latest/programming-guide.html#actions
我们在这里看几个例子,对于动作,你可以把它们看作是执行某种最终操作,比如 count() 或 first(),这两者都需要完整的数据集准备就绪(这可能意味着要在工作节点之间进行结果的 shuffle)。你可能还会找到保存 RDD 的方法,比如 saveAsTextFile(..),它可以让你保存一个 RDD。
Collect
描述:将数据集的所有元素作为数组返回到驱动程序中。这通常在 filter 或其他返回足够小数据子集的操作之后很有用
签名:collect()
//COLLECT ACTION example, note that filter() is actually an transformation
val numAsArray = textFileRDD.filter(line => line.contains("a")).collect()
println("Lines with a: %s".format(numAsArray.length))
numAsArray.foreach(println)
println("Lines with a as Array: %s".format(numAsArray.getClass().getTypeName()))
First
描述:返回数据集的第一个元素(类似于 take(1))
签名:first()
//FIRST ACTION example
val firstLine = textFileRDD.first()
println("First Line: %s".format(firstLine))
Persist
您可以选择将 RDD 持久化到 IO。对此有很多选项和需要考虑的事项。更多示例请参见此链接:Spark 操作
更多示例请参见此链接:Spark 操作
共享变量
通常在 Spark 中,您会将一个操作发送到远程节点执行,该节点处理的是数据的独立副本。在任务/节点之间提供通用的读/写变量是低效的。然而,Spark 确实提供了两种类型的共享变量。
广播变量
广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务一起发送它的副本。例如,它们可以用来高效地为每个节点提供一个大型输入数据集的副本。
广播变量是通过调用 SparkContext.broadcast(v) 从变量 v 创建的。广播变量是 v 的一个包装器,可以通过调用 value 方法来访问它的值。
https://spark.apache.ac.cn/docs/latest/programming-guide.html#shared-variables 更新于 24/08/2015
这是一个简单的例子
val broadcastVar = sc.broadcast(Array(1, 2, 3))
.....
broadcastVar.value
累加器
累加器是只能通过关联操作进行“累加”的变量,因此可以有效地在并行中得到支持。它们可以用来实现计数器(如 Map Reduce 中)或求和。Spark 原生支持数值类型的累加器,程序员也可以为新类型添加支持。如果创建累加器时指定了名称,它们将显示在 Spark 的 UI 中。这对于了解正在运行的阶段的进度很有用(注意:Python 中尚不支持此功能)。
累加器是通过调用 SparkContext.accumulator(v) 从一个初始值 v 创建的。在集群上运行的任务可以使用 add 方法或 += 操作符(在 Scala 和 Python 中)对其进行累加。但是,它们无法读取其值。只有驱动程序可以使用其 value 方法读取累加器的值。
下面的代码展示了如何使用累加器来对一个数组的元素求和
https://spark.apache.ac.cn/docs/latest/programming-guide.html#shared-variables 更新于 24/08/2015
这是一个简单的例子
val accum = sc.accumulator(0, "My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
var accuulatedValue = accum.value
Shuffle 操作
某些操作会导致 Spark 执行所谓的“Shuffle”。这指的是从所有分区读取所有数据的任务。这是一项成本很高的操作,因为它涉及磁盘 I/O、网络 I/O 和序列化。
可能导致 shuffle 的操作包括重分区操作,如 repartition 和 coalesce,以及 ‘ByKey’ 操作(除了计数),如 groupByKey
和 reduceByKey
,还有连接操作,如 cogroup 和 join。
您可以在这里阅读更多相关信息:shuffle 操作
持久化 RDD
Spark 中最重要的功能之一是在多个操作之间将数据集持久化(或缓存)在内存中。当你持久化一个 RDD 时,每个节点都会将其计算出的任何分区存储在内存中,并在该数据集(或从该数据集派生的数据集)的其他操作中重用它们。这使得未来的操作速度更快(通常快 10 倍以上)。缓存是迭代算法和快速交互式使用的关键工具。
您可以使用 RDD 上的 persist() 或 cache() 方法来标记一个 RDD 进行持久化。当它在一个操作中首次被计算时,它将被保存在节点的内存中。Spark 的缓存是容错的——如果 RDD 的任何分区丢失,它将使用最初创建它的转换自动重新计算。
持久化 RDD 更新于 24/08/15
如何安装 Spark
Apache Spark 本身体积相当大,幸运的是我们只需要一些 JAR 文件就可以开始运行。开始使用 Apache Spark 的最简单方法(至少在我看来)是执行以下操作:
- 下载 SBT (免费)
- 下载 1.8 JDK(或更高版本)(免费)
- 下载 IntelliJ IDEA 社区版 (免费)
你当然可以下载真正的 Apache Spark Windows 安装程序,它会把你选择的文件夹里放一堆东西。问题是 Apache Spark 团队说 Apache Spark 可以在 Windows 上运行,但运行得不是很好。你通常会在 Linux 集群上运行它。我们稍后会详细讨论这一点。但现在请记住,Apache Spark 在 Linux 虚拟机/主机/集群上运行得确实好得多,你应该确保在真实环境中这样做。如果只是随便玩玩,你可以使用 Windows,我们稍后也会谈到这一点。
SBT
您需要安装简单构建工具,可以从这里获取:
https://sbt.scala-lang.org.cn/
如果你没有听说过 SBT 或者不知道它是什么,你可以把它看作是一个构建工具和包管理工具,在某些方面它类似于 Nuget,不过 SBT 实际上使用 Scala 作为语法/DSL 来表达你的构建需求。
你可以用它来搭建一个 Scala 项目,也可以下载一个 Scala 项目的依赖项。我们稍后会看到更多关于这方面的内容。
JDK
Apache Spark 使用以下 3 种编程语言之一:
- Java
- Scala
- Python
对于前两种,我们显然需要 JDK。所以我们需要去下载它。你可以从 Oracle 网站上获取:
标准的 Java 8 SDK 安装应该就可以了,选择适合你环境的版本。
IntelliJ IDEA
您可以从这里下载免费版的 IntelliJ IDEA: https://www.jetbrains.com/idea/download/
处理 Windows 上奇怪的 Hadoop Commons 问题
就像我在这篇文章中已经说过的,你可能不会在 Windows 上运行 Apache Spark 集群,但你可能想在 Windows 上运行驱动程序。我就是这么想的,然后我遇到了一个关于缺少“winutils.exe”文件的奇怪错误。
我用谷歌搜索了一下,发现修复起来很简单,你可以从下面这位朋友的网站上找到修复方法,他指出了你可能会遇到的具体异常。
.....你会看到以下异常
java.io.IOException: 在 Hadoop 二进制文件中找不到可执行文件 null\bin\winutils.exe。
原因是因为 spark 期望找到指向 hadoop 二进制发行版的 HADOOP_HOME 环境变量,.........
让我们下载 hadoop 通用二进制文件,将下载的压缩文件解压到“C:\hadoop”,然后将 HADOOP_HOME 设置为该文件夹。
http://ysinjab.com/2015/03/28/hello-spark/ 更新于 24/08/2015
修复这个问题所需的下载文件在这里: http://www.srccodes.com/p/article/39/error-util-shell-failed-locate-winutils-binary-hadoop-binary-path
操作步骤如上所述
创建我们的第一个项目
文章的这一部分是整篇文章中唯一原创的部分。在这一部分,我将讨论我自己是如何开始并运行的,具体如下:
- 在 Windows 上以本地模式运行 Spark
- 无集群
- 通过 IntelliJ IDEA 使用 Scala
这一节的关键在于,如果你需要切换到使用集群,你只需要更改主节点连接字符串的详细信息,其余部分将保持不变。
使用 IntelliJ 和 SBT 创建项目结构
在本节中,我们将创建一个新的 Apache Spark 应用程序,它具有以下特点:
- 通过 IntelliJ IDEA 在 Windows 上运行
- 演示使用 SBT 管理依赖项
- 演示一些 RDD 转换
- 演示一些 RDD 操作
创建初始 IntelliJ IDEA 项目
我们需要创建一个新项目
点击查看大图
然后我们需要选择 SBT 项目
点击查看大图
创建项目时,你应该选择以下设置:
点击查看大图
注意:选择 Scala 版本 2.10.5 非常重要,因为在撰写本文时,Apache Spark 的当前发布版本是 v1.4.1,它不支持 Scala 2.11.x,所以你需要确保你的目标是 Scala 2.10.5。
完成这些操作后,你需要等待一段时间,让 SBT 做一些事情,过一会儿你应该会看到这样的项目结构
修改 SBT 依赖
现在我们有了一个默认的 SBT 项目。这很好,但现在我们需要获取实际的 Spark JAR 文件。这可以通过修改主 build.sbt 文件来完成。
所以打开那个文件,然后把它的内容改成这样
name := "SparkDemo"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"
接下来打开 SBT 窗口,如下图所示
点击刷新图标
这将获取你在上面的 build.sbt 文件中指定的依赖项所需的所有依赖项。
这会花费很长时间,让它自己运行就好。
完成后,你应该做两件事
- 检查你的 SBT 缓存是否包含 Spark JAR 文件。该目录是 "C:\Users\YOUR_USER\.ivy2\cache"
- 你应该确保 SBT 缓存被用作 IntelliJ IDEA 的实际包源(类似于你为自己设置的 Nuget 存储库时对 Nuget 和 Visual Studio 所做的操作)。
这可以在 IntelliJ IDEA 中按以下步骤完成。
-
项目结构 -> 库
然后按照下面的步骤操作,你会从你自己的 .ivy2
缓存目录(例如 C:\Users\YOUR_USER\.ivy2\cache")中添加内容。
点击查看大图
Scala 驱动程序的简单应用
一旦你经历了下载运行 Spark 所需的大量依赖项的痛苦过程,你应该能够创建一个简单的驱动程序,就像下面这样:
到目前为止,在下面的代码中我唯一没有解释的是这一行:
conf.setMaster("local[2]")
这行代码的作用是,在本地使用 2 个核心运行 spark(这是我笔记本电脑的全部核心数)。如果你要在一个集群上运行驱动程序,你需要修改这一行。
你可以在这个网址阅读更多相关信息:Spark Master Urls 这是一个旧页面,但数据仍然有效。
无论如何,这是示例驱动程序
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
// Should be some file on your system
val someTextFilePath = "C:/Temp/SomeTextFile.txt"
val conf = new SparkConf().setAppName("Simple Application")
//use local Spark (non clustered in this example). Note this relies
// on all the SBT dependencies being downloaded to
// C:\Users\XXXXX\.ivy2 cache folder
conf.setMaster("local[2]")
val sc = new SparkContext(conf)
//creaate an RDD using external data (ie the text file)
val textFileRDD = sc.textFile(someTextFilePath, 2).cache()
//FILTER TRANSFORMATION example, note that count() is actually an action
val numAs = textFileRDD.filter(line => line.contains("a")).count()
val numBs = textFileRDD.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
//MAP TRANSFORMATION example, note that first() is actually an action
val mapHoHo = textFileRDD.map(line => line + "HO HO")
println("HoHoHo line : %s".format(mapHoHo.first().toString()))
//COLLECT ACTION example, note that filter() is actually an transformation
val numAsArray = textFileRDD.filter(line => line.contains("a")).collect()
println("Lines with a: %s".format(numAsArray.length))
numAsArray.foreach(println)
println("Lines with a as Array: %s".format(numAsArray.getClass().getTypeName()))
//FIRST ACTION example
val firstLine = textFileRDD.first()
println("First Line: %s".format(firstLine))
readLine()
}
}
延伸阅读
你可以通过这些链接继续阅读更多关于这些内容的信息
就这些
总之,这就是我这次想说的全部内容,我希望你们中一些可能以前没有接触过这些东西的人能从中有所收获。