65.9K
CodeProject 正在变化。 阅读更多。
Home

构建推荐引擎 - 使用 Azure、Hadoop 和 Mahout 进行机器学习

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.74/5 (17投票s)

2013年7月14日

CPOL

13分钟阅读

viewsIcon

73155

进行一些“大数据”处理,并使用 Azure、Hadoop 和 Mahout 构建推荐引擎

今天想帮助某人吗?

image让我们来帮助 Stack Exchange 的用户根据他们的回答历史推荐他们可能可以回答的问题,这很像亚马逊根据您以前的购买历史推荐产品的方式。  如果您不知道 Stack Exchange 是做什么的——他们运行了许多问答网站,包括极其流行的 Stack Overflow。 

我们的目标是分析用户过去的回答,以预测他将来可能回答的问题。虽然 Stack Exchange 当前的推荐逻辑可能比我们的更好,但这不会阻止我们为了我们自己的学习目的而帮助他们;)

我们将执行以下任务。

  • 从 Stack Exchange 数据集中提取所需信息
  • 使用所需信息构建推荐器

但让我们先从基础开始。   如果您对 Apache Hadoop 和 Azure 上的 Hadoop 完全陌生,我建议您阅读这些入门文章,我在其中详细解释了 HDInsight 和 Map Reduce 模型。 

幕后花絮 

好了,让我们开始一些“数据科学”的魔术吧。太棒了!分布式机器学习主要用于

  • 推荐  - 还记得亚马逊推荐吗?通常用于根据历史预测偏好。
  • 聚类  - 用于查找将一组文档中相关的文档分组,或在社区中找到志同道合的人等任务。
  • 分类  - 用于识别新项目属于哪个类别。这通常包括先训练系统,然后要求系统检测一个项目。

“大数据”术语通常在需要对非常大的数据集执行操作时使用。在本文中,我们将从大型数据集中提取一些数据,并使用我们提取的数据构建一个推荐器。

什么是推荐器?

广义上讲,我们可以通过以下方式构建推荐器:

  • 根据其他用户(与他相似的用户)回答过的问题,找到用户可能感兴趣回答的问题。
  • 找到与他已回答的问题相似的其他问题。

image


第一种技术称为基于用户的推荐,第二种技术称为基于项目的推荐。  

在第一种情况下,品味可以通过您与该用户共同回答了多少问题来确定(两人都回答过的问题)。例如,考虑 User1、User2、User3 和 User4——回答了几个问题 Q1、Q2、Q3 和 Q4。此图显示了用户回答的问题。

根据上图,User1 和 User2 回答了 Q1、Q2 和 Q3——而 User2 回答了 Q2 和 Q3 但没有回答 Q1。现在,在某种程度上,我们可以安全地假设 User3 将会对他回答 Q1 感兴趣——因为与他一起回答 Q2 和 Q3 的两个用户已经回答了 Q1。这里有一些品味匹配,不是吗?  所以,如果您有一个 {UserId, QuestionId} 的数组——似乎这些数据足以让我们构建一个推荐器。

逻辑层面

那么,我们到底将如何构建一个问题推荐器呢?实际上很简单。

首先,我们需要找出在一对用户中,一对问题出现的次数。请注意,此矩阵与用户无关。例如,如果 Q1 和 Q2 一起出现 2 次(如上图所示),{Q1,Q2} 处的共现值为 2。这是共现矩阵(希望我没弄错)。

  • Q1 和 Q2 共现 2 次(User1 和 User2 回答了 Q1、Q2)
  • Q1 和 Q3 共现 2 次(User1 和 User2 回答了 Q1、Q2)
  • Q2 和 Q3 共现 3 次(User1、User2 和 User3 回答了 Q2、Q3)
  • 等等……

image

上面的矩阵只是捕获了一对问题共现(回答)的次数,如上所述。还没有与用户进行映射。现在,我们如何将其与查找用户的偏好联系起来?要找出某个问题与用户的“匹配”程度,我们只需要

  • 找出该问题与用户已回答的其他问题共现的频率。
  • 排除用户已回答的问题。

对于第一步,我们需要将上面的矩阵乘以用户的偏好矩阵。

例如,让我们看看 User3。对于 User3,他与问题 [Q1,Q2,Q3,Q4] 的偏好映射为 [0,1,1,0],因为他已经回答了 Q2 和 Q3,但没有回答 Q1 和 Q4。所以,让我们将其乘以上面的共现矩阵。请记住,这是矩阵乘法/点积。结果指示一个问题与用户已回答的其他问题共现的频率(权重)。

image

我们可以从结果中省略 Q2 和 Q3,因为我们知道 User 3 已经回答了它们。现在,在剩余的 Q1 和 Q4 中——Q1 的值(4)较高,因此与 User3 的品味匹配度更高。直观地说,这表明 Q1 与 User 3 已回答的问题(Q2 和 Q3)共现的次数比 Q4 与 Q2 和 Q3 共现的次数多——所以 User3 对回答 Q1 的兴趣会比对 Q4 的兴趣更大。在实际实现中,请注意,用户的品味矩阵将是一个稀疏矩阵(大部分为零),因为用户过去只会回答非常有限的问题。上述逻辑的优点是,我们可以使用分布式 Map Reduce 模型与多个 Map-Reduce 任务进行计算——构建共现矩阵,为每个用户查找点积等。

也许您应该看看我在这里的Map-Reduce 入门和示例

实施

从实施角度来看,

  1. 我们需要配置一个 Hadoop 集群。
  2. 我们需要下载并提取要分析的数据(Stack Overflow 数据)。
  3. 作业 1 - 提取数据 - 从每一行中,提取用户回答的所有问题的 {UserId, QuestionId}。
  4. 作业 2 - 构建推荐器 - 使用上述 Map Reduce 的输出构建推荐模型,其中为每个用户列出了可能的项目。

我们开始吧!

步骤 1 - 配置您的集群

请记住,Stack Exchange 数据量很大。因此,我们需要一个分布式环境来处理它。让我们前往Windows Azure。如果您还没有账户,请注册免费试用。现在,前往预览页面,并申请 HDInsight(Azure 上的 Hadoop)预览。

一旦您有了 HD Insight,就可以轻松创建 Hadoop 集群。我正在创建一个名为 stackanalyzer 的集群。

 

image

集群准备就绪后,您将在仪表板中看到“连接”和“管理”按钮(此处未显示)。单击“连接”按钮连接到集群的头节点,这将打开到头节点的远程桌面连接。您也可以单击“管理”按钮打开基于 Web 的管理仪表板。(如果您愿意,可以在此处了解更多关于 HD Insight 的信息

步骤 2 - 获取要分析的数据

连接到集群的头节点后,您可以下载 Stack Exchange 数据。您可以在Clear Bits 下的 Creative Commons 中下载 Stack Exchange 站点数据。我在头节点上安装了 Mu-Torrent 客户端,然后下载并解压了http://cooking.stackexchange.com/ 的数据——解压后的文件看起来像这样——一堆 XML 文件。

image

我们感兴趣的是 Posts XML 文件。每一行代表一个问题或一个回答。如果是一个问题,PostTypeId =1,如果是一个回答,PostTypeId=2。ParentId 代表回答的问题 ID,OwnerUserId 代表回答该问题的用户 ID。

<row Id="16" PostTypeId="2" ParentId="2" CreationDate="2010-07-09T19:13:37.540" Score="3"
     Body="&lt;p&gt;... shortenedforbrevity...  &lt;/p&gt;&#xA;"
     OwnerUserId="34" LastActivityDate="2010-07-09T19:13:37.540" />

所以,对我们而言,我们需要提取 PostTypeId=2(回答)的所有帖子的 {OwnerUserId, ParentId},这代表 {用户,问题,投票数}。稍后我们将使用的 Mahout Recommender 作业将采用这些数据,并构建推荐结果。

现在,提取这些数据本身是一项巨大的任务,当您考虑到 Posts 文件非常大时。对于 Cooking 站点,它不是那么大——但如果您正在分析整个 Stack Overflow,Posts 文件可能会有 GB 级别。为了提取这些数据,让我们利用 Hadoop 并编写一个自定义 Map Reduce 作业。

步骤 3 - 从转储中提取所需数据(用户,问题)

为了提取数据,我们将利用 Hadoop 来分布式处理。让我们编写一个简单的 Mapper。如前所述,我们需要找出 PostTypeId=2 的所有帖子的 {OwnerUserId, ParentId}。这是因为,我们稍后运行的 Recommender 作业的输入是 {user, item}。为此,首先将 Posts.XML 加载到 HDFS。您可以使用 hadoop fs 命令将本地文件复制到指定的输入路径。

image

现在,是时候编写一个自定义 Mapper 来为我们提取数据了。我们将使用 Hadoop On Azure .NET SDK 来编写我们的 Map Reduce 作业。  请注意,我们在配置部分指定了输入文件夹和输出文件夹。启动 Visual Studio,并创建一个 C# 控制台应用程序。如果您还记得我之前的文章,hadoop fs <yourcommand> 用于访问 HDFS 文件系统,了解一些 基本 *nix 命令,如 lscat 等会很有帮助。

注意:请参阅我之前关于 HDInsight 初步知识的文章,以了解更多关于 Map Reduce 模型和 Azure 上的 Hadoop 的信息。

您需要通过 Nuget 包管理器安装 Hadoop SDK for .NET 中的 Hadoop Map Reduce 包。

install-package Microsoft.Hadoop.MapReduce 

现在,这里有一些代码,我们将在其中

  • 创建 Mapper
  • 创建 Job
  • 将 Job 提交到集群

开始吧。

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Xml.Linq;
using Microsoft.Hadoop.MapReduce;

namespace StackExtractor
{

    //Our Mapper that takes a line of XML input and spits out the {OwnerUserId,ParentId,Score} 
    //i.e, {User,Question,Weightage}
    public class UserQuestionsMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            try
            {
                var obj = XElement.Parse(inputLine);
                var postType = obj.Attribute("PostTypeId");
                if (postType != null && postType.Value == "2")
                {
                    var owner = obj.Attribute("OwnerUserId");
                    var parent = obj.Attribute("ParentId");
		   
                    // Write output data. Ignore records will null values if any
                    if (owner != null && parent != null )
                    {
                        context.EmitLine(string.Format("{0},{1}", owner.Value, parent.Value));
                    }
                }
            }
            catch
            {
                //Ignore this line if we can't parse
            }
        }
    }


    //Our Extraction Job using our Mapper
    public class UserQuestionsExtractionJob : HadoopJob<UserQuestionsMapper>
    {
        public override HadoopJobConfiguration Configure(ExecutorContext context)
        {
            var config = new HadoopJobConfiguration();
            config.DeleteOutputFolder = true;
            config.InputPath = "/input/Cooking";
            config.OutputFolder = "/output/Cooking";
            return config;
        }

       
    }

    //Driver that submits this to the cluster in the cloud
    //And will wait for the result. This will push your executables to the Azure storage
    //and will execute the command line in the head node (HDFS for Hadoop on Azure uses Azure storage)
    public class Driver
    {
        public static void Main()
        {
            try
            {
                var azureCluster = new Uri("https://{yoururl}.azurehdinsight.net:563");
                const string clusterUserName = "admin";
                const string clusterPassword = "{yourpassword}";

                // This is the name of the account under which Hadoop will execute jobs.
                // Normally this is just "Hadoop".
                const string hadoopUserName = "Hadoop";

                // Azure Storage Information.
                const string azureStorageAccount = "{yourstorage}.blob.core.windows.net";
                const string azureStorageKey =
                    "{yourstoragekey}";
                const string azureStorageContainer = "{yourcontainer}";
                const bool createContinerIfNotExist = true;
                Console.WriteLine("Connecting : {0} ", DateTime.Now);

                var hadoop = Hadoop.Connect(azureCluster,
                                            clusterUserName,
                                            hadoopUserName,
                                            clusterPassword,
                                            azureStorageAccount,
                                            azureStorageKey,
                                            azureStorageContainer,
                                            createContinerIfNotExist);

                Console.WriteLine("Starting: {0} ", DateTime.Now);
                var result = hadoop.MapReduceJob.ExecuteJob<UserQuestionsExtractionJob>();
                var info = result.Info;

                Console.WriteLine("Done: {0} ", DateTime.Now);
                Console.WriteLine("\nInfo From Server\n----------------------");
                Console.WriteLine("StandardError: " + info.StandardError);
                Console.WriteLine("\n----------------------");
                Console.WriteLine("StandardOut: " + info.StandardOut);
                Console.WriteLine("\n----------------------");
                Console.WriteLine("ExitCode: " + info.ExitCode);
            }
            catch(Exception ex)
            {
                Console.WriteLine("Error: {0} ", ex.StackTrace.ToString(CultureInfo.InvariantCulture)); 
            }
            Console.WriteLine("Press Any Key To Exit..");
            Console.ReadLine();
        }
    }


}

现在,编译并运行上述程序。ExecuteJob 将把所需的二进制文件上传到您的集群,并启动一个 Hadoop Streaming Job,该作业将在集群上运行我们的 Mappers,输入来自我们之前存储在 Input 文件夹中的 Posts 文件。我们的控制台应用程序将作业提交到云端,并等待结果。Hadoop SDK 将 Map Reduce 二进制文件上传到 blob,并构建执行作业所需的命令行(请参阅我之前的文章,了解如何手动执行此操作)。  您可以通过单击头节点上的桌面快捷方式中的 Hadoop Map Reduce 状态跟踪器来检查作业。

如果一切顺利,您将看到如下所示的结果。

image

如上所示,您可以在 /output/Cooking 文件夹中找到输出。如果您 RDP 到集群的头节点,并检查输出文件夹,现在您应该会看到我们的 Map Reduce 作业创建的文件。

image

正如预期的那样,文件包含提取的数据,这些数据代表 UserId、QuestionId——对于用户已回答的所有问题。如果您愿意,可以将数据从 HDFS 加载到 Hive,然后使用 ODBC for Hive 与 Microsoft Excel 一起查看。请参阅我之前的文章。

步骤 4 - 构建推荐器并生成推荐

下一步,我们需要构建共现矩阵并运行推荐器作业,将我们的 {UserId,QuestionId} 数据转换为推荐。幸运的是,我们不需要为此编写 Map Reduce 作业。我们可以利用 Mahout 库以及 Hadoop。 在此处了解 Mahout

RDP 到我们集群的头节点,因为我们需要安装 Mahout。下载 Mahout 的最新版本(撰写本文时为 0.7),并将其复制到集群头节点上的 c:\app\dist 文件夹。

image

Mahout 的 Recommender Job 支持多种算法来构建推荐——在本例中,我们将使用 SIMILARITY_COOCCURRENCE。Mahout 网站上的算法页面有更多关于推荐、聚类和分类算法的信息。我们将使用 /output/Cooking 文件夹中的文件来构建我们的推荐。

是时候运行 Recommender 作业了。创建一个 users.txt 文件,并将您需要推荐的用户 ID 放入该文件中,然后将其复制到 HDFS。

image

现在,以下命令应该会启动 Recommendation Job。请记住,我们将使用我们上面 Map Reduce 作业的输出文件作为 Recommender 的输入。让我们启动 Recommendation Job。这将为 users.txt 文件中指定的所有用户在 /recommend/ 文件夹中生成输出。您可以使用 –numRecommendations 开关来指定每个用户需要的推荐数量。如果用户和项目之间存在偏好关系(例如,用户播放歌曲的次数),您可以将推荐器的输入数据集保留为 {user,item,preferencevalue}——在本例中,我们省略了偏好权重。

注意:如果以下命令在重新运行时因输出目录已存在而失败,请尝试使用 hadoop fs –rmr temphadoop fs –rmr /recommend/ 删除 temp 文件夹和 output 文件夹。

hadoop jar c:\Apps\dist\mahout-0.7\mahout-core-0.7-job.jar 
	org.apache.mahout.cf.taste.hadoop.item.RecommenderJob -s SIMILARITY_COOCCURRENCE 
	--input=/output/Cooking 
	--output=/recommend/ 
	--usersFile=/data/users.txt 

作业完成后,检查 /recommend/ 文件夹,并尝试打印生成文件中的内容。您应该会看到 users.txt 中用户 ID 的顶部推荐。

image

因此,推荐引擎认为 用户 1393 可能会回答 641916897 等问题,如果我们向他推荐的话。您可以尝试其他相似度类,如 SIMILARITY_LOGLIKELIHOOD、SIMILARITY_PEARSON_CORRELATION 等,以找到最佳结果。不断迭代和优化,直到您满意为止。

作为一个思想实验,这里还有另一个练习——检查 Stack Exchange 数据集,并找出如何构建一个推荐器来显示“您可能还喜欢”的问题,该推荐器基于用户收藏的问题?

结论

在此示例中,我们进行了大量的手动工作来将所需的输入文件上传到 HDFS,并手动触发 Recommender Job。实际上,您可以使用 Hadoop For Azure SDK 自动化整个工作流程。但这将是另一篇博文的内容,敬请关注。实际分析还有更多工作要做,包括编写 Map/Reducer 来提取数据并将其转储到 HDFS,自动化 Hive 表的创建,使用 HiveQL 或 PIG 执行操作等。但是,我们只是考察了使用 Azure、Hadoop 和 Mahout 进行有意义操作的步骤。

您也可以在您的移动应用或 ASP.NET Web 应用程序中访问这些数据,方法是使用 Sqoop 将其导出到 SQL Server,或者像我之前解释的那样,将其加载到 Hive 表中。 正如我之前解释的那样。祝您编码和机器学习愉快!另外,如果您对将现有应用程序与 HD Insight 连接以构建端到端工作流的场景感兴趣,请与我联系

我建议您继续阅读。

© . All rights reserved.