65.9K
CodeProject 正在变化。 阅读更多。
Home

Azure:检查 Databricks Apache Spark 平台

starIconstarIconstarIconstarIconstarIcon

5.00/5 (8投票s)

2018年5月16日

CPOL

22分钟阅读

viewsIcon

22201

检查 Azure 上的 Apache Spark Databricks 平台

引言

在本文中,我将讨论 Apache Spark / Databricks。它将指导您如何使用新的基于云的托管 Databricks 平台。本文将从 Microsoft Azure 的角度进行讨论,但如果您使用 Amazon 云,其理念应该完全相同。

 

什么是 Spark / Databricks?

都说一图胜千言,所以这里有一张关于 Apache Spark 的精美图片。

点击查看大图

 

本质上,Apache Spark 是一款数据科学家工具,可以轻松地探索和处理大量数据。它具有以下功能:

  • 分布式数据集(因此可以在计算集群中进行数字运算)
  • DataFrame API,可以进行添加列、聚合列值、别名数据、连接 DataFrame 等操作。它还支持 SQL 语法,并且可以在集群中分布式运行
  • 流式 API
  • 机器学习 API
  • 图 API

 

Spark 还附带了各种适配器,使其能够连接到各种数据源,例如

  • Blob
  • 数据库
  • 文件系统(HDFS / s3 / Azure 存储 / Azure Data Lake / Databricks 文件系统)

 

这不是我第一次撰写关于 Apache Spark 的文章,如果您感兴趣,这里有一些旧文章:

 

 

因此,当我撰写这些文章时,您在集群上运行 Apache Spark 作业的选择有限,您基本上可以执行以下操作之一

  • 创建使用 Apache Spark API 并将在集群上运行的 Java/Scala/Python 应用程序
  • 创建一个 JAR 文件,您可以使用 spark-submit 命令行让现有集群运行它

 

问题是这两种方法都不理想,对于应用程序方法,您并不希望您的分析作业是一个应用程序,您真正希望它只是某种类库

 

Spark-Submit 可以让您提交一个类库,但您从中获得的反馈并不理想

 

后来又出现了一个名为 Apache Livy 的产品来解决这个问题,它是一个基于 Apache Spark 的 REST API。但它也有自己的问题,即设置起来并不方便,而且 API 功能也相当有限。为了解决这个问题,拥有/维护 Apache Spark 的优秀团队推出了 Databricks

 

Databricks 本质上是云端(Amazon / Azure)上完全托管的 Apache Spark。它还具有针对常见操作的 REST API 概念。接下来我们深入探讨一下。

 

Databricks 为何如此出色?

在我们深入了解 Databricks 之前,为什么我认为它如此出色?

我上面已经说过一点,但让我们看看完整的列表

  • 出色的(非常好的)REST API
  • 云端友好的管理仪表板
  • 为您的作业运行创建按需集群并在作业结束时将其销毁的能力是巨大的。仅凭这一点就应该让您考虑研究 Databricks 的原因如下:
    • 通过使用您自己的集群,您不会与其他人共享任何资源,因此您可以根据为集群选择的节点来保证您的性能
    • 您的作业运行后,集群将被销毁。这为您节省了资金
  • 如果您选择使用单个集群而不是每个作业都使用一个**新**集群,则可以将单个静态集群设置为**自动扩展**。这是一个非常棒的功能。想象一下在本地尝试这样做。显然,由于Apache Spark也支持在 Kubernetes 上运行,这确实简化了过程。然而,在我看来,要充分利用 Kubernetes,您还需要在 Amazon/Azure/Google 等云上运行它,因为如果您在云中,扩展集群所需的虚拟机要容易得多。顺便说一句,如果您对 Kubernetes 一无所知,我写了一个迷你系列,您可以在这里找到它:https://sachabarbs.wordpress.com/kubernetes-series/
  • 我觉得它提供的功能相当便宜
  • 高度直观

 

代码在哪里?

所以这篇文章有一些代码,分为两部分

  1. 一个一次性 WPF 应用程序,它只是作为演示 REST 调用的载体,说实话,您可以使用 Postman 来探索 Databricks REST。WPF 应用程序只是让这更容易,因为您不必担心记住设置访问令牌,一旦您设置了一次,也不必担心尝试找到正确的 REST API 语法。UI 只是向您展示了一组用于 Databricks 的工作 REST 调用。
  2. 一个简单的 IntelliJ IDEA Scala/SBT 项目,代表我们希望上传并在 Databricks 上运行的 Apache Spark 作业。这将使用 SBT 编译,因此如果您想运行此代码,SBT 是必需的。

代码库可以在这里找到:https://github.com/sachabarber/databrick-azure-spark-demo

 

必备组件

有几个这样的,不是因为 Databricks 本身需要它们,而是因为我渴望向您展示如何真正使用 Databricks 来完成您自己的作业的完整工作流程,这意味着我们应该创建一个新的 JAR 文件作为要发送到 Databricks 的作业。

因此,需要以下内容:

  • 一个 Databricks 安装(由 Amazon/Azure 托管)
  • Visual Studio 2017(社区版即可)
  • Java 1.8 SDK 已安装并位于您的路径中
  • IntelliJ IDEA Community Edition 2017.1.3(或更高版本)
  • SBT Intellij IDEA 插件已安装

显然,如果您只想阅读而不自己尝试,则不需要这些东西

 

Azure 中 Databricks 入门

正如我之前所说,我将使用 Microsoft Azure,但在云中首次创建 Databricks(这将是云供应商特定的)之后,其余的说明应该适用于 Amazon 或 Azure。

无论如何,在云中与 Databricks 合作的第一步是创建一个新的 Databricks 实例。对于 Azure,这只是简单地创建一个新资源,如下所示

点击查看大图

 

一旦您创建了 Databricks 实例,您应该能够从 Databricks 实例的概述中启动工作区

 

点击查看大图

 

这将启动您进入一个与您的 Azure/Amazon 订阅关联的新的 Databricks 工作区网站,因此在您通过登录阶段(会自动发生,至少在 Azure 上是这样)后,您最初应该会看到类似以下的内容

 

点击查看大图

 

在这里,您可以查看/执行以下操作,其中一些我们将在下面更详细地探讨

  • 创建新作业/集群/笔记本
  • 探索 DBFS (Databricks 文件系统) 数据
  • 查看之前的作业运行
  • 查看/启动/停止集群

 

 

探索工作区

在本节中,我们将探讨您在与您的 Databricks 云安装绑定的 Databricks 工作区网站中可以做什么。我们不会离开工作区网站,下面的所有内容都将在工作区网站中直接完成,这在我看来非常酷,但这些东西真正闪耀的地方在于我们可以通过编程方式完成所有这些事情,而不仅仅是点击网站上的按钮。

毕竟,我们希望将这些东西构建到我们自己的处理管道中。幸运的是,使用网站可以做的事情与通过 REST API 公开的事情之间存在相当好的一对一转换。但我跳过了,我们将在本节之后的部分中达到那里,所以现在让我们只检查一下我们可以在与您的 Databricks 云安装绑定的 Databricks 工作区网站中做什么。

 

创建集群

那么您可能想做的第一件事是创建一个集群来运行一些代码。这可以通过 Databricks 工作区网站轻松完成,如下所示

 

这将带您进入这样的屏幕,您可以在其中配置集群,并为集群选择各种组件

点击查看大图

 

一旦您创建了一个集群,它将最终显示在集群概述页面上

点击查看大图

  • 交互式集群是与我们将接下来讨论的笔记本绑定的集群
  • 作业集群是用于运行作业的集群

 

探索笔记本

现在我是一个资深开发人员(我老了,或者感觉自己老了,或者别的什么),所以打开 IDE 并编写应用程序/类库/jar 没什么大不了的。但是 Apache Spark 本质上是数据科学家用来轻松尝试一些简单的数字运算代码的工具。这正是笔记本发挥作用的地方。

笔记本是一个托管的单元格编辑器,允许用户对 Apache Spark 集群运行 python/R/Scala 代码。

您可以从主菜单创建新的笔记本,如下所示

点击查看大图

因此,在您选择了语言后,您将看到一个空白笔记本,您可以在其中将一些代码写入单元格。教您快捷方式以及如何正确使用笔记本超出了本文的范围,但这里有一些关于笔记本的要点:

  • 它们允许您快速探索 API
  • 它们允许您重新分配被记住的变量
  • 它们允许您只输入特定的单元格
  • 它们为您提供了一些预定义的变量。但请注意,如果您将其转换为真实代码,则需要替换这些变量。例如,`spark` 是一个预定义变量

 

运行笔记本代码

所以假设我刚刚创建了一个 Scala 笔记本,并在一个单元格中输入了如下图所示的文本。我可以使用 ALT + Enter 或笔记本 UI 中的运行按钮快速运行它。

 

点击查看大图

 

这将运行活动单元格并将变量/输出语句打印到笔记本 UI,这也可以在上面看到

 

从 Azure Blob 存储读取/写入

使用 Apache Spark 时最常见的任务之一是从某个外部源获取数据,并在将其转换为所需结果后将其保存到存储中

 

这是一个使用现有 Azure Blob 存储帐户和一些 Scala 代码的示例。这个特定的示例只是从 Azure Blob 存储加载一个 CSV 文件,转换该文件,然后将其作为日期标记的 CSV 文件保存回 Azure Blob 存储。

 

import java.util.Calendar
import java.text.SimpleDateFormat

spark.conf.set("fs.azure.account.key.YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net", "YOUR_STORAGE_KEY_HERE")

spark.sparkContext.hadoopConfiguration.set(
  "fs.azure.account.key.YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net",
  "YOUR_STORAGE_KEY_HERE"
)


val now = Calendar.getInstance().getTime()
val minuteFormat = new SimpleDateFormat("mm")
val hourFormat = new SimpleDateFormat("HH")
val secondFormat = new SimpleDateFormat("ss")

val currentHour = hourFormat.format(now)      // 12
val currentMinute = minuteFormat.format(now)  // 29
val currentSecond = secondFormat.format(now)           // PM
val fileId  = currentHour  + currentMinute + currentSecond
fileId

val data = Array(1, 2, 3, 4, 5)
val dataRdd = sc.parallelize(data)
val ages_df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("wasbs://YOUR_CONTAINER_NAME_HERE@YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net/Ages.csv")
ages_df.head

//https://github.com/databricks/spark-csv 
val selectedData = ages_df.select("age")
selectedData.write
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .save("wasbs://YOUR_CONTAINER_NAME_HERE@YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net/" + fileId + "_SavedAges.csv")


val saved_ages_df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("wasbs://YOUR_CONTAINER_NAME_HERE@YOUR_STORAGE_ACCOUNT_NAME_HERE.blob.core.windows.net/" + fileId + "_SavedAges.csv")

saved_ages_df.show()

 

对于我的 Azure 存储帐户,它看起来像这样

点击查看大图

如果你想知道为什么结果会有像 080629_SavedAges.csv 这样的文件夹,这是因为 Apache Spark 处理分区的方式。相信我,当你将它们重新加载到内存中时,这无关紧要,因为 Apache Spark 会处理这个问题,正如所见

 

点击查看大图

 

在 DataBricks Web UI 中创建作业

Databricks Web UI 允许您从笔记本或您拥有的 JAR 文件创建新作业,您可以将其拖入并设置主入口点。您还可以设置调度和要使用的集群。一旦满意,您可以单击“立即运行”按钮,这将运行您的作业。

点击查看大图

 

启动 Spark UI/仪表板

一旦您运行了一个作业,您很可能希望查看它,以确保它按预期工作,并且运行最佳。幸运的是,Apache Spark 配备了一个漂亮的分析运行可视化工具,您可以用于此。它有点像 SQL Server 中的 SQL 查询分析器。

这可以从“作业”页面访问

点击查看大图

因此,让我们看看一个成功的作业,我们可以通过“作业”页面“上次运行”列中的“成功”链接来查看它。

从那里我们可以查看 Apache Spark UI 或作业的日志。

让我们看看这个作业的 Apache Spark UI。

点击查看大图

Spark UI 是你的朋友,尝试熟悉它

 

Databricks API

好的,现在我们已经介绍了如何使用 Databricks Web UI,那么让我们熟悉一下 REST API,以便我们可以围绕使用 Apache Spark 作为我们的分析引擎来编写自己的代码。下一节将展示如何使用一些可用的 REST API。

 

有哪些可用的 API?

那么您现在可能想知道实际有哪些可用的 API?这里是查看的地方:https://docs.databricks.com/api/latest/index.html

从中,主要顶级 API 是

  • 集群
  • DBFS
  • 实例配置文件
  • 作业
  • 秘密
  • Token
  • 工作区

一天的时间根本不够我向您展示所有这些。所以我选择了一些进行演示,我们将在下面讨论这些。

 

创建用于 API 的令牌

Databricks REST API 都需要关联一个 JWT 令牌。这意味着您首先需要为此创建一个令牌。这在 Databricks web UI 中很容易实现。只需按照这些步骤操作:

点击查看大图

因此,一旦您完成此操作,获取令牌值,并且您还需要记下图像中突出显示的另一条信息。有了这两条信息,我们就可以使用 Postman 尝试一个请求。

点击查看大图

 

任何 API 调用都需要做的常见事情

正如我刚才所说,您需要确保在每次调用中都提供上一步中的令牌。但是我们具体如何做呢?它看起来像什么?让我们使用 Postman 展示一个示例,使用上一段中的最后两条信息。

创建 Base64 编码的令牌值

您从上面获得的令牌需要转换为 Base64 编码字符串。有很多在线工具可以做到这一点,随便选择一个。需要注意的重要一点是,您还必须包含前缀 "token:"。因此,要编码的完整字符串类似于 "token:dapi56b...........d5"

 

这将为您提供一个 base64 编码字符串。然后我们需要前往 Postman 尝试该请求,它可能看起来像这样

点击查看大图

这里需要注意的重要事项是

  • 我们创建一个头:`Key = Authorization, Value = Basic YOUR_BASE64_ENCODED_STRING_FROM_PREVIOUS_STEP`
  • 我们使用来自 Azure/AWS 门户的信息作为 Uri 的一部分。因此,对于我的 Databricks Azure 安装,这是一个有效的请求:https://northeurope.azuredatabricks.net/api/2.0/clusters/spark-versions

 

一个简单的抛弃式演示应用

显然,您可以在 Postman 中随意摸索,了解 Databricks REST API 的工作原理,这完全没有问题。但是为了让您更轻松,我开发了一个简单的(一次性)演示应用程序,您可以使用它来探索我认为最重要的两个 API。

它运行时看起来是这样的

当您选择运行演示应用提供的预设 REST API 调用之一时,它看起来是这样的

 

 

如何为演示应用设置我的令牌?

上面,当我开始谈论 Databricks REST API 时,我们说过需要提供一个 API 令牌。那么演示应用程序如何处理这个问题呢?

它有两种处理方式,App.Config 中的这个条目应该指向包含令牌信息的您自己的文件

所以对我来说,这是我的文件

C:\Users\sacha\Desktop\databrick-azure-spark-demo\MyDataBricksToken.txt

该文件只包含一行内容:"token:dapi56b...........d5",这是我们上面讨论的以 "token:" 开头的 base64 编码字符串。

然后将其读取到演示应用程序的全局可用属性中,如下所示

using System.Configuration;
using System.IO;
using System.Windows;
using SAS.Spark.Runner.REST.DataBricks;

namespace SAS.Spark.Runner
{
    public partial class App : Application
    {
        protected override void OnStartup(StartupEventArgs e)
        {
            base.OnStartup(e);

            var tokenFileName = ConfigurationManager.AppSettings["TokenFileLocation"];
            if (!File.Exists(tokenFileName))
            {
                MessageBox.Show("Expecting token file to be provided");
            }

            AccessToken = File.ReadAllText(tokenFileName);

            if(!AccessToken.StartsWith("token:"))
            {
                MessageBox.Show("Token file should start with 'token:' + 
				  "following directly by YOUR DataBricks initial token you created");
            }
        }

        public static string AccessToken { get; private set; }
    }
}

 

就是这样。演示应用程序应该为您处理其余部分。

 

正如我所说,我没有时间探索每一个 API,但我有时间研究其中最常见的两个:集群和作业。我将在下面讨论。

但在我开始之前,我只想向您展示每个 API 探索的大致思路

 

示例 ViewModel

大多数 API 探索都是使用类似这样的视图模型完成的

using SAS.Spark.Runner.REST.DataBricks;
using System;
using System.Threading.Tasks;
using System.Windows.Input;
using SAS.Spark.Runner.Services;

namespace SAS.Spark.Runner.ViewModels.Clusters
{
    public class ClusterGetViewModel : INPCBase
    {
        private IMessageBoxService _messageBoxService;
        private IDatabricksWebApiClient _databricksWebApiClient;
        private string _clustersJson;
        private string _clusterId;

        public ClusterGetViewModel(
            IMessageBoxService messageBoxService,
            IDatabricksWebApiClient databricksWebApiClient)
        {
            _messageBoxService = messageBoxService;
            _databricksWebApiClient = databricksWebApiClient;
            FetchClusterCommand = 
				new SimpleAsyncCommand<object, object>(ExecuteFetchClusterCommandAsync);
        }

        private async Task<object> ExecuteFetchClusterCommandAsync(object param)
        {
            if(string.IsNullOrEmpty(_clusterId))
            {
                _messageBoxService.ShowError("You must supply 'ClusterId'");
                return System.Threading.Tasks.Task.FromResult<object>(null);
            }

            try
            {
                var cluster = await _databricksWebApiClient.ClustersGetAsync(_clusterId);
                ClustersJson = cluster.ToString();
            }
            catch(Exception ex)
            {
                _messageBoxService.ShowError(ex.Message);
            }
            return System.Threading.Tasks.Task.FromResult<object>(null);
        }


        public string ClustersJson
        {
            get
            {
                return this._clustersJson;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._clustersJson, 
					value, () => ClustersJson);
            }
        }

        public string ClusterId
        {
            get
            {
                return this._clusterId;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._clusterId, 
					value, () => ClusterId);
            }
        }

        public ICommand FetchClusterCommand { get; private set; }
    }
}

想法是,我们使用简单的 REST 服务,并且有一个属性代表 JSON 响应。REST 服务实现了这个接口

using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public interface IDatabricksWebApiClient 
    {

        //https://docs.databricks.com/api/latest/jobs.html#create
        Task<CreateJobResponse> JobsCreateAsync(string jsonJobRequest);

        //https://docs.databricks.com/api/latest/jobs.html#jobsrunnow
        Task<DatabricksRunNowResponse> JobsRunNowAsync(DatabricksRunNowRequest runRequest);

        //https://docs.databricks.com/api/latest/jobs.html#runs-get
        Task<DatabricksRunResponse> JobsRunsGetAsync(int runId);

        //https://docs.databricks.com/api/latest/jobs.html#list
        Task<JObject> JobsListAsync();

        //https://docs.azuredatabricks.net/api/latest/jobs.html#runs-submit
        Task<DatabricksRunNowResponse> JobsRunsSubmitJarTaskAsync(RunsSubmitJarTaskRequest runsSubmitJarTaskRequest);

        //https://docs.azuredatabricks.net/api/latest/clusters.html#start
        Task<DatabricksClusterStartResponse> ClustersStartAsync(string clusterId);

        //https://docs.azuredatabricks.net/api/latest/clusters.html#get
        Task<JObject> ClustersGetAsync(string clusterId);

        //https://docs.databricks.com/api/latest/clusters.html#list
        Task<ClusterListResponse> ClustersListAsync();

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#list
        Task<DbfsListResponse> DbfsListAsync();

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#put
        Task<JObject> DbfsPutAsync(FileInfo file);

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsservicecreate
        Task<DatabricksDbfsCreateResponse> DbfsCreateAsync(DatabricksDbfsCreateRequest dbfsRequest);

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsserviceaddblock
        Task<JObject> DbfsAddBlockAsync(DatabricksDbfsAddBlockRequest dbfsRequest);

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#close
        Task<JObject> DbfsCloseAsync(DatabricksDbfsCloseRequest dbfsRequest);
    }
}

 

用于 UI 的 DataTemplate

实际的 UI 是通过一个 `DataTemplate` 简单完成的,我们将相关 ViewModel 绑定到一个 `ContentControl`。对于 JSON 表示,我只是使用了 AvalonEdit TextBox

这是上述 ViewModel 的一个示例

<Controls:MetroWindow x:Class="SAS.Spark.Runner.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
        xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
        xmlns:vms="clr-namespace:SAS.Spark.Runner.ViewModels"
        xmlns:vmsClusters="clr-namespace:SAS.Spark.Runner.ViewModels.Clusters"
        xmlns:vmsJobs="clr-namespace:SAS.Spark.Runner.ViewModels.Jobs"
        xmlns:avalonEdit="http://icsharpcode.net/sharpdevelop/avalonedit"
        xmlns:Controls="clr-namespace:MahApps.Metro.Controls;assembly=MahApps.Metro"
        xmlns:local="clr-namespace:SAS.Spark.Runner"       
        mc:Ignorable="d"
        WindowState="Maximized"
        Title="DataBricks API Runner">
    <Controls:MetroWindow.Resources>
		.....
		.....
		<DataTemplate DataType="{x:Type vmsClusters:ClusterGetViewModel}">
			<DockPanel LastChildFill="True">
				<StackPanel Orientation="Horizontal" DockPanel.Dock="Top">
					<Label Content="ClusterId" Margin="3" VerticalAlignment="Center"
						VerticalContentAlignment="Center" Height="24"/>
					<TextBox Text="{Binding ClusterId}" Width="200" VerticalAlignment="Center"
						VerticalContentAlig
						nment="Center" Height="24"/>
					<Button Content="Get Cluster" Margin="3,0,3,0" Width="100" 
					HorizontalAlignment="Left"
					VerticalAlignment="Center"
					VerticalContentAlignment="Center"
					Command="{Binding FetchClusterCommand}"/>
				</StackPanel>
				<avalonEdit:TextEditor
				FontFamily="Segoe UI"
				SyntaxHighlighting="JavaScript"
				FontSize="10pt"
				vms:TextEditorProps.JsonText="{Binding ClustersJson}"/>
			</DockPanel>
		</DataTemplate>
	<Controls:MetroWindow.Resources>
</Controls:MetroWindow>

 

由于演示应用中使用的 ViewModels 主要都遵循这种模式,我将不再展示任何其他 ViewModel 代码,除了上传 JAR 文件的那一个,因为那个有点特殊。

只要记住所有这些大致都以这种方式工作,您就会没问题。

 

集群 API 探索

本节展示了我选择查看的集群 API

 

集群列表

Databricks 文档在这里:https://docs.databricks.com/api/latest/clusters.html#list,此 API 调用执行以下操作

  • 返回所有固定集群、当前活动集群、过去 30 天内最近终止的交互式集群(最多 70 个)以及过去 30 天内最近终止的作业集群(最多 30 个)的信息。例如,如果过去 30 天内有 1 个固定集群、4 个活动集群、45 个终止的交互式集群和 50 个终止的作业集群,则此 API 返回 1 个固定集群、4 个活动集群、所有 45 个终止的交互式集群以及最近终止的 30 个作业集群。

这可以通过以下代码简单地完成

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.databricks.com/api/latest/clusters.html#list
        public async Task<ClusterListResponse> ClustersListAsync()
        {
            var request = new RestRequest("api/2.0/clusters/list", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");

            var response = await _client.ExecuteTaskAsync<ClusterListResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<ClusterListResponse>(response.Content);
            return dbResponse;
        }

        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

 

集群获取

Databricks 文档在此处:https://docs.azuredatabricks.net/api/latest/clusters.html#get,此 API 调用执行以下操作:

  • 根据集群标识符检索集群信息。集群可以在运行期间或终止后 60 天内进行描述

 

这可以通过以下代码简单地完成

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.azuredatabricks.net/api/latest/clusters.html#get
        public async Task<JObject> ClustersGetAsync(string clusterId)
        {
            var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/clusters/get", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddQueryParameter("cluster_id", clusterId);

            var response = await _client.ExecuteTaskAsync(request);
            JObject responseContent = JObject.Parse(response.Content);
            return responseContent;
        }

        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

 

集群启动

Databricks 文档在此处:https://docs.azuredatabricks.net/api/latest/clusters.html#start,此 API 调用执行以下操作:

  • 根据其 ID 启动已终止的 Spark 集群。这类似于 createCluster,不同之处在于
    • 保留了之前的集群 ID 和属性。
    • 集群以最后指定的集群大小启动。如果之前的集群是自动扩展集群,则当前集群以最小节点数启动。
    • 如果集群当前不在 TERMINATED 状态,则不会发生任何事情。
      用于运行作业的集群无法启动。

 

这可以通过以下代码简单地完成

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.azuredatabricks.net/api/latest/clusters.html#start
        public async Task<DatabricksClusterStartResponse> ClustersStartAsync(string clusterId)
        {
            var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/clusters/start", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(new { cluster_id = clusterId });

            var response = await _client.ExecuteTaskAsync<DatabricksClusterStartResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DatabricksClusterStartResponse>(response.Content);
            return dbResponse;
        }
        
        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

 

作业 API 探索

本节展示了我选择查看的作业 API

 

作业列表

Databricks 文档在这里:https://docs.databricks.com/api/latest/jobs.html#list,此 API 调用执行以下操作:

  • 列出所有作业

这可以通过以下代码简单地完成

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.databricks.com/api/latest/jobs.html#list
        public async Task<JObject> JobsListAsync()
        {
            var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/jobs/list", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");

            var response = await _client.ExecuteTaskAsync(request);
            JObject responseContent = JObject.Parse(response.Content);
            return responseContent;
        }
		
		
        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

 

 

作业创建

Databricks 文档在此处:https://docs.databricks.com/api/latest/jobs.html#create,此 API 调用执行以下操作:

  • 使用提供的设置创建新作业

这可以通过以下代码简单地完成

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.databricks.com/api/latest/jobs.html#create
        public async Task<CreateJobResponse> JobsCreateAsync(string jsonJobRequest)
        {
            var request = new RestSharp.Serializers.Newtonsoft.Json.RestRequest("api/2.0/jobs/create", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddParameter("application/json", jsonJobRequest, ParameterType.RequestBody);
            var response = await _client.ExecuteTaskAsync<CreateJobResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<CreateJobResponse>(response.Content);
            return dbResponse;
        }
		
		
        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

对于这个,值得特别提一下一个示例请求,因为它不是一个简单的参数,我们需要为此请求传递大量的 JSON。这是一个作业的示例请求,该作业每晚 10:15 运行

{
  "name": "Nightly model training",
  "new_cluster": {
    "spark_version": "4.0.x-scala2.11",
    "node_type_id": "r3.xlarge",
    "aws_attributes": {
      "availability": "ON_DEMAND"
    },
    "num_workers": 10
  },
  "libraries": [
    {
      "jar": "dbfs:/my-jar.jar"
    },
    {
      "maven": {
        "coordinates": "org.jsoup:jsoup:1.7.2"
      }
    }
  ],
  "email_notifications": {
    "on_start": [],
    "on_success": [],
    "on_failure": []
  },
  "timeout_seconds": 3600,
  "max_retries": 1,
  "schedule": {
    "quartz_cron_expression": "0 15 22 ? * *",
    "timezone_id": "America/Los_Angeles"
  },
  "spark_jar_task": {
    "main_class_name": "com.databricks.ComputeModels"
  }
}

 

虽然演示应用程序没有直接使用这个,但我使用了一个非常相似的,我将在下面详细介绍。

 

作业运行获取

Databricks 文档在此处:https://docs.databricks.com/api/latest/jobs.html#runs-get,此 API 调用执行以下操作:

  • 检索运行的元数据

这可以通过以下代码简单地完成

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.databricks.com/api/latest/jobs.html#runs-get
        public async Task<DatabricksRunResponse> JobsRunsGetAsync(int runId)
        {
            var request = new RestRequest("api/2.0/jobs/runs/get", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddQueryParameter("run_id", runId.ToString());
            var response = await _client.ExecuteTaskAsync<DatabricksRunResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DatabricksRunResponse>(response.Content);
            return dbResponse;
        }
		
		
        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

 

作业立即运行

Databricks 文档在此处:https://docs.databricks.com/api/latest/jobs.html#jobsrunnow,此 API 调用执行以下操作:

  • 立即运行作业,并返回触发运行的 run_id

这可以通过以下代码简单地完成

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

		.....
		.....
		

        //https://docs.databricks.com/api/latest/jobs.html#jobsrunnow
        public async Task<DatabricksRunNowResponse> JobsRunNowAsync(DatabricksRunNowRequest runRequest)
        {
            var request = new RestRequest("api/2.0/jobs/run-now", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(runRequest);
            var response = await _client.ExecuteTaskAsync<DatabricksRunNowResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DatabricksRunNowResponse>(response.Content);
            return dbResponse;
        }
		
		
        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

我们使用这种请求

using Newtonsoft.Json;

namespace SAS.Spark.Runner.REST.DataBricks.Requests
{
    // If you want to pass args you can do so using extra properties
    // See : https://docs.databricks.com/api/latest/jobs.html#run-now
    // - jar_params : 
    //   A list of parameters for jobs with jar tasks, e.g. "jar_params": ["john doe", "35"]. 
    //   The parameters will be used to invoke the main function of the main class specified 
    //   in the spark jar task. If not specified upon run-now, it will default to an empty list.
    // - notebook_params : 
    //   A map from keys to values for jobs with notebook task, 
    //   e.g. "notebook_params": {"name": "john doe", "age":  "35"}. 
    //   The map is passed to the notebook and will be accessible through the 
    //   dbutils.widgets.get function
    // - python_params :
    //   A list of parameters for jobs with python tasks, e.g. "python_params": ["john doe", "35"]. 
    //   The parameters will be passed to python file as command line parameters. 
    // If specified upon run-now, it would overwrite the parameters specified in job setting. 
    public class DatabricksRunNowRequest
    {
        [JsonProperty(PropertyName = "job_id")]
        public int JobId { get; set; }
    }
}

 

作业运行提交

Databricks 文档在此处:https://docs.azuredatabricks.net/api/latest/jobs.html#runs-submit,此 API 调用执行以下操作:

  • 使用提供的设置提交一次性运行。此端点不需要创建 Databricks 作业。您可以直接提交您的工作负载。通过此端点提交的运行不会显示在 UI 中。提交运行后,您可以使用 jobs/runs/get API 检查运行状态。

现在这可能是所有 REST API 中最复杂但最有用的,它允许您执行以下操作:

  • 使用 Scala 编写的 JAR 运行(您也可以传入命令行参数)
  • 使用笔记本运行
  • 使用 Python 文件运行(您也可以传入命令行参数)

由于我对 Scala 非常热衷,我将使用 Scala 进行演示

 

Scala 项目

演示代码在源代码中有一个第二个项目:Src/SAS.SparkScalaJobApp,这是一个 IntelliJ IDEA Scala 项目。要运行此项目,您将需要本文顶部的先决条件。

下载代码后,您应该在命令行窗口中运行 SBT,并导航到 Src/SAS.SparkScalaJobApp 文件夹。然后执行这些 SBT 命令

 
> clean
>compile
>assembly

从那里您应该能够进入 `Target` 目录并看到一个 FAT Jar(一个包含所有依赖项的 Jar)

我们很快就会用到它,但让我们花一点时间来检查代码。这是一个非常简单的 Spark 作业,它需要一个单一的 Int 命令行参数(我们稍后将通过 REST 调用发送),然后将创建那么多项目的列表,并应用一些简单的 Spark 转换。

 

注意

 

需要注意的是,我们需要小心使用 SparkContextSparkSession 等对象,如果您以前做过 Spark,您可能会自己创建它们。当使用 AWS 或 Azure 等云提供商时,您需要使用现有的 SparkContextSparkSession,并且我们还需要避免终止/关闭这些对象,因为它们实际上是共享的。这篇博客对此有很好的阐述:https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html

 

 

import scala.util.Try
import scala.util.Success
import scala.util.Failure
import org.apache.spark.sql.SparkSession

object SparkApp extends App {

  println("===== Starting =====")

  if(args.length != 1) {
    println("Need exactly 1 int arg")
  }

  Try {
    Integer.parseInt(args(0))
  } match {
    case Success(v:Int) => {
      val combinedArgs = args.aggregate("")(_ + _, _ + _)
      println(s"Args were $combinedArgs")
      SparkDemoRunner.runUsingArg(v)
      println("===== Done =====")
    }
    case Failure(e) =>  {
      println(s"Could not parse command line arg [$args(0)] to Int")
      println("===== Done =====")
    }
  }
}

object SparkDemoRunner {
  def runUsingArg(howManyItems : Int) : Unit =  {
    val session = SparkSession.builder().getOrCreate()
    import session.sqlContext.implicits._

    val multiplier = 2
    println(s"multiplier is set to $multiplier")

    val multiplierBroadcast = session.sparkContext.broadcast(multiplier)
    val data = List.tabulate(howManyItems)(_ + 1)
    val dataRdd = session.sparkContext.parallelize(data)
    val mappedRdd = dataRdd.map(_ * multiplierBroadcast.value)
    val df = mappedRdd.toDF
    df.show()
  }
}

 

无论如何,一旦我们有了那个 Jar 文件,我们需要使用一些 API,我将逐一介绍,但大致流程如下:

  • 检查所选的 Jar 文件是否存在于 Dbfs (Databricks 文件系统) 中 (这意味着我们已经上传了它)
  • 开始上传文件(我们必须分块进行,因为单个 2.0/dbfs/put API 有 1MB 限制)以获取文件句柄
  • 上传文件句柄的数据块作为 Base64 编码字符串
  • 使用文件句柄关闭文件
  • 构建一个 runs-submit 请求以使用刚刚上传的/最新的 Dbfs 文件

 

这就是它的粗略轮廓

 

所以,这是允许您选择 Jar 的 ViewModel(如上所述,如果您按照上面的说明使用 SBT 编译它,它应该在 Src/SAS.SparkScalaJobApp 源代码的 Target 文件夹中)。

using SAS.Spark.Runner.REST.DataBricks;
using System;
using System.IO;
using System.Threading.Tasks;
using System.Windows.Input;
using SAS.Spark.Runner.Services;
using System.Linq;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using System.Collections.Generic;
using Newtonsoft.Json;
using System.Diagnostics;

namespace SAS.Spark.Runner.ViewModels.Jobs
{
    public class JobsPickAndRunJarViewModel : INPCBase
    {
        private IMessageBoxService _messageBoxService;
        private IDatabricksWebApiClient _databricksWebApiClient;
        private IOpenFileService _openFileService;
        private IDataBricksFileUploadService _dataBricksFileUploadService;
        private string _jarFilePath;
        private string _status;
        private FileInfo _jarFile;
        private bool _isBusy;
        private bool _isPolling = false;
        private string _jobsJson;
        private Stopwatch _watch = new Stopwatch();

        public JobsPickAndRunJarViewModel(
            IMessageBoxService messageBoxService,
            IDatabricksWebApiClient databricksWebApiClient,
            IOpenFileService openFileService,
            IDataBricksFileUploadService dataBricksFileUploadService)
        {
            _messageBoxService = messageBoxService;
            _databricksWebApiClient = databricksWebApiClient;
            _openFileService = openFileService;
            _dataBricksFileUploadService = dataBricksFileUploadService;
            PickInputJarFileCommand = new SimpleAsyncCommand<object, object>(x => !IsBusy && !_isPolling, ExecutePickInputJarFileCommandAsync);
        }

        public string JobsJson
        {
            get
            {
                return this._jobsJson;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._jobsJson, value, () => JobsJson);
            }
        }

        public string JarFilePath
        {
            get
            {
                return this._jarFilePath;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._jarFilePath, value, () => JarFilePath);
            }
        }

        public string Status
        {
            get
            {
                return this._status;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._status, value, () => Status);
            }
        }

        public bool IsBusy
        {
            get
            {
                return this._isBusy;
            }
            set
            {
                RaiseAndSetIfChanged(ref this._isBusy, value, () => IsBusy);
            }
        }

        public ICommand PickInputJarFileCommand { get; private set; }
 
        

        private async Task<object> ExecutePickInputJarFileCommandAsync(object param)
        {
            IsBusy = true;

            try
            {
                _openFileService.Filter = "Jar Files (*.jar)|*.jar";
                _openFileService.InitialDirectory = @"c:\";
                _openFileService.FileName = "";
                var dialogResult = _openFileService.ShowDialog(null);
                if(dialogResult.Value)
                {
                    if(!_openFileService.FileName.ToLower().EndsWith(".jar"))
                    {
                        _messageBoxService.ShowError($"{_openFileService.FileName} is not a JAR file");
                        return Task.FromResult<object>(null);
                    }
                    _jarFile = new FileInfo(_openFileService.FileName);
                    JarFilePath = _jarFile.Name;
                    var rawBytesLength = File.ReadAllBytes(_jarFile.FullName).Length;
                    await _dataBricksFileUploadService.UploadFileAsync(_jarFile, rawBytesLength,
                        (newStatus) => this.Status = newStatus);

                    bool uploadedOk = await IsDbfsFileUploadedAndAvailableAsync(_jarFile, rawBytesLength);
                    if (uploadedOk)
                    {
                        //2.0/jobs/runs/submit
                        //poll for success using jobs/runs/get, store that in the JSON

                        var runId = await SubmitJarJobAsync(_jarFile);
                        if(!runId.HasValue)
                        {
                            IsBusy = false;
                            _messageBoxService.ShowError(this.Status = $"Looks like there was a problem with calling Spark API '2.0/jobs/runs/submit'");
                        }
                        else
                        {
                            await PollForRunIdAsync(runId.Value);
                        }

                    }
                    else
                    {
                        IsBusy = false;
                        _messageBoxService.ShowError("Looks like the Jar file did not upload ok....Boo");
                    }
                }
            }
            catch (Exception ex)
            {
                _messageBoxService.ShowError(ex.Message);
            }
            finally
            {
                IsBusy = false;
            }
            return Task.FromResult<object>(null);
        }

        private async Task<bool> IsDbfsFileUploadedAndAvailableAsync(FileInfo dbfsFile, int rawBytesLength)
        {
            bool fileUploadOk = false;
            int maxNumberOfAttemptsAllowed = 10;
            int numberOfAttempts = 0;

            while (!fileUploadOk || (numberOfAttempts == maxNumberOfAttemptsAllowed))
            {
                //check for the file in Dbfs
                var response = await _databricksWebApiClient.DbfsListAsync();
                fileUploadOk = response.files.Any(x =>

                    x.file_size == rawBytesLength &&
                    x.is_dir == false &&
                    x.path == $@"/{dbfsFile.Name}"
                );
                numberOfAttempts++;
                this.Status = $"Checking that Jar has been uploaded ok.\r\nAttempt {numberOfAttempts} out of {maxNumberOfAttemptsAllowed}";
                await Task.Delay(500);
            }
            return fileUploadOk;
        }

        private async Task<int?> SubmitJarJobAsync(FileInfo dbfsFile)
        {
            this.Status = $"Creating the Spark job using '2.0/jobs/runs/submit'";

            // =====================================================================
            // EXAMPLE REQUEST
            // =====================================================================
            //{
            //  "run_name": "my spark task",
            //  "new_cluster": 
            //  {
            //    "spark_version": "3.4.x-scala2.11",
            //    "node_type_id": "Standard_D3_v2",
            //    "num_workers": 10
            //  },
            //  "libraries": [
            //    {
            //      "jar": "dbfs:/my-jar.jar"
            //    }
            //  ],
            //  "timeout_seconds": 3600,
            //  "spark_jar_task": {
            //    "main_class_name": "com.databricks.ComputeModels",
            //    "parameters" : ["10"]
            //  }
            //}

            var datePart = DateTime.Now.ToShortDateString().Replace("/", "");
            var timePart = DateTime.Now.ToShortTimeString().Replace(":", "");
            var request = new RunsSubmitJarTaskRequest()
            {
                run_name = $"JobsPickAndRunJarViewModel_{datePart}_{timePart}",
                new_cluster = new NewCluster
                {
                    // see api/2.0/clusters/spark-versions
                    spark_version = "4.0.x-scala2.11",
                    // see api/2.0/clusters/list-node-types
                    node_type_id = "Standard_F4s",
                    num_workers = 2
                },
                libraries = new List<Library>
                {
                    new Library { jar = $"dbfs:/{dbfsFile.Name}"}
                },
                timeout_seconds = 3600,
                spark_jar_task = new SparkJarTask
                {
                    main_class_name = "SparkApp",
                    parameters = new List<string>() { "10" }
                }
            };

            var response = await _databricksWebApiClient.JobsRunsSubmitJarTaskAsync(request);
            return response.RunId;
        }

        private async Task PollForRunIdAsync(int runId)
        {
            _watch.Reset();
            _watch.Start();
            while (_isPolling)
            {
                var response = await _databricksWebApiClient.JobsRunsGetAsync(runId);
                JobsJson = JsonConvert.SerializeObject(response, Formatting.Indented);
                var state = response.state;
                this.Status = "Job not complete polling for completion.\r\n" +
                    $"Job has been running for {_watch.Elapsed.Seconds} seconds";

                try
                {
                    if (!string.IsNullOrEmpty(state.result_state))
                    {
                        _isPolling = false;
                        IsBusy = false;
                        _messageBoxService.ShowInformation(
                            $"Job finnished with Status : {state.result_state}");
                    }
                    else
                    {
                        switch (state.life_cycle_state)
                        {
                            case "TERMINATING":
                            case "RUNNING":
                            case "PENDING":
                                break;
                            case "SKIPPED":
                            case "TERMINATED":
                            case "INTERNAL_ERROR":
                                _isPolling = false;
                                IsBusy = false;
                                break;
                        }
                    }
                }
                finally
                {
                    if (_isPolling)
                    {
                        await Task.Delay(5000);
                    }
                }
            }
        }
    }
}

我们使用这个辅助类来实际上传到 Dbfs

using SAS.Spark.Runner.REST.DataBricks;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace SAS.Spark.Runner.Services
{
    public class DataBricksFileUploadService : IDataBricksFileUploadService
    {
        private IDatabricksWebApiClient _databricksWebApiClient;

        public DataBricksFileUploadService(IDatabricksWebApiClient databricksWebApiClient)
        {
            _databricksWebApiClient = databricksWebApiClient;
        }

        public async Task UploadFileAsync(FileInfo file, int rawBytesLength, 
            Action<string> statusCallback, string path = "")
        {
            var dbfsPath = $"/{file.Name}";

            //Step 1 : Create the file
            statusCallback("Creating DBFS file");
            var dbfsCreateResponse = await _databricksWebApiClient.DbfsCreateAsync(
                new DatabricksDbfsCreateRequest
                    {
                        overwrite = true,
                        path = dbfsPath
                    });

            //Step 2 : Add block in chunks
            FileStream fileStream = new FileStream(file.FullName, FileMode.Open, FileAccess.Read);
            var oneMegaByte = 1 << 20;
            byte[] buffer = new byte[oneMegaByte];
            int bytesRead = 0;
            int totalBytesSoFar = 0;
            while ((bytesRead = fileStream.Read(buffer, 0, buffer.Length)) != 0)
            {
                totalBytesSoFar += bytesRead;
                statusCallback(
                    $"Uploaded {FormatAsNumeric(totalBytesSoFar)} " +
                    $"out of {FormatAsNumeric(rawBytesLength)} bytes to DBFS");
                var base64EncodedData = Convert.ToBase64String(buffer.Take(bytesRead).ToArray());

                await _databricksWebApiClient.DbfsAddBlockAsync(
                    new DatabricksDbfsAddBlockRequest
                        {
                            data = base64EncodedData,
                            handle = dbfsCreateResponse.Handle
                    });

            }
            fileStream.Close();

            //Step 3 : Close the file
            statusCallback($"Finalising write to DBFS file");
            await _databricksWebApiClient.DbfsCloseAsync(
                    new DatabricksDbfsCloseRequest
                    {
                        handle = dbfsCreateResponse.Handle
                    });

        }

        private string FormatAsNumeric(int byteLength)
        {
            return byteLength.ToString("###,###,###");
        }
    }
}

为了完整起见,以下是使上述两个代码片段正常工作的一组 REST API:

using System;
using System.Configuration;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RestSharp;
using SAS.Spark.Runner.REST.DataBricks.Requests;
using SAS.Spark.Runner.REST.DataBricks.Responses;
using RestRequest = RestSharp.Serializers.Newtonsoft.Json.RestRequest;

namespace SAS.Spark.Runner.REST.DataBricks
{
    public class DatabricksWebApiClient : IDatabricksWebApiClient
    {
        private readonly string _baseUrl;
        private readonly string _authHeader;
        private readonly RestClient _client;

        public DatabricksWebApiClient()
        {
            _baseUrl = ConfigurationManager.AppSettings["BaseUrl"];
            _authHeader = Base64Encode(App.AccessToken);
            _client = new RestClient(_baseUrl);
        }

        //https://docs.azuredatabricks.net/api/latest/jobs.html#runs-submit
        public async Task<DatabricksRunNowResponse> JobsRunsSubmitJarTaskAsync(
			RunsSubmitJarTaskRequest runsSubmitJarTaskRequest)
        {
            var request = new RestRequest("2.0/jobs/runs/submit", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(runsSubmitJarTaskRequest);
            var response = await _client.ExecuteTaskAsync<DatabricksRunNowResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DatabricksRunNowResponse>(response.Content);
            return dbResponse;
        }

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#list
        public async Task<DbfsListResponse> DbfsListAsync()
        {
            var request = new RestRequest("api/2.0/dbfs/list", Method.GET);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddQueryParameter("path", "/");

            var response = await _client.ExecuteTaskAsync<DbfsListResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DbfsListResponse>(response.Content);
            return dbResponse;
        }

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#put
        public async Task<JObject> DbfsPutAsync(FileInfo file)
        {
            var request = new RestRequest("api/2.0/dbfs/put", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddFile("back", file.FullName);
            request.AddHeader("Content -Type", "multipart/form-data");

            var response = await _client.ExecuteTaskAsync(request);
            JObject responseContent = JObject.Parse(response.Content);
            return responseContent;
        }

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsservicecreate
        public async Task<DatabricksDbfsCreateResponse> DbfsCreateAsync(DatabricksDbfsCreateRequest dbfsRequest)
        {
            var request = new RestRequest("api/2.0/dbfs/create", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(dbfsRequest);

            var response = await _client.ExecuteTaskAsync<DatabricksDbfsCreateResponse>(request);
            var dbResponse = JsonConvert.DeserializeObject<DatabricksDbfsCreateResponse>(response.Content);
            return dbResponse;
        }

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#dbfsdbfsserviceaddblock
        public async Task<JObject> DbfsAddBlockAsync(DatabricksDbfsAddBlockRequest dbfsRequest)
        {
            var request = new RestRequest("api/2.0/dbfs/add-block", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(dbfsRequest);

            var response = await _client.ExecuteTaskAsync(request);
            JObject responseContent = JObject.Parse(response.Content);
            return responseContent;
        }

        //https://docs.azuredatabricks.net/api/latest/dbfs.html#close
        public async Task<JObject> DbfsCloseAsync(DatabricksDbfsCloseRequest dbfsRequest)
        {
            var request = new RestRequest("api/2.0/dbfs/close", Method.POST);
            request.AddHeader("Authorization", $"Basic {_authHeader}");
            request.AddJsonBody(dbfsRequest);

            var response = await _client.ExecuteTaskAsync(request);
            JObject responseContent = JObject.Parse(response.Content);
            return responseContent;
        }

        private static string Base64Encode(string plainText)
        {
            var plainTextBytes = System.Text.Encoding.UTF8.GetBytes(plainText);
            return Convert.ToBase64String(plainTextBytes);
        }
    }
}

正如我所说,这是我选择查看的所有 API 中最复杂的。实际上,您可能不会从 UI 启动 Databricks 作业。您可能会使用自己的 REST API,该 API 可以委托给像 https://www.hangfire.io/ 这样的作业管理器,它显然仍然需要为您完成轮询部分。

有了这些,您应该能够从 UI 中选择 JAR,并提交它,观察它运行,并从 Databricks Web UI 中查看运行日志。

 

结论

我不得不说,使用 Apache Spark / Databricks 简直是梦想。 Databricks 简直太棒了,这正是我们所需要的,它能做的事情简直太棒了。能够按需启动一个集群来运行作业,并在作业运行结束后销毁(以节省空闲成本)简直太棒了。

我强烈建议您去看看,我相信您会喜欢它的

 

© . All rights reserved.