65.9K
CodeProject 正在变化。 阅读更多。
Home

Mapreduce 上的 InputSplit 索引

starIconstarIconstarIconstarIconemptyStarIcon

4.00/5 (4投票s)

2013年5月3日

CPOL

9分钟阅读

viewsIcon

27323

创建自定义索引以提高 Mapreduce 性能

引言

使用Mapreduce框架,雅虎公司能够在60秒内对1TB数据进行排序(1TB排序)。不幸的是,并非所有公司都有能力构建一个千节点的集群来实现近乎实时查询。假设你有一个10节点的集群,这已经是一个很好的大数据处理起点,对1TB数据进行排序大约需要1小时。考虑到节点数量有限的限制,可能需要重新组织HDFS上的数据(例如,分区、排序)和/或实现索引以提高整体性能。我将在本报告中描述我创建的基于Hadoop InputSplits的自定义索引的实现,我遇到的问题,以及最终与非索引Mapreduce作业相比所带来的价值(如果有的话)。

背景

读者应该理解Hadoop、HDFS和mapreduce的概念,并应具有使用Hadoop New API实现基本Mapper和Reducer类的经验。如果你需要巩固知识,Tom White的著名权威指南将非常有用。

Input Split

一旦你提交了一个mapreduce作业,JobTracker将计算你的数据集需要执行的Map任务数量。这个数量取决于你的HDFS中可用的文件数量、文件大小以及块大小。任何文件实际上都被分成一个或多个InputSplit,它基本上是1个或多个块的组合。据我理解,每个InputSplit都会获得一个专用的mapper实例,因此mapper的数量实际上应直接与InputSplit的数量相关。

一个InputSplit基于3个不同的值

  1. 文件名
  2. 偏移量(InputSplit的起始位置)
  3. 长度(InputSplit的结束位置)

InputSplittoString()方法将返回以下模式

dfs://server.domain:8020/path/to/my/file:0+100000

通过使用MD5Hash哈希该值,你可以获得一个唯一标识任何InputSplit的ID。

911c058fbd1e60ee710dcc41fff16b27

主要假设:只要数据集不变,JobTracker将始终返回完全相同的InputSplit

索引

我们可以识别3个不同的级别:文件(File)、InputSplit和块(Block)。理论上,至少有3种不同的创建索引的方式:基于文件URI的索引、基于InputSplit的索引,或基于块的索引。我必须承认,我非常害怕手动处理块,所以我主要只关注文件和InputSplit索引。

让我们看两个不同的数据集示例。

在第一个示例中,你的数据集中有2个文件,它们包含25个块,并被识别为7个不同的InputSplit。你要查找的目标(灰色高亮)位于文件#1(块#2、#8和#13)以及文件#2(块#17)上。

  • 使用基于文件的索引,你将得到2个文件(这里是完整数据集),这意味着你的索引查询将等同于全扫描查询。
  • 使用基于InputSplit的索引,你将得到7个可用InputSplit中的4个。性能肯定会比执行全扫描查询好。

让我们看第二个例子。这一次,相同的数据集已按你要索引的列进行了排序。你要查找的目标(灰色高亮)现在位于文件#1(块#1、#2、#3和#4)上。

  • 使用基于文件的索引,你将只得到数据集中的1个文件。
  • 使用基于InputSplit的索引,你将得到7个可用InputSplit中的1个。

对于这项具体研究,我决定使用基于自定义InputSplit的索引。我认为这种方法在实现难度、可能带来的性能优化价值以及无论数据如何分布的预期适用性之间能取得很好的平衡。

实现

实现索引的流程很简单,包括以下3个步骤:

  1. 从你的完整数据集中构建索引
  2. 查询索引以获取你要查找的值的InputSplit(s)
  3. 仅在已索引的InputSplit上执行实际的mapreduce作业

只要完整数据集不变,第一步只需要执行一次。

构建索引

构建Mapreduce索引可能需要很长时间才能完成,因为你要将要索引的每个值与其实际的InputSplit位置一起输出。这里需要估计的关键参数是使用的Reducer数量。使用单个reducer,你的所有索引将写入单个文件,但将所有数据从mapper复制到一个reducer所需的时间肯定太长。使用数千个reducer,你将把索引输出到数千个不同的文件中,但这可能会显著加快执行速度。我认为,正确的值是在你的集群中可用的reduce插槽数量和你的最终索引的预期大小之间取得平衡。为了准确估计,我对我数据集的10%进行了简单随机抽样(SRS)。如果你期望一个100GB大的索引,你可以设置多达约50个reducer,这样你将得到50个2GB大小的文件。

Mapper类

使用Hadoop上下文,你可以从正在运行的mapper实例中检索当前的InputSplit。我提取我要索引的值,并将其与InputSplit的MD5哈希一起输出。

public class RebuildIndexMapper extends Mapper<Object, Text, Text, Text> {
    
    private String splitId;
    
    public void setup(Context context) {
        // Retrieve current inputSplit from Context
        InputSplit is = context.getInputSplit();
        splitId = MD5Hash.digest(is.toString()).toString();
    }

    public void map(Object object, Text line, Context context) {
        // Retrieve key to index
        String key = Utils.getValueToIndex(line.toString());
        // Output value to index + current Split Id
        context.write(new Text(key), new Text(splitId));
    }
}

假设你正在索引IP地址,中间键值对如下。来自第一个mapper:

192.168.0.1     60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     60390c7e429e38e8449519011a24f79d
192.168.0.1     60390c7e429e38e8449519011a24f79d

来自第二个mapper:

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4

Combiner类

由于你将为数据集中的每一行输出键/值对,使用一个可以去除重复项的Combiner可能非常有用。实现非常直观,此处不作描述。

Reducer类

Reducer的目标只是获取任何索引值的不同InputSplit,并将它们输出到1行中。

public class RebuildIndexReducer extends Reducer<Text, Text, Text, Text> {
    
    public void reduce(Text key, Iterable<text> values, Context context) {
        
        // Remove duplicated SplitId for same target
        List<string> list = new ArrayList<string>();
        for (Text value : values) {
               String str = value.toString();
               if (!list.contains(str)) {
                    list.add(str);
               }
        }

        // Concatenate distinct SplitId
        StringBuilder sb = new StringBuilder();
        for (String value : list) {
             sb.append(value);
             sb.append(",");
        }

        context.write(key, new Text(sb.toString()));
    }
}

给定与前面相同的示例,最终输出将如下所示:

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4

由于索引输出通常很大,使用SequenceOutputFile可能非常有帮助。

查询索引

每次你需要为给定的索引值(在我示例中的IP地址)执行mapreduce作业时,你必须首先查询你在上一步中创建的索引,以检索该值所属的不同InputSplit

Mapper类

Map任务很简单。对于每个匹配你正在查找的目标的索引值,输出所有其索引的InputSplit

public class FetchIndexMapper extends Mapper<text,> {

    private String indexLookup;

    public void setup(Context context) {
        // Get the value to look up
        indexLookup = context.getConfiguration().get("target.lookup");
    }

    public void map(Text indexKey, Text indexValue, Context context) {
        
        String strKey = indexKey.toString();
        if (!strKey.equals(indexLookup)) {
            // Ignore record if it does not match target
            return;
        } else {
            for (String index : indexValue.toString().split(",")) {
                // Output each single InputSplit location
                context.write(new Text(index), NullWritable.get());
            }
        }
    }
}

Reducer类

Reduce任务的目的只是去除重复项。实现非常直观,此处不作描述。给定与前面相同的示例:

192.168.0.1     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.2     60390c7e429e38e8449519011a24f79d
192.168.0.3     ccc6decd3d361c3d651807a0c1a665e4,60390c7e429e38e8449519011a24f79d
192.168.0.5     ccc6decd3d361c3d651807a0c1a665e4
192.168.0.6     ccc6decd3d361c3d651807a0c1a665e4

对IP地址192.168.0.1进行索引查询将输出以下内容:

ccc6decd3d361c3d651807a0c1a665e4
60390c7e429e38e8449519011a24f79d

这些InputSplit的MD5Hash应该写在HDFS的temp文件夹中的某个位置,以便在实际的mapreduce作业(下一节)中读取。如果文件很大,使用SequenceOutputFormat——再次——会非常有用。

执行你的mapreduce作业

现在我们已经构建了索引表并检索了目标值的实际InputSplit(s),是时候设置实际的mapreduce作业了。

自定义FileInputFormat

使用默认配置,Hadoop能够使用FileInputFormat类检索要使用的InputSplit的数量。我们将创建自己的FileInputFormat类,继承默认类,并覆盖其getSplits()方法。你必须读取你在上一步创建的文件,将所有已索引的InputSplit添加到列表中,然后将此列表与超类返回的列表进行比较。你将只向JobTracker返回在你的索引中找到的InputSplit

public class IndexFileInputFormat extends FileInputFormat<LongWritable, Text> {
    
.../...

    @Override
    public List getSplits(JobContext job) throws IOException {

        // Retrieve all default InputSplits
        List<InputSplit> totalIs = super.getSplits(job);    
        
        // Keep only the InputSplits matching our indexed InputSplits
        List<InputSplit> indexedIs = Utils.removeNonIndexedIS(totalIs);
        
        // Return list of Indexed InputSplits
        return indexedIs;
        
    }
}

在某个Utils类中

public static List removeNonIndexedIS(List<InputSplit> totalIs){
    
    // Read your previous file and list all your indexed MD5 hash
    List<string> md5Is = readInputSplitsFromFile();    
    
    // Initialize new list of InputSplit
    List<InputSplit> indexedIs = new ArrayList<InputSplit>();
    
    // Filter out InputSplit that are not found in our indexed list
    for (InputSplit split : totalIs) {
        String str = MD5Hash.digest(split.toString()).toString();
        if (md5Is.contains(str)) {
            indexedIs.add(split);
        }
    }    
    
    // Return list of input Split to query
    // Return empty list if target does not exist
    return indexedIs;
    }    
}

Driver代码

我们现在必须在驱动程序代码中使用这个IndexFileInputFormat类,而不是默认的(FileInputFormat)。在JobTracker初始化期间,Hadoop将仅使用我们指定的InputSplits,最终的map任务数量将少于“全扫描查询”所需的数量。

public class TestIndex(){

    public void main(String[] args) {
    
    .../...
    
    // Create a new Job and configuration
    Configuration conf = new Configuration();
    Job job = new Job(conf);
    
        // Configure your mapreduce Job
    job.setJarByClass(MyCustomMapper.class);
    job.setMapperClass(MyCustomMapper.class);
    job.setReducerClass(MyCustomReducer.class);
    
    // Use our custom IndexFileInputFormat
    job.setInputFormatClass(IndexFileInputFormat.class);
    
    .../...
    
    job.waitForCompletion(true);    
    }
}

测试

测试环境

对于这项具体测试,我设置了一个包含3个虚拟节点(Virtualbox)的小型集群,描述如下。Hadoop集群是从Cloudera Manager 4.5.2(免费版)安装的。

宿主

OS: Mac OS X 10.7.5
Processor: Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz
Memory: 16Gb

1个Namenode + JobTracker

OS: Ubuntu server 12.04.2 LTS
memory : 5Gb
storage : 50Gb
jdk: Java SDK 1.6.0_31
Hadoop 2.0.0-cdh4.2.0

2个Datanodes + TaskTrackers

OS: Ubuntu server 12.04.2 LTS
memory : 2.5Gb
storage : 50Gb
jdk: Java SDK 1.6.0_31
Hadoop 2.0.0-cdh4.2.0

测试数据

我使用一个简单的perl脚本生成了一个测试数据集。虽然30GB数据远远达不到真正的大数据环境应有的规模,但我认为这个数据集足够大,可以显示出使用索引带来的任何潜在改进。

30Gb
100 bytes per records
1Gb per file

我生成了5,000,000个不同的目标(IP地址),并将它们随机分布在这30,000,000条记录中。平均而言,同一个IP地址会出现60次。

结果

我对我的数据集的5GB、10GB、15GB、20GB和25GB的子集进行了完全相同的索引和非索引作业的多次测试。因为我的索引值分布均匀,我预计我的大部分数据将遵循线性趋势。

我在下面的图表中表示了实际索引大小与数据集大小的关系。正如预期的那样,索引大小呈线性增长,其重建执行时间也随之增长。对于25GB的数据集,我的索引大小约为7.5GB(占30%),并且已在约30分钟内完全重建。

我在下面的图表中表示了索引和非索引作业的执行时间随数据集大小的变化。这显然是每个人对索引的期望:数据集越大,索引查询的速度就越快(与非索引查询成比例)。尽管我的测试环境非常小,但我已经能够看到索引为mapreduce性能带来的巨大潜力。

在上一张图中,我没有考虑重建索引所需的时间。尽管这只执行一次(例如,每天一次),但这个过程非常繁重,可能需要很长时间才能完成。在预测索引性能时必须考虑到这一点。假设你每天只构建一次索引,然后执行10个mapreduce作业,所有索引查询的总执行时间将是:

Time total = rebuild_time + 10 x (indexed_mapreduce_time)

这正是我在下面的图中表示的。在25GB数据集上,执行X个每日请求(使用索引和非索引查询)所需的时间。

对于我的特定测试环境,我每天至少需要执行5个mapreduce作业才能从使用自定义索引中获益。

结论

Mapreduce索引的性能很大程度上取决于你的数据分布,并且可能非常强大,尤其是在处理大型数据集时。如果你需要预测此实现对你的特定用例有多强大,我建议你在生产环境中使用生产数据子集进行基准测试。为此,你可以对简单随机抽样进行几次测试,以便能够将这些结果推断到你的整个数据集。

历史

  • 2013年5月3日:初始版本
© . All rights reserved.