使用 C#、Azure 和 Apache Hadoop 分析一些“大数据”——分析 Stack Overflow 数据转储






4.97/5 (17投票s)
介绍如何使用 Apache Hadoop 和 Azure 分析大型数据集,在 C# 中使用 MapReduce 作业
是时候用 C#、Azure 和 Apache Hadoop 做些有意义的事情了。在这篇文章中,我们将探讨如何在 C# 中创建 Mapper 和 Reducer,以分析 Stack Overflow 帖子中命名空间的使用频率。在我们开始之前,让我们简要了解一下 Hadoop 和 MapReduce 的概念。
MapReduce 快速入门
Map/Reduce 是一个用于处理海量数据集的编程模型,最初由Google实现。Map 和 Reduce 函数非常容易理解。
- Map(list) –> 键值对列表
- Map 函数将处理数据集并将其拆分成多个键值对。
- 聚合,分组
- Map/Reduce 框架可能会对 Map 函数的输出执行诸如分组、排序等操作。分组是根据键进行的,给定键的值会被传递给 Reduce 方法。
- Reduce(Key, List of Values for the key) -> Another List of Key,Value
- Reduce 方法通常会对给定键的所有值执行聚合函数(求和、平均值或其他复杂函数)。
有趣的是,你可以使用 Apache Hadoop 这样的 Map/Reduce 框架,在数据集上分层并行化 Map/Reduce 操作。M/R 框架将负责多项操作,包括将 Map 和 Reduce 方法部署到多个节点,聚合 Map 方法的输出,将其传递给 Reduce 方法,并在节点发生故障时处理容错等。
如果你对这个概念还不熟悉,Ayende @ Rahien提供了一个优秀的视觉解释。
Apache Hadoop 和 Hadoop Streaming
Apache Hadoop 是一个非常成熟的分布式计算框架,提供 Map/Reduce 功能。
Hadoop streaming 是 Apache Hadoop 自带的一个实用工具,该工具允许你使用任何可执行文件创建和运行 map/reduce 作业。如果你使用 Hadoop Streaming,你的 Map 和 Reduce 可执行文件将从控制台读取/写入数据,Hadoop 将负责将数据正确地管道化到你的 mapper/reducer。
从 Hadoop 主节点,你可以运行以下命令来启动你的 map/reduce 作业。不用太担心,因为之后你会看到我们在 Azure 中使用的 Hadoop 控制面板将为我们提供一个不错的 UI 来创建 Map/Reduce 命令,同时我们也会创建一个新的 Map Reduce 作业。
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper <yourmapperexecutable>\ -reducer <yourreducerexecutable>
目前,Windows Azure 上提供 Hadoop 的开发者预览版本,因此你可以前往https://www.hadooponazure.com/并登录创建你自己的 Hadoop 集群,以运行 Map/Reduce 作业。注册后,你可以使用 JavaScript 交互式控制台与你的 Hadoop 集群进行交互。
在 Azure 中设置你的 Hadoop 集群
登录到https://www.hadooponazure.com/后,你可以轻松地在 Azure 中设置你的 Hadoop 集群。请求一个集群,并为它命名。我请求了一个名为 Stackanalyzer 的集群,以便我可以访问它 @stackanalyzer.cloudapp.net 进行远程登录等。
你需要等待几分钟,直到你的集群准备就绪。集群设置完成后,你可以访问集群控制面板。你可以通过 JavaScript/Hive 交互式控制台、远程桌面等方式访问集群。在本例中,我们将使用 JavaScript 控制台将我们的 mapper、reducer 和数据文件上传到集群。
获取一些 Stackoverflow 数据进行分析
现在,让我们来分析一些真实数据。你可以访问Stack Exchange 数据浏览器,编写查询,并下载一些数据。你可以从http://data.stackexchange.com/stackoverflow/query/new执行以下查询以获取一些帖子数据。你可以发出任何查询,或者你可以使用整个 Stackoverflow 数据转储中的帖子数据,这只是一个例子。
select top 10000 Posts.id, Posts.body from posts where posts.ViewCount>10000
将数据下载为 CSV 文件,并将其保存为“data.csv”——并放在手边,以便稍后将其上传到集群。
在 C# 中创建我们的 Mapper 和 Reducer 应用程序
现在,让我们在 C# 中创建我们的 Mapper 和 Reducer 应用程序。启动 Visual Studio,并创建两个 C# 控制台应用程序。确保在编译这些应用程序时,你是在发布模式下编译的,以便我们可以将其部署到 Hadoop 集群。我们的 mapper 程序中的代码如下。一旦我们启动了 Job,Hadoop Streamer 将逐行将 Stackoverflow 数据 csv 文件中的行推送到我们的 Mapper。因此,让我们使用正则表达式来查找所有以 System 开头的 C# 命名空间声明,并输出找到的匹配项。
- // 简单的 mapper,用于提取命名空间声明
- using System.IO;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Text.RegularExpressions;
- namespace StackOverflowAnalyzer.Mapper
- {
- class Program
- {
- static void Main(string[] args)
- {
- string line;
- Regex reg=new Regex(@"(using)\s[A-za-z0-9_\.]*\;");
- while ((line = Console.ReadLine()) != null)
- {
- var matches = reg.Matches(line);
- foreach (Match match in matches)
- {
- Console.WriteLine("{0}", match.Value);
- }
- }
- }
- }
- }
- using System.IO;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- namespace StackOverflowAnalyzer.Reducer
- {
- class Program
- {
- static void Main(string[] args)
- {
- string ns, prevns = null;
- int nscount = 0;
- while ((ns = Console.ReadLine()) != null)
- {
- if (prevns != ns)
- {
- if (prevns != null)
- {
- Console.WriteLine("{0} {1}", prevns, nscount);
- }
- prevns = ns;
- nscount = 1;
- }
- else
- {
- nscount += 1;
- }
- }
- Console.WriteLine("{0} {1}", prevns, nscount);
- }
- }
- }
同样,请注意你必须以发布模式编译 mapper 和 reducer。
将我们的 Mapper、Reducer 和数据文件部署到 Azure Hadoop 集群。
转到 Azure Hadoop 集群控制面板,然后选择 JavaScript 控制台。
- JavaScript 控制台提供了一些实用方法,如 fs.put() 用于将文件上传到 HDFS。输入 help() 查看一些实用命令。
- 你可以从 JavaScript 运行Apache Pig命令。你可以认为 Pig 几乎是一个分布式的 LINQ,你一定会喜欢 Pig 命令,因为它们与 Linq 的概念相似。在本例中,我们不使用 Pig。
- 你可以使用 '#' 前缀从 JavaScript 控制台执行 Hadoop 文件系统命令——例如,#ls 用于列出文件,或者 #cat <filepath> 用于查看文件内容(温习你的 Unix/Linux 技能
)。
让我们将 mapper、reducer 和我们的 csv 数据文件上传到集群。在 JavaScript 控制台中,输入 fs.put(),浏览你的 Mapper 可执行文件,我将其上传到目标位置/so/bin/mapper.exe。
同样,你也可以
- 将 Reducer 可执行文件上传到位置/so/bin/reducer.exe。
- 将我们的 Stackoverflow CSV 数据文件上传到位置/so/data/data.csv。
有时,我发现 Web 上传器对大型数据集不起作用——在这种情况下,你可以从控制面板远程登录到你的节点,并使用 Hadoop 命令 Shell 将文件从本地文件系统复制到 HDFS。如果你的 CSV 文件已下载到你的集群 Name Node 的本地路径 d:\data.csv,你可以像这样发出 Hadoop FSShell 命令,将文件复制到所需位置。
hadoop fs -put d:\data.csv /so/data/data.csv
你可以通过在 JavaScript 控制台中发出命令#ls /so/bin和#ls /so/data来验证你的文件是否已准备好。现在你已经将你的好东西放在了集群中,让我们来执行 Job 吧。
创建和执行 Job
你可以从控制面板创建和执行 Job,或者直接从 Hadoop 命令 Shell 执行。为了简单起见,让我们通过控制面板创建一个新的 Job。你需要 hadoop-streaming.jar 文件来创建一个新的 streaming Job,因此你可以从 Hadoop Azure 控制面板的“Samples”部分下的C# Streaming 示例文件列表下载它。下载并准备好 hadoop-streaming.jar 文件。另外,请记下你的 Hadoop 节点 IP/URL,为此你可以通过在 JavaScript 控制台中发出以下命令来检查 core-site.xml 文件的内容
#cat file:///apps/dist/conf/core-site.xml
所以,现在让我们在 Hadoop for Azure 控制面板中创建 Job。在新的 Job 控制面板中,为文件、输入、输出、mapper 和 reducer 参数指定值。另外,选择 hadoop-streaming.jar 作为 JAR 文件。请确保使用你从上一步获得你的实际 IP/URL,而不是我的。
Hadoop jar hadoop-streaming.jar -files "hdfs://10.26.72.64:9000/so/bin/mapper.exe,hdfs://10.26.72.64:9000/so/bin/reducer.exe" -input "/so/data/data.txt" -output "/so/data/output" -mapper "mapper.exe" -reducer "reducer.exe"
如果遇到错误,请确保你提供的所有文件路径都正确,并且在 files 参数中的 URL 之间没有空格。如果一切顺利,你将看到以下结果屏幕。
现在,从 JavaScript 控制台检查下面的 Job 输出。js> #cat /so/data/output/part-00000你可以找到在我们的检查的帖子集中,这些命名空间被使用的次数。你可以对整个数据集执行此类任务,无论是 Stackoverflow 还是其他数据集——通过 Azure 和 Hadoop 实现你的分析或计算的并行化。
using ; 1 using B; 1 using BerkeleyDB; 1 using Castle.Core.Interceptor; 1 using Castle.DynamicProxy; 1 using ClassLibrary1; 1 using ClassLibrary2; 1 using Dates; 1 using EnvDTE; 2 using G.S.OurAutomation.Constants; 1 using G.S.OurAutomation.Framework; 2 using HookLib; 1 using Ionic.Zip; 1 using Linq; 1 using MakeAggregateGoFaster; 1 using Microsoft.Build.BuildEngine; 1 using Microsoft.Build.Framework; 2 using Microsoft.Build.Utilities; 2 using Microsoft.Office.Interop.Word; 1 using Microsoft.SharePoint.Administration; 1 using Microsoft.SqlServer.Management.Smo; 1 using Microsoft.SqlServer.Server; 1 using Microsoft.TeamFoundation.Build.Client; 1 using Microsoft.TeamFoundation.Build; 1 using Microsoft.TeamFoundation.Client; 1 using Microsoft.TeamFoundation.WorkItemTracking.Client; 1 using Microsoft.Web.Administration; 1 using Microsoft.Web.Management.Server; 1 using Microsoft.Xna.Framework.Graphics; 1 using Microsoft.Xna.Framework; 1 using Mono.Cecil.Cil; 1 using Mono.Cecil.Rocks; 1 using Mono.Cecil; 1 using NHibernate.SqlCommand; 1 using NHibernate; 1 using NUnit.Framework; 1 using Newtonsoft.Json; 1 using Spring.Context.Support; 1 using Spring.Context; 1 using StructureMap; 1 using System.Collections.Generic; 31 using System.Collections.ObjectModel; 2 using System.Collections; 3 using System.ComponentModel.DataAnnotations; 1 using System.ComponentModel.Design; 1 using System.ComponentModel; 7 using System.Configuration; 1 using System.Data.Entity; 1 using System.Data.Linq.Mapping; 1 using System.Data.SqlClient; 3 using System.Data.SqlTypes; 1 using System.Data; 4 using System.Diagnostics.Contracts; 3 using System.Diagnostics; 15 using System.Drawing.Design; 1 using System.Drawing; 3 using System.Dynamic; 3 using System.IO.Compression; 1 using System.IO.IsolatedStorage; 1 using System.IO; 11 using System.Linq.Expressions; 4 using System.Linq; 21 using System.Management; 1 using System.Net; 1 using System.Reflection.Emit; 6 using System.Reflection; 9 using System.Resources; 1 using System.Runtime.Caching; 1 using System.Runtime.CompilerServices; 3 using System.Runtime.ConstrainedExecution; 3 using System.Runtime.InteropServices.ComTypes; 3 using System.Runtime.InteropServices; 14 using System.Runtime.Remoting.Messaging; 1 using System.Runtime.Serialization; 1 using System.Security.Cryptography; 1 using System.Security.Principal; 1 using System.Security; 2 using System.ServiceModel; 1 using System.Text.RegularExpressions; 2 using System.Text; 17 using System.Threading.Tasks; 2 using System.Threading; 8 using System.Web.Caching; 1 using System.Web.Configuration; 1 using System.Web.DynamicData; 1 using System.Web.Mvc; 4 using System.Web.Routing; 1 using System.Web; 4 using System.Windows.Controls.Primitives; 1 using System.Windows.Controls; 4 using System.Windows.Forms.VisualStyles; 1 using System.Windows.Forms; 9 using System.Windows.Input; 3 using System.Windows.Media; 2 using System.Windows.Shapes; 1 using System.Windows.Threading; 1 using System.Windows; 4 using System.Xml.Serialization; 4 using System.Xml; 2 using System; 112 using WeifenLuo.WinFormsUI.Docking; 1 using base64; 1 using confusion; 1 using directives; 1 using mysql; 1 using pkg_ctx; 1 using threads; 1
结论
在上面的帖子中,我们研究了如何在 Azure 中设置 Hadoop 集群,更重要的是,如何将你的 C# Map Reduce 作业部署到 Hadoop 以完成有意义的事情。这里有巨大的潜力,你可以开始编写你的 C# Map/Reduce 作业来解决你组织的 Big Data 问题. 祝你编码愉快。
更新: 刚刚从 Stackoverflow 帖子中提取了 Top 500 MSDN 链接,详情请见此处。