65.9K
CodeProject 正在变化。 阅读更多。
Home

Apache Spark/Cassandra 2 of 2

starIconstarIconstarIconstarIconstarIcon

5.00/5 (11投票s)

2016 年 2 月 11 日

CPOL

9分钟阅读

viewsIcon

21815

了解 Spark/Cassandra 的协同工作

系列链接

 

 

目录 

 

引言

上次我讲了如何在本地安装单个节点的 Apache Cassandra。正如我之前所说,这不适合生产环境安装,在生产环境中您会使用 Cassandra 集群,但对于学习来说,单个节点就足够了。

我还讲了 Apache Sparkp,它是一个通用网格计算引擎。在本文中,我将向您展示如何将 Apache Spark 与 Apache Cassandra 一起使用,以便从 Cassandra 将数据保存到/从 Apache Spark 的弹性分布式数据集 (RDD) 中。

 

代码在哪里?

您可以从我的 github 仓库下载代码:https://github.com/sachabarber/CassandraSparkDemo

 

为什么要保存/读取数据

正如我刚才所说,Apache Spark 是一个通用计算引擎,它通过工作节点和 RDD 来工作。其中 RDD 在可用的工作节点之间并行执行。还有一个驱动程序的概念,它负责执行特定的操作来执行计算,这将导致工作节点执行计算的一部分,并将其交回给驱动程序,在那里将其组装为最终结果的一部分。

因此,驱动程序最终会得到一些结果,因此您可能希望存储一些昂贵计算的结果,这并非不合理。反之,我们也可能希望检索一些先前存储的值。

这就是 Cassandra 发挥作用的地方。我们可以使用 Cassandra 来保存 RDD 的计算结果,我们也可以使用 Cassandra 来填充 RDD。

我希望这有点道理。我认为老实说,阅读本系列的前两篇文章是个好主意,可以为我在这里谈论的内容打下基础。我告诉您这些技术非常棒,如果您和我一样,认为自己是多面手,那么它们是基于 JVM 的这一事实不应让您犹豫。它们非常棒,依我之见,您应该尝试学习它们。

总之,本文的其余部分将向您展示各种保存/检索操作的样式。

 

DataStax Cassandra 连接器

为了处理与 Cassandra 的保存/检索,您将需要使用 DataStax Cassandra 连接器,这是必需的。您可以使用类似以下的 SBT(Scala 构建工具)文件从 Maven Central Repository 下载它

 

name := "SparkCassandraDemoApp"

version := "1.0"

scalaVersion := "2.10.5"


libraryDependencies ++=Seq(
  "org.apache.spark"    %     "spark-core_2.10"                 %   "1.4.1",
  "com.datastax.spark"  %     "spark-cassandra-connector_2.10"  %   "1.4.0"
  )
  

这将引入 Apache Spark 库以及 Apache Cassandra 连接器。

注意

Scala、Spark 和 Cassandra 连接器的版本高度相关,请确保使用正确的版本。

 

您可以在这里阅读更多关于 Apache Cassandra 连接器的信息,它有很棒的文档

https://github.com/datastax/spark-cassandra-connector

 

将 Datastax 连接器与 Apache Spark 一起使用

与正常的 Apache Spark 开发一样,我们有一个所谓的驱动程序。这是负责协调工作节点上的分布式计算的代码部分(通常通过使用 .Parallelize)。

对于这个演示应用程序,我没有提供如何创建 Apache Spark 集群或 Apache Cassandra 集群的说明,但我们仍然会偶尔进行并行化,对于这个演示来说,这仅仅意味着使用运行驱动程序的机器上的可用核心。

话虽如此,本文中的大部分演示代码将仅仅展示如何最好地使用 DataStax Cassandra 连接器。

 

我选择 Scala 作为我的首选语言,以下是完整的驱动程序

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SimpleApp {
  def main(args: Array[String]) {

    val conf = new SparkConf(true)
      .setAppName("Simple Application")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    // Use local Spark (non clustered in this example) with 2 cores.
    // Note this relies on all the SBT dependencies being
    // downloaded to C:\Users\XXXXX\.ivy2 cache folder
    conf.setMaster("local[2]")
    implicit val sc = new SparkContext(conf)

    val tableReader = new CassandraTableReader()
    val tableWriter = new CassandraTableWriter()

    tableWriter.initialise()
    
    tableReader.readTestTableValues()
    tableWriter.writeTestTableValues()
    tableReader.readTestTableValues()
    tableReader.foreachTestTableValues()
    tableReader.getColumnAttributes()
    tableReader.getSets()
    tableReader.getUDT()
    tableReader.foreachSelectedTableColumnValues()
    tableReader.foreachFilteredTableColumnValues()
    tableReader.foreachTableRowCount()
    tableReader.foreachTableRowAsTuples()
    tableReader.foreachTableRowAsCaseClasses()
    tableReader.foreachTableRowAsCaseClassesUsingColumnAliases()
    tableReader.foreachTableRowAsTuples()
    tableWriter.saveCollectionOfTuples()
    tableReader.foreachTableRowAsTuples()
    tableWriter.saveCollectionOfCaseClasses()
    tableWriter.saveUDT()

    println("====== DONE ======")

    readLine()
  }
}

可以看出,那里没有太多花哨的东西,我们只运行一些初始化代码,然后调用一个函数,该函数期望从 Cassandra 中读取一些数据到 Spark RDD,或者将一些数据从 Spark RDD 写入 Cassandra。

因此,我认为将文章的其余部分分为 4 个部分是个好主意

  • SparkContext
  • 初始化
  • 从 Cassandra 读取到 Spark RDD
  • 将数据从 Spark RDD 写入 Cassandra

 

SparkContext

为了使用 DataStax Cassandra 连接器进行任何操作,您都需要拥有一个 SparkContext。用于读取/写入 Cassandra 的类都需要 SparkContext。这如何发生?对于演示应用程序,这很简单,就是使用一个作用域内的 Scala 隐式 val。

import com.datastax.spark.connector._
/* SimpleApp.scala */

//Driver program creates the SparkContext

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._

object SimpleApp {
  def main(args: Array[String]) {
    implicit val sc = new SparkContext(conf)

    val tableReader = new CassandraTableReader()
    val tableWriter = new CassandraTableWriter()
    ....
    ....
    ....
}


//Reader class needs the SparkContext
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkContext

class CassandraTableReader()(implicit val sc : SparkContext) {
    ....
    ....
    ....
}


//Writer class needs the SparkContext
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkContext

class CassandraTableWriter()(implicit val sc : SparkContext) {
    ....
    ....
    ....
}

 

初始化

为了确保我们有一些表可以进行操作,驱动程序会调用一个初始化函数。基本上就是这段代码

def initialise(): Unit =   {
  val conf = sc.getConf
  CassandraConnector(conf).withSessionDo { session =>

    //create keyspace
    session.execute("DROP KEYSPACE IF EXISTS test")
    session.execute("CREATE KEYSPACE test WITH REPLICATION = " +
      "{'class': 'SimpleStrategy', 'replication_factor': 1 }")

    //create table kv
    session.execute("DROP TABLE IF EXISTS test.kv")
    session.execute("CREATE TABLE test.kv(key text PRIMARY KEY, value int)")
    session.execute("INSERT INTO test.kv(key, value) VALUES ('key1', 1)")
    session.execute("INSERT INTO test.kv(key, value) VALUES ('key2', 2)")


    //create table words
    session.execute("DROP TABLE IF EXISTS test.words")
    session.execute("CREATE TABLE test.words(word text PRIMARY KEY, count int)")
    session.execute("INSERT INTO test.words(word, count) VALUES ('elephant', 1)")
    session.execute("INSERT INTO test.words(word, count) VALUES ('tiger', 12)")
    session.execute("INSERT INTO test.words(word, count) VALUES ('snake', 5)")


    //create table emails
    session.execute("DROP TABLE IF EXISTS test.users ")
    session.execute("CREATE TABLE test.users  (username text PRIMARY KEY, emails SET<text>)")
    session.execute("INSERT INTO test.users  (username, emails) " +
      "VALUES ('sacha', {'sacha@email.com', 'sacha@hotmail.com'})")
    session.execute("INSERT INTO test.users  (username, emails) " +
      "VALUES ('bill', {'bill23@email.com', 'billybob@hotmail.com'})")

    //create address and company
    session.execute("DROP TYPE IF EXISTS test.address")
    session.execute("DROP TABLE IF EXISTS test.companies")
    session.execute("CREATE TYPE test.address (city text, street text, number int)")
    session.execute("CREATE TABLE test.companies (name text PRIMARY KEY, address FROZEN<address>)")
    session.execute("INSERT INTO test.companies (name, address) VALUES ('company1', " +
      "{ city : 'London', street : 'broad street', number : 111 })")
    session.execute("INSERT INTO test.companies (name, address) VALUES ('company2', " +
      "{ city : 'Reading', street : 'rushmore road', number : 43 })")
  }
}

 

将数据从 Cassandra 读取到 Spark RDD

本节概述了从 Cassandra 读取数据的各种方法。

 

读取表值

我们可能都使用过支持按索引或名称获取列值的记录集。DataStax 连接器也支持这一点,您可以使用 CassandraRow 来获取值。

假设我们有这个表

CREATE TABLE test.kv (
    key text PRIMARY KEY,
    value int
) ....

这是一个例子。

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
def readTestTableValues(): Unit = {
  println(s"In method : readTestTableValues")
  val rdd = sc.cassandraTable("test", "kv")
  val count = rdd.count
  val first = rdd.first()
  val sum = rdd.map(_.getInt("value")).sum
  println(s"============ RDD COUNT $count")
  println(s"============ RDD FIRST $first")
  println(s"============ RDD SUM $sum")
}

遍历表值

我们也可以对从 Cassandra 检索到的值运行一个 foreach 循环。

假设我们有这个表

CREATE TABLE test.kv (
    key text PRIMARY KEY,
    value int
) ....

这是一个例子

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
def foreachTestTableValues(): Unit = {
  println(s"In method : foreachTestTableValues")
  val rdd = sc.cassandraTable("test", "kv")
  rdd.foreach(println)
}

获取列属性

CassandraRow 对象还公开了许多有用的属性,例如 columnNames、size,我们可以使用它们。

假设我们有这个表

CREATE TABLE test.kv (
    key text PRIMARY KEY,
    value int
) ....

 

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
def getColumnAttributes(): Unit = {
  println(s"In method : getColumnValue")
  val rdd = sc.cassandraTable("test", "kv")
  val firstRow = rdd.first
  val colNames = firstRow.columnNames
  val colSize = firstRow.size
  val firstKey = firstRow.get[String]("key")
  val firstValue = firstRow.get[Int]("value")
  println(s"============ RDD COLUMN NAMES $colNames")
  println(s"============ RDD COLUMN SIZE $colSize")
  println(s"============ RDD FIRST KEY $firstKey")
  println(s"============ RDD FISRT $firstValue")

}

获取 SET

Cassandra 支持对象集合。假设我们在 Cassandra 中有这个表定义

CREATE TABLE test.users (
    username text PRIMARY KEY,
    emails set<text>
) .....
 

其中可以看到我们存储了一个 `set` 来存储电子邮件。我们如何读取集合?幸运的是,这也很容易。这是一个例子

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
def getSets(): Unit = {
  println(s"In method : getSets")
  val rdd = sc.cassandraTable("test", "users")
  val row = rdd.first()
  PrintHelper.printIt("getSets", "List[String]", row.get[List[String]]("emails"))
  PrintHelper.printIt("getSets", "IndexedSeq[String]", row.get[IndexedSeq[String]]("emails"))
  PrintHelper.printIt("getSets", "Seq[String]", row.get[Seq[String]]("emails"))
  PrintHelper.printIt("getSets", "Set[String]", row.get[Set[String]]("emails"))
}

获取 UDT

Cassandra 还支持用户定义类型 (UDT),因此 DataStax 连接器提供了读取它的支持

假设我们的表定义看起来像这样

CREATE TYPE test.address (
    city text,
    street text,
    number int
)


CREATE TABLE test.companies (
    name text PRIMARY KEY,
    address frozen<address>
) .....

然后我们可以使用这段代码从这个表及其 UDT(地址)中读取

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md
def getUDT(): Unit = {
  println(s"In method : getUDT")
  val rdd = sc.cassandraTable("test", "companies")
  val row = rdd.first()
  val address: UDTValue = row.getUDTValue("address")
  PrintHelper.printIt("getUDT", "city", address.getString("city"))
  PrintHelper.printIt("getUDT", "street", address.getString("street"))
  PrintHelper.printIt("getUDT", "number", address.getString("number"))
}

使用投影进行遍历

另一个流行的想法是使用投影,即创建只包含我们感兴趣的属性/字段的新对象。这就像 .NET 中的 LINQ 表达式 `select(x => new { a = someProp, b = someOtherProp })`

我们来看一个例子,我们查询一个有许多列但我们只需要其中几列的表,通过投影我们指定只使用“username”列

假设我们有这个表

CREATE TABLE test.users (
    username text PRIMARY KEY,
    emails set<text>
) ....

 

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md
def foreachSelectedTableColumnValues(): Unit = {
  println(s"In method : foreachSelectedTableColumnValues")

  val filteredColumnsRdd = sc.cassandraTable("test", "users")
    .select("username")

  filteredColumnsRdd.foreach(println)

  val row = filteredColumnsRdd.first()
  PrintHelper.printIt("foreachSelectedTableColumnValues", "username", row.getString("username"))
}

使用服务器端过滤进行遍历

另一件人们期望能做到的事情是执行服务器端过滤(可以想象成 SQL 中的 where 子句),该过滤将在 Cassandra 数据库级别应用。

假设我们有这个表

CREATE TABLE test.users (
    username text PRIMARY KEY,
    emails set<text>
) ....

 

这实际上相当容易做到,我们可以这样做

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md
def foreachFilteredTableColumnValues(): Unit = {
  println(s"In method : foreachFilteredTableColumnValues")

  val filteredColumnsRdd = sc.cassandraTable("test", "users")
    .select("username")
    .where("username = ?", "bill")

  filteredColumnsRdd.foreach(println)

  val row = filteredColumnsRdd.first()
  PrintHelper.printIt("foreachFilteredTableColumnValues", "username", row.getString("username"))
}

聚合(计数)

聚合也是一个典型的用例,比如计数/求和。您不应该尝试自己做这些,您绝对应该使用 DataStax 的聚合方法,因为这些可能需要访问 Cassandra 集群中的所有节点来执行聚合。所以把它留给专家吧。

假设我们有这个表

CREATE TABLE test.users (
    username text PRIMARY KEY,
    emails set<text>
) ....

 

总之,这是一个计数的例子,我们使用了 DataStax 连接器提供的 `cassandraCount` 方法。

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md
def tableRowCount(): Unit = {
  println(s"In method : foreachTableRowCount")

  val count = sc.cassandraTable("test", "users")
    .select("username")
    .cassandraCount()

  PrintHelper.printIt("foreachTableRowCount", "all users count", count)
}

使用元组进行遍历

有时我们可能只需要获取几列,而不想为这些读取创建实际的类。在这种情况下,元组是很好的选择,而且幸运的是,这也得到了很好的支持。

假设我们有这个表

CREATE TABLE test.words (
    word text PRIMARY KEY,
    count int
) ....
//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/4_mapper.md
def foreachTableRowAsTuples(): Unit = {
  println(s"In method : foreachTableRowAsTuples")

  val rdd = sc.cassandraTable[(String, Int)]("test", "words")
    .select("word", "count");
  val items = rdd.take(rdd.count().asInstanceOf[Int])
  items.foreach(tuple => PrintHelper.printIt("foreachTableRowAsTuples",
    "tuple(String, Int)", tuple))

  val rdd2 = sc.cassandraTable[(Int, String)]("test", "words")
    .select("count", "word")
  val items2 = rdd2.take(rdd2.count().asInstanceOf[Int])
  items2.foreach(tuple => PrintHelper.printIt("foreachTableRowAsTuples",
    "tuple(Int, String)", tuple))
}

使用 case class 进行遍历

然而,如果我们需要进行的读取更具持久性,我们可以使用类来表示检索到的数据。Case class 非常适合这种情况。

假设我们有这个表

CREATE TABLE test.words (
    word text PRIMARY KEY,
    count int
) .... 

这是一个例子。

case class WordCount(word: String, count: Int)


//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/4_mapper.md
def foreachTableRowAsCaseClasses(): Unit = {
  println(s"In method : foreachTableRowAsCaseClasses")

  val items = sc.cassandraTable[WordCount]("test", "words")
    .select("word", "count").take(3)

  items.foreach(wc => PrintHelper.printIt("foreachTableRowAsCaseClasses",
    "WordCount(word : String, count : Int)", wc))
}

列别名

列的别名可能非常有用。这是一个例子,我们使用“select”和“as”。

假设我们有这个表

CREATE TABLE test.kv (
    key text PRIMARY KEY,
    value int
) ....

 

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/4_mapper.md
def foreachTableRowAsCaseClassesUsingColumnAliases(): Unit = {
  println(s"In method : foreachTableRowAsCaseClassesUsingColumnAliases")

  val items = sc.cassandraTable[WordCount]("test", "kv")
    .select("key" as "word", "value" as "count").take(1)

  items.foreach(wc => PrintHelper.printIt("foreachTableRowAsCaseClassesUsingColumnAliases",
    "WordCount(word : String, count : Int)", wc))
}

 

将数据从 Spark RDD 写入 Cassandra

本节概述了将数据写入 Cassandra 的各种方法。

 

保存元组集合

Scala 支持元组的概念,它可能看起来像这样 `(1, "cat")`,这是一个 `(Int, String)` 的元组。元组可以有不同的长度,但对于这个演示,我们将坚持使用长度为 2 的元组。

所以,如果我们有一个表定义看起来像这样

CREATE TABLE test.words (
    word text PRIMARY KEY,
    count int
) ....

然后,我们可以像这样将我们的元组(长度为 2)保存到此表中

 

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
def saveCollectionOfTuples(): Unit = {
  println(s"In method : saveCollectionOfTuples")

  val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
  collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
}

 

保存 case class 集合

Scala 也有 case class 的概念,这些类有点神奇,实现了许多功能(如 `HashCode`、`Equals` 等)。因此,与 Tuples 相比,使用它们通常更好。因为它们是强类型且为人所知的类。

假设我们有这个 scala case class

case class WordCount(word: String, count: Int)

并且我们的表定义看起来像这样

CREATE TABLE test.words (
    word text PRIMARY KEY,
    count int
) ....

然后,我们可以使用这段代码保存这些类的集合

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
def saveCollectionOfCaseClasses(): Unit = {
  println(s"In method : saveCollectionOfCaseClasses")

  val collection = sc.parallelize(Seq(WordCount("shark", 50), WordCount("panther", 60)))
  collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
}

 

保存用户定义类型 (UDT) 集合

Cassandra 还支持 UDT 的概念,这可以在下面的表定义中看到,其中有一个“address”UDT 列

CREATE TABLE test.companies (
    name text PRIMARY KEY,
    address frozen<address>
) ....

所以我们应该能够使用 Scala 存储这些 UDT。我们在 Scala 中处理 UDT 的方式是使用 case class。因此,对于这个例子,我们将有以下 case class

case class Address(city: String, street: String, number: Int)

然后我们可以像这样将它保存到 Cassandra

//See this post for more details
//https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
def saveUDT(): Unit = {
  println(s"In method : saveUDT")

  val address = Address(city = "Oakland", street = "Broadway", number = 90210 )
  val col = Seq(("Oakland cycles", address))
  sc.parallelize(col).saveToCassandra("test", "companies", SomeColumns("name", "address"))
}

 

就这些

就这样,我已经全面介绍了如何使用 Apache Spark 执行通用网格计算。我还向您展示了如何安装本地 Cassandra 实例,并且本文讲解了如何使用 DataStax Cassandra Spark 连接器,将数据保存到/从 Cassandra,以及从 Cassandra 读取到/从 Spark 弹性分布式数据集。

我敦促大家亲自尝试这些工具。Cassandra + Spark 是目前非常热门的工具,至少在英国是这样。所以从这个角度来看,这不言而喻。

 

 

 

 

 

 

 

© . All rights reserved.