65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 MapFiles 在 Hadoop Map-Reduce 中实现 Join

starIconstarIconstarIconemptyStarIconemptyStarIcon

3.00/5 (1投票)

2015年3月16日

CPOL

9分钟阅读

viewsIcon

18645

在 Map 阶段使用 MapFiles 的 Hadoop Map-Reduce 应用程序中实现 Join

背景

在我之前的文章 在 Hadoop Map-Reduce 中实现 Join 中,我介绍了在 Hadoop 中连接不同来源数据的概念,并提出了在 Map 阶段和 Reduce 阶段执行 Join 的技术。

这一次,我将讨论一种在 Map 阶段进行 Join 的替代技术:使用 MapFiles 进行 Join。在 Map 阶段进行 Join 速度更快,并且使用 MapFiles 克服了在 Map-Reduce 应用程序的 Map 阶段使用 Cache 文件进行 Join 的局限性。

Sequence Files 和 MapFiles

MapFiles 是 Hadoop 中一种支持对存储在 Sequence File 中的数据进行随机访问的 Sequence Files 类型。在继续深入之前,我将解释 Sequence Files。

Hadoop 中的 Sequence Files 直接读写到对象(而不是需要逐行读入 Text,然后在 Mapper 类中填充 Class 实例)。Sequence Files 使用 SequenceFileInputFormat 写入,并使用 SequenceFileOutputFormat 读取。Sequence Files 以二进制格式存储,并且存储在 Sequence Files 中的记录每隔几条记录就有同步标记,因此 Sequence Files 也是可分割的。除此之外,Sequence Files 支持记录级压缩和块级压缩。后者在每个压缩块之间都有同步标记。请查看 此链接 了解更多关于 Sequence Files 的细节。

现在回到 MapFile

MapFile 有两个部分:

1. 数据 (Data)

Data 文件是实际的 Sequence File,包含键值对形式的数据,并按键排序。Data 文件按键排序的事实由 MapFile.Writer 确保,当使用越序的键调用 MapFile.Writer.append 方法时,它会抛出 IOException

2. 索引 (Index)

Index 文件是一个较小的 Sequence File,其中包含 Data 文件中部分(或全部)键,按排序顺序排列,并带有键在 Data 文件中数据位置的字节偏移量。这个 Index 文件用于查找数据。

当执行查找时,会查找 Index 文件中的键。如果找到键,则使用字节偏移量从 Data 文件中提取数据;如果未找到确切的键,则使用前一个键(键已排序)的偏移量跳转到 Data 文件中该偏移量,然后顺序遍历找到键,并返回其对应的值。

Index 文件中的键的数量是可配置的,可以使用 setIndexInterval() 方法设置,该方法指定 Index 文件中每个键之间要跳过的键的数量。但是,应谨慎操作,因为值非常小可能会导致 Index 文件过大,这不利于 Map 阶段的 Join。

为了使用 MapFiles,键类应该继承自 WritableComparable 以支持键的排序。

MapFiles 也可以作为 Map-Reduce 程序的输入,使用 SequenceFileInputFormat,该格式会忽略 Index 文件并顺序读取 Data 文件。

请访问 此页面 了解更多关于 MapFiles 的信息。

在 Map 阶段进行 Join 的优势

此时,讨论在 Map-Reduce 应用程序的 Map 阶段而不是 Reduce 阶段进行 Join 的优缺点很重要。其缺点是需要 Join 的文件必须足够小才能放入内存,但使用 MapFiles 克服了这一限制,因为 MapFiles 是可分割的,可以压缩,并且取决于 IndexInterval 的值,它们相对较小。

在 Map 阶段进行 Join 更快,因为它跳过了耗时的排序和 Shuffle 阶段以及 Reduce 阶段。对于仅 Join 所提供数据的作业,如果它们是仅 Map 作业,速度会更快。

此外,由于 MapFiles 是 Sequence Files 的一种,如前所述是可压缩的,这进一步减小了其大小,使其成为 Cache 文件在需要 Join 的数据量相对较大时的合适替代品。这使得即使数据文件较大,也能获得 Map 阶段 Join 的优势。此外,由于 MapFiles 是已排序的,键的查找速度非常快,这一事实使得 MapFiles 非常适合快速 Join 数据。

本示例中实现的场景

我将使用上一篇文章中相同的数据文件,即 Microsoft 的 Adventure Works 数据。用于创建 Adventure Works Database 的 .csv 文件以及脚本文件可以从 此链接 下载。

下载 Adventure Works 2012 OLTP 脚本。在本示例中,使用了 .csv 文件 SalesOrderDetail.csvProducts.csv

Join 操作是在 SalesOrderDetail 数据和 Products 数据之间进行的,并且对于 SalesOrderDetail.csv 文件中的每一条记录,都会输出 Product 名称。这可以看作是以下 SQL 代码:

SELECT SalesOrderId, OrderQty, Product.Name, LineTotal
FROM SalesOrderDetail join Products on SalesOrderDetail.ProductId = Products.ProductId

从上面的 SQL 代码可以看出,本示例中的作业是一个仅 Map 作业。

初始项目设置

我将使用 Eclipse 和 Maven 来创建 Hadoop Map-Reduce 项目。在 Eclipse 中设置 Maven 插件的过程超出了本文的范围。

  • 打开 Eclipse,选择 New Project,然后选择 Maven Project。点击 Next
  • 选中复选框。Create a Simple Project.
  • 给项目命名。我将使用以下名称:
        Group Id: com.example.mapfileexample
        Artifact Id: MapFileExample
        Name: MapFileExample
  • 添加 Hadoop 依赖项。在 Eclipse 中,在项目结构(通常在左侧)中,选择 pom.xml 文件,在 Dependencies 选项卡中,选择 Add 按钮 并输入以下值:
        Group Id: org.apache.hadoop
        Artifact id: hadoop-client
        Version: 2.5.1
  • 创建名为 Driver 的类,这是此应用程序的主类。将以下代码粘贴到类中:
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.MapFile.Writer.Option;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class Driver extends Configured implements Tool {
    
        public int run(String[] allArgs) throws Exception {
            return 0;
        }
        
        public static void main(String[] args) throws Exception
        {
            
            Configuration conf = new Configuration();
            int output = ToolRunner.run(new Driver(), args);
        }
    }

创建和填充 MapFile 的代码

可以使用 MapFile.Writer 创建 MapFile。在本示例中,我将使用 FileInputStream 逐行读取 Products.csv 文件,并将其输出到 MapFile,其中 ProductId(第一列)作为键,Name(第二列)作为值。请注意,创建 MapFile 的这部分不在 map-reduce 范围之内,但 MapFile 可以是 MapReduce 作业的输出。

执行此操作的函数是:

private static void CreateMapFile(Configuration conf, String inputFilePath, 
String outputFilePath, int keyIndex, int valueIndex) throws IOException {
    Path outputLocation = new Path(outputFilePath);
    
    Option keyClass = (Option)MapFile.Writer.keyClass(IntWritable.class);
    SequenceFile.Writer.Option valueClass = MapFile.Writer.valueClass(Text.class);
    MapFile.Writer.setIndexInterval(conf, 1);
    MapFile.Writer writer = new MapFile.Writer(conf, outputLocation, keyClass, valueClass);
        
    File file = new File(inputFilePath);
    FileInputStream fis = new FileInputStream(file);
    BufferedReader br = new BufferedReader(new InputStreamReader(fis));
    String line;
    br.readLine();
    int i = 0;
    while ((line = br.readLine()) != null){
        String[] lineItems = line.split("\\t");
        IntWritable key = new IntWritable(Integer.parseInt(lineItems[keyIndex]));
        Text value = new Text(lineItems[valueIndex]);
        writer.append(key, value);
        i++;
    }
    br.close();
    writer.close();
}

从代码可以看出,该函数接受输入文件路径,并读取 Products.csv 文件,其中值以制表符分隔。我们将 IndexInterval 设置为 1,因为这是一个小文件,并且我们可以拥有大量键在 Index 文件中。

MapFile.Writer 构造函数接受 Configuration 实例、文件要创建的位置,以及用 org.apache.hadoop.io.SequenceFile.Writer.Option 实例包装的键类和值类。请注意,在语句中

Option keyClass = (Option)MapFile.Writer.keyClass(IntWritable.class);

Option 类是 org.apache.hadoop.io.SequenceFile.Writer.Option,它继承自 org.apache.hadoop.io.SequenceFile.Writer.Option 类。

这里,MapFile 的键是 IntWritable 的实例,值是 Text 的实例。接下来,创建 BufferedReader 并逐行读取文件。每行按 Tab (\t) 字符分割,并将第一列作为 key,第二列(名称列)作为 value。最后一步是调用 MapFile.Writer.append 方法将键和值追加到 MapFile

CreateMapFile 函数可以从 main 调用,如下所示:

createMapFile(conf, "/home/hduser/Desktop/sales_data/Product.csv",
"/home/hduser/Desktop/sales_data/ProductMapFile/", 0, 1);

请注意,此函数不是 Map 阶段的一部分,在作业执行期间不使用。应单独调用此方法来创建 MapFile,尽管我将其保留在 Driver 类中,但保留在此处并非必需。

在扩展 Map 类之前,最后一步是在注释掉对 ToolRunner.run 方法的调用后运行程序,并在 main 方法中调用上述调用。请记住将源文件和目标文件路径替换为您计算机上的实际路径。

运行文件后,打开目标文件夹以查看创建的 Index 和 Data 文件(MapFile 的一部分)。

Mapper (映射器)

Mapper 中,读取创建的 Map 文件,并使用 Product Ids 进行查找以获取 Product Names。这些 Product Names 然后从 map 函数输出。名为 MapFileExampleMapperMapper 类的代码如下:

public static class MapFileExampleMapper extends Mapper<LongWritable, Text, NullWritable, Text>
{
    private MapFile.Reader reader = null;
        
    public void setup(Context context) throws IOException{
        Configuration conf = context.getConfiguration();
        FileSystem fs = FileSystem.get(conf);
        Path dir = new Path(context.getCacheFiles()[0]);
        this.reader = new MapFile.Reader(fs, dir.toString(), conf);
    }
        
    private Text findKey(IntWritable key) throws IOException{
        Text value = new Text();
        this.reader.get(key, value);
        return value;
    }
        
    public void map(LongWritable key, Text value, Context context) 
                    throws IOException, InterruptedException{
        String[] values = value.toString().split("\\t");
        IntWritable productId = new IntWritable(Integer.parseInt(values[4]));
        Text productName = findKey(productId);
            
        context.write(NullWritable.get(), new Text(values[0] + "," + 
                      values[3] +  ", " + productName.toString() + " , " + values[8]));       
    }
        
    public void cleanup(Context context) throws IOException{
        reader.close();
    }
}

Mapper 类的 setup 方法中检索 Configuration 中设置的 MapFile 位置。setup 方法为每个 Map 任务调用一次。在这里,使用 MapFile.Reader 实例读取 MapFile,该实例提供 get 方法来检索键对应的值。

接下来,在 Mapper 类 map 方法的每次调用中,将 Product Id(SalesOrderDetail.csv 文件中的第四列)传递给 findKey 方法,该方法使用 MapFile.Reader.get 方法搜索每个传递键的对应值。检索到的 Products 名称将成为 Mapper 类的输出的一部分。

run 方法

下一步是 run 方法,它将 reducers 的数量设置为零,因为这是一个仅 Map 作业。如果您之前从 main 调用了 CreateMapFile 方法,也请不要忘记将其删除。

run 方法的代码如下:

public int run(String[] allArgs) throws Exception {
    Job job = Job.getInstance(getConf());
    job.setJarByClass(Driver.class);
        
    job.setMapperClass(MapFileExampleMapper.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Text.class);
    
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
        
    String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs();
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    URI[] cacheFileURI = { new URI(args[1]) };
    job.setCacheFiles(cacheFileURI);
        
    job.setNumReduceTasks(0);
        
    boolean result = job.waitForCompletion(true);
        
    if (result)    {
        return 1;
    } else {
        return 0;
    }
}

它将传入的 MapFile 的路径设置为 cache 文件,该文件在 Mapper 类中使用。最后,是时候编译并运行作业了。

在本地模式下运行作业

从 Eclipse 中在本地模式下运行作业:

  • 构建项目,然后右键单击 Package Explorer 中的项目,选择 Run asRun Configurations
  • 右键单击 Java Run Configurations,然后选择 New
  • 为新的配置命名。我称之为 MapFileExample Java Run config。
  • Arguments 选项卡中,在 Program Arguments 字段中输入 SalesOrderDetails.csv 文件的路径、包含 Index 和 Data 文件的 MapFile 文件夹的路径以及输出文件夹的路径。在我的计算机上,如下所示:
    /home/hduser/Desktop/sales_data/SalesOrderDetail.csv 
    /home/hduser/Desktop/sales_data/ProductMapFile 
    /home/hduser/Desktop/sales_data/mapfileOutput1

  • 点击 Apply,然后点击 Run
  • 执行完成后,检查输出。

在集群上运行作业

首先,在 Hadoop 文件系统中为与此示例相关的项创建一个文件夹。我将文件夹命名为 mapfileexample

hadoop fs -mkdir /hduser/mapfileexample/

接下来,使用 copyFromLocal 命令将 SalesOrderDetail.csv 复制到其中:

hadoop fs -copyFromLocal /home/hduser/Desktop/sales_data/SalesOrderDetail.csv 
/hduser/mapfileexample/

接下来,复制 MapFile

hadoop fs -mkdir /hduser/mapfileexample/ProductMapFile
hadoop fs -copyFromLocal /home/hduser/Desktop/sales_data/ProductMapFile/* 
/hduser/mapfileexample/ProductMapFile/

然后,运行作业:

hadoop jar /home/hduser/Desktop/sales_data/MapFileExample-0.0.1-SNAPSHOT.jar Driver 
/hduser/mapfileexample/SalesOrderDetail.csv /hduser/mapfileexample/ProductMapFile 
/hduser/mapfileexample/output1

最后,将输出复制到本地文件系统,并检查输出。运行以下命令:

hadoop fs -copyToLocal /hduser/mapfileexample/output1/* 
/home/hduser/Desktop/sales_data/output/fromCluster/

打开 part-m-00000 文件,可以看到每个 Product 的名称都出现在结果输出的每一行中。

最终代码

最终的 Driver.java 文件代码如下:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile.Writer.Option;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {
    
    public static class MapFileExampleMapper 
           extends Mapper<LongWritable, Text, NullWritable, Text>
    {
        private MapFile.Reader reader = null;
        
        public void setup(Context context) throws IOException{
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            Path dir = new Path(context.getCacheFiles()[0]);
            this.reader = new MapFile.Reader(fs, dir.toString(), conf);
        }
        
        private Text findKey(IntWritable key) throws IOException{
            Text value = new Text();
            this.reader.get(key, value);
            return value;
        }
        
        public void map(LongWritable key, Text value, Context context) 
                        throws IOException, InterruptedException{
            String[] values = value.toString().split("\\t");
            IntWritable productId = new IntWritable(Integer.parseInt(values[4]));
            Text productName = findKey(productId);
            
            context.write(NullWritable.get(), new Text(values[0] + "," + 
            values[3] +  ", " + productName.toString() + " , " + values[8]));
            
        }
        
        public void cleanup(Context context) throws IOException{
            reader.close();
        }
    }

    public int run(String[] allArgs) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(Driver.class);
        
        job.setMapperClass(MapFileExampleMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        
        String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs();
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        URI[] cacheFileURI = { new URI(args[1]) };
        job.setCacheFiles(cacheFileURI);
        
        job.setNumReduceTasks(0);
        
        boolean result = job.waitForCompletion(true);
        
        if (result)
        {
            return 1;
        }
        else
        {
            return 0;
        }
    }
    
    public static void main(String[] args) throws Exception
    {
        
        Configuration conf = new Configuration();
        int output = ToolRunner.run(new Driver(), args);
        
        /*CreateMapFile(conf, "/home/hduser/Desktop/sales_data/Product.csv",
                "/home/hduser/Desktop/sales_data/ProductMapFile/", 0, 1);*/
    }
    
    private static void CreateMapFile(Configuration conf, 
                        String inputFilePath, String outputFilePath,
            int keyIndex, int valueIndex) throws IOException
    {
        Path outputLocation = new Path(outputFilePath);
        
        Option keyClass = (Option)MapFile.Writer.keyClass(IntWritable.class);
        SequenceFile.Writer.Option valueClass = MapFile.Writer.valueClass(Text.class);
        MapFile.Writer.setIndexInterval(conf, 1);
        MapFile.Writer writer = 
                new MapFile.Writer(conf, outputLocation, keyClass, valueClass);
        
        File file = new File(inputFilePath);
        FileInputStream fis = new FileInputStream(file);
        BufferedReader br = new BufferedReader(new InputStreamReader(fis));
        String line;
        br.readLine();
        int i = 0;
        while ((line = br.readLine()) != null){
            String[] lineItems = line.split("\\t");
            IntWritable key = new IntWritable(Integer.parseInt(lineItems[keyIndex]));
            Text value = new Text(lineItems[valueIndex]);
            writer.append(key, value);
            i++;
        }
        br.close();
        writer.close();
    }
}

结论

本文解释了如何使用 MapFiles 执行 Map 阶段的 Join。在 Map 阶段而不是 Reduce 阶段执行 Join 通常很有用,可以提高作业速度。使用 MapFiles 进行 Join 的优势在于 MapFiles 是可分割的,它们可以被压缩,并且索引文件很小。所有这些因素都使 MapFiles 适合在 Map 阶段 Join 大量数据。

本文是我关于在 Hadoop Map-Reduce 作业中执行 Join 的前一篇关于该主题的文章的续篇,作为一项练习,此技术可以应用于上一篇文章中的示例。

欢迎提供反馈、建议和更正。

历史

  • 2015年3月16日:初始版本
© . All rights reserved.