摘要: MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的。在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求。对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现的原理以及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的。本文将通过一个实际的MapReduce二次排序例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和map、reduce端的日志来验证所描述的处理流程的正确性。

概述

MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的。在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求。对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现的原理以及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的。本文将通过一个实际的MapReduce二次排序例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和map、reduce端的日志来验证所描述的处理流程的正确性。

需求描述

输入数据:

1sort1    1
2sort2    3
3sort2    77
4sort2    54
5sort1    2
6sort6    22
7sort6    221
8sort6    20

目标输出

1sort1 1,2
2sort2 3,54,77
3sort6 20,22,221

解决思路

  1. 首先,在思考解决问题思路时,我们先应该深刻的理解MapReduce处理数据的整个流程,这是最基础的,不然的话是不可能找到解决问题的思路的。我描述一下MapReduce处理数据的大概简单流程:首先,MapReduce框架通过getSplit方法实现对原始文件的切片之后,每一个切片对应着一个map task,inputSplit输入到Map函数进行处理,中间结果经过环形缓冲区的排序,然后分区、自定义二次排序(如果有的话)和合并,再通过shuffle操作将数据传输到reduce task端,reduce端也存在着缓冲区,数据也会在缓冲区和磁盘中进行合并排序等操作,然后对数据按照Key值进行分组,然后没处理完一个分组之后就会去调用一次reduce函数,最终输出结果。大概流程我画了一下,如下图:
    image
  2. 具体解决思路
    (1) Map端处理:
    根据上面的需求,我们有一个非常明确的目标就是要对第一列相同的记录合并,并且对合并后的数字进行排序。我们都知道MapReduce框架不管是默认排序或者是自定义排序都只是对Key值进行排序,现在的情况是这些数据不是key值,怎么办?其实我们可以将原始数据的Key值和其对应的数据组合成一个新的Key值,然后新的Key值对应的还是之前的数字。那么我们就可以将原始数据的map输出变成类似下面的数据结构:
1{[sort1,1],1}
2{[sort2,3],3}
3{[sort2,77],77}
4{[sort2,54],54}
5{[sort1,2],2}
6{[sort6,22],22}
7{[sort6,221],221}
8{[sort6,20],20}

那么我们只需要对[]里面的新key值进行排序就ok了。然后我们需要自定义一个分区处理器,因为我的目标不是想将新key相同的传到同一个reduce中,而是想将新key中的第一个字段相同的才放到同一个reduce中进行分组合并,所以我们需要根据新key值中的第一个字段来自定义一个分区处理器。通过分区操作后,得到的数据流如下:

1Partition1:{[sort1,1],1}、{[sort1,2],2}
2Partition2:{[sort2,3],3}、{[sort2,77],77}、{[sort2,54],54}
3Partition3:{[sort6,22],22}、{[sort6,221],221}、{[sort6,20],20}

分区操作完成之后,我调用自己的自定义排序器对新的Key值进行排序。

1{[sort1,1],1}
2{[sort1,2],2}
3{[sort2,3],3}
4{[sort2,54],54}
5{[sort2,77],77}
6{[sort6,20],20}
7{[sort6,22],22}
8{[sort6,221],221}

(2) Reduce端处理:
经过Shuffle处理之后,数据传输到Reducer端了。在Reducer端对按照组合键的第一个字段来进行分组,并且没处理完一次分组之后就会调用一次reduce函数来对这个分组进行处理输出。最终的各个分组的数据结构变成类似下面的数据结构:

1{sort1,[1,2]}
2{sort2,[3,54,77]}
3{sort6,[20,22,221]}

具体实现

自定义组合键

 1package com.mr; 
 2import java.io.DataInput; 
 3import java.io.DataOutput; 
 4import java.io.IOException; 
 5import org.apache.hadoop.io.IntWritable; 
 6import org.apache.hadoop.io.Text; 
 7import org.apache.hadoop.io.WritableComparable; 
 8import org.slf4j.Logger; 
 9import org.slf4j.LoggerFactory; 
10/** 
11 * 自定义组合键 
12 */
13public class CombinationKey implements WritableComparable<CombinationKey>{ 
14    private static final Logger logger = LoggerFactory.getLogger(CombinationKey.class); 
15    private Text firstKey; 
16    private IntWritable secondKey; 
17    public CombinationKey() { 
18        this.firstKey = new Text(); 
19        this.secondKey = new IntWritable(); 
20    } 
21    public Text getFirstKey() { 
22        return this.firstKey; 
23    } 
24    public void setFirstKey(Text firstKey) { 
25        this.firstKey = firstKey; 
26    } 
27    public IntWritable getSecondKey() { 
28        return this.secondKey; 
29    } 
30    public void setSecondKey(IntWritable secondKey) { 
31        this.secondKey = secondKey; 
32    } 
33    @Override
34    public void readFields(DataInput dateInput) throws IOException { 
35        // TODO Auto-generated method stub 
36        this.firstKey.readFields(dateInput); 
37        this.secondKey.readFields(dateInput); 
38    } 
39    @Override
40    public void write(DataOutput outPut) throws IOException { 
41        this.firstKey.write(outPut); 
42        this.secondKey.write(outPut); 
43    } 
44    /** 
45    * 自定义比较策略 
46    * 注意:该比较策略用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段, 
47    * 发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整) 
48    */
49    @Override
50    public int compareTo(CombinationKey combinationKey) { 
51        logger.info("-------CombinationKey flag-------"); 
52        return this.firstKey.compareTo(combinationKey.getFirstKey()); 
53    } 
54}

说明:在自定义组合键的时候,我们需要特别注意,一定要实现WritableComparable接口,并且实现compareTo方法的比较策略。这个用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整),但是其对我们最终的二次排序结果是没有影响的。我们二次排序的最终结果是由我们的自定义比较器决定的。

自定义分区器

 1package com.mr; 
 2import org.apache.hadoop.io.IntWritable; 
 3import org.apache.hadoop.mapreduce.Partitioner; 
 4import org.slf4j.Logger; 
 5import org.slf4j.LoggerFactory; 
 6/** 
 7 * 自定义分区 
 8 */
 9public class DefinedPartition extends Partitioner<CombinationKey,IntWritable>{ 
10    private static final Logger logger = LoggerFactory.getLogger(DefinedPartition.class); 
11    /** 
12    *  数据输入来源:map输出 
13    * @param key map输出键值 
14    * @param value map输出value值 
15    * @param numPartitions 分区总数,即reduce task个数 
16    */
17    @Override
18    public int getPartition(CombinationKey key, IntWritable value,int numPartitions) { 
19        logger.info("--------enter DefinedPartition flag--------"); 
20        /** 
21        * 注意:这里采用默认的hash分区实现方法 
22        * 根据组合键的第一个值作为分区 
23        * 这里需要说明一下,如果不自定义分区的话,mapreduce框架会根据默认的hash分区方法, 
24        * 将整个组合将相等的分到一个分区中,这样的话显然不是我们要的效果 
25        */
26        logger.info("--------out DefinedPartition flag--------"); 
27        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions; 
28    } 
29}

说明:具体说明看代码注释。

自定义比较器

 1package com.mr; 
 2import org.apache.hadoop.io.WritableComparable; 
 3import org.apache.hadoop.io.WritableComparator; 
 4import org.slf4j.Logger; 
 5import org.slf4j.LoggerFactory; 
 6/** 
 7 * 自定义二次排序策略 
 8 */
 9public class DefinedComparator extends WritableComparator { 
10    private static final Logger logger = LoggerFactory.getLogger(DefinedComparator.class); 
11    public DefinedComparator() { 
12        super(CombinationKey.class,true); 
13    } 
14    @Override
15    public int compare(WritableComparable combinationKeyOne, 
16            WritableComparable CombinationKeyOther) { 
17        logger.info("---------enter DefinedComparator flag---------"); 
18                                                      
19        CombinationKey c1 = (CombinationKey) combinationKeyOne; 
20        CombinationKey c2 = (CombinationKey) CombinationKeyOther; 
21                                                      
22        /** 
23        * 确保进行排序的数据在同一个区内,如果不在同一个区则按照组合键中第一个键排序 
24        * 另外,这个判断是可以调整最终输出的组合键第一个值的排序 
25        * 下面这种比较对第一个字段的排序是升序的,如果想降序这将c1和c2反过来(假设1) 
26        */
27        if(!c1.getFirstKey().equals(c2.getFirstKey())){ 
28            logger.info("---------out DefinedComparator flag---------"); 
29            return c1.getFirstKey().compareTo(c2.getFirstKey()); 
30            } 
31        else{//按照组合键的第二个键的升序排序,将c1和c2倒过来则是按照数字的降序排序(假设2) 
32            logger.info("---------out DefinedComparator flag---------"); 
33            return c1.getSecondKey().get()-c2.getSecondKey().get();//0,负数,正数 
34        } 
35        /** 
36        * (1)按照上面的这种实现最终的二次排序结果为: 
37        * sort1    1,2 
38        * sort2    3,54,77 
39        * sort6    20,22,221 
40        * (2)如果实现假设1,则最终的二次排序结果为: 
41        * sort6    20,22,221 
42        * sort2    3,54,77 
43        * sort1    1,2 
44        * (3)如果实现假设2,则最终的二次排序结果为: 
45        * sort1    2,1 
46        * sort2    77,54,3 
47        * sort6    221,22,20 
48        */
49        } 
50}

说明:自定义比较器决定了我们二次排序的结果。自定义比较器需要继承WritableComparator类,并且重写compare方法实现自己的比较策略。

自定义分组策略

 1package com.mr; 
 2import org.apache.hadoop.io.WritableComparable; 
 3import org.apache.hadoop.io.WritableComparator; 
 4import org.slf4j.Logger; 
 5import org.slf4j.LoggerFactory; 
 6/** 
 7 * 自定义分组策略 
 8 * 将组合将中第一个值相同的分在一组 
 9 */
10public class DefinedGroupSort extends WritableComparator{ 
11    private static final Logger logger = LoggerFactory.getLogger(DefinedGroupSort.class); 
12    public DefinedGroupSort() { 
13        super(CombinationKey.class,true); 
14    } 
15    @Override
16    public int compare(WritableComparable a, WritableComparable b) { 
17        logger.info("-------enter DefinedGroupSort flag-------"); 
18        CombinationKey ck1 = (CombinationKey)a; 
19        CombinationKey ck2 = (CombinationKey)b; 
20        logger.info("-------Grouping result:"+ck1.getFirstKey(). 
21                compareTo(ck2.getFirstKey())+"-------"); 
22        logger.info("-------out DefinedGroupSort flag-------"); 
23        return ck1.getFirstKey().compareTo(ck2.getFirstKey()); 
24    } 
25}

主体程序实现

  1package com.mr; 
  2import java.io.IOException; 
  3import java.util.Iterator; 
  4import org.apache.hadoop.conf.Configuration; 
  5import org.apache.hadoop.conf.Configured; 
  6import org.apache.hadoop.fs.Path; 
  7import org.apache.hadoop.io.IntWritable; 
  8import org.apache.hadoop.io.Text; 
  9import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 
 10import org.apache.hadoop.mapreduce.Job; 
 11import org.apache.hadoop.mapreduce.Mapper; 
 12import org.apache.hadoop.mapreduce.Reducer; 
 13import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
 14import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 15import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
 16import org.apache.hadoop.util.Tool; 
 17import org.apache.hadoop.util.ToolRunner; 
 18import org.slf4j.Logger; 
 19import org.slf4j.LoggerFactory; 
 20/** 
 21 * 
 22 * 用途说明:二次排序mapreduce 
 23 * 需求描述: 
 24 * ---------------输入----------------- 
 25 * sort1,1 
 26 * sort2,3 
 27 * sort2,77 
 28 * sort2,54 
 29 * sort1,2 
 30 * sort6,22 
 31 * sort6,221 
 32 * sort6,20 
 33 * ---------------目标输出--------------- 
 34 * sort1 1,2 
 35 * sort2 3,54,77 
 36 * sort6 20,22,221 
 37 */
 38public class SecondSortMR extends Configured  implements Tool { 
 39    private static final Logger logger = LoggerFactory.getLogger(SecondSortMR.class); 
 40    public static class SortMapper extends Mapper<Text, Text, CombinationKey, IntWritable> { 
 41    //--------------------------------------------------------- 
 42        /** 
 43        * 这里特殊要说明一下,为什么要将这些变量写在map函数外边。 
 44        * 对于分布式的程序,我们一定要注意到内存的使用情况,对于mapreduce框架, 
 45        * 每一行的原始记录的处理都要调用一次map函数,假设,此个map要处理1亿条输 
 46        * 入记录,如果将这些变量都定义在map函数里边则会导致这4个变量的对象句柄编 
 47        * 程非常多(极端情况下将产生4*1亿个句柄,当然java也是有自动的gc机制的, 
 48        * 一定不会达到这么多),导致栈内存被浪费掉。我们将其写在map函数外边, 
 49        * 顶多就只有4个对象句柄。 
 50        */
 51        CombinationKey combinationKey = new CombinationKey(); 
 52        Text sortName = new Text(); 
 53        IntWritable score = new IntWritable(); 
 54        String[] inputString = null; 
 55    //--------------------------------------------------------- 
 56        @Override
 57        protected void map(Text key, Text value, Context context) 
 58                throws IOException, InterruptedException { 
 59            logger.info("---------enter map function flag---------"); 
 60            //过滤非法记录 
 61            if(key == null || value == null || key.toString().equals("") 
 62                    || value.equals("")){ 
 63                return; 
 64            } 
 65            sortName.set(key.toString()); 
 66            score.set(Integer.parseInt(value.toString())); 
 67            combinationKey.setFirstKey(sortName); 
 68            combinationKey.setSecondKey(score); 
 69            //map输出 
 70            context.write(combinationKey, score); 
 71            logger.info("---------out map function flag---------"); 
 72        } 
 73    } 
 74    public static class SortReducer extends
 75    Reducer<CombinationKey, IntWritable, Text, Text> { 
 76        StringBuffer sb = new StringBuffer(); 
 77        Text sore = new Text(); 
 78        /** 
 79        * 这里要注意一下reduce的调用时机和次数:reduce每处理一个分组的时候会调用一 
 80        * 次reduce函数。也许有人会疑问,分组是什么?看个例子就明白了: 
 81        * eg: 
 82        * {{sort1,{1,2}},{sort2,{3,54,77}},{sort6,{20,22,221}}} 
 83        * 这个数据结果是分组过后的数据结构,那么一个分组分别为{sort1,{1,2}}、 
 84        * {sort2,{3,54,77}}、{sort6,{20,22,221}} 
 85        */
 86        @Override
 87        protected void reduce(CombinationKey key, 
 88                Iterable<IntWritable> value, Context context) 
 89                throws IOException, InterruptedException { 
 90            sb.delete(0, sb.length());//先清除上一个组的数据 
 91            Iterator<IntWritable> it = value.iterator(); 
 92                                                    
 93            while(it.hasNext()){ 
 94                sb.append(it.next()+","); 
 95            } 
 96            //去除最后一个逗号 
 97            if(sb.length()>0){ 
 98                sb.deleteCharAt(sb.length()-1); 
 99            } 
100            sore.set(sb.toString()); 
101            context.write(key.getFirstKey(),sore); 
102            logger.info("---------enter reduce function flag---------"); 
103            logger.info("reduce Input data:{["+key.getFirstKey()+","+ 
104            key.getSecondKey()+"],["+sore+"]}"); 
105            logger.info("---------out reduce function flag---------"); 
106        } 
107    } 
108    @Override
109    public int run(String[] args) throws Exception { 
110        Configuration conf=getConf(); //获得配置文件对象 
111        Job job=new Job(conf,"SoreSort"); 
112        job.setJarByClass(SecondSortMR.class); 
113                                                
114        FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径 
115        FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径 
116                                                                                                                                                                                    
117        job.setMapperClass(SortMapper.class); 
118        job.setReducerClass(SortReducer.class); 
119                                                
120        job.setPartitionerClass(DefinedPartition.class); //设置自定义分区策略 
121                                                                                                                                                                                    
122        job.setGroupingComparatorClass(DefinedGroupSort.class); //设置自定义分组策略 
123        job.setSortComparatorClass(DefinedComparator.class); //设置自定义二次排序策略 
124                                              
125        job.setInputFormatClass(KeyValueTextInputFormat.class); //设置文件输入格式 
126        job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式 
127                                                
128        //设置map的输出key和value类型 
129        job.setMapOutputKeyClass(CombinationKey.class); 
130        job.setMapOutputValueClass(IntWritable.class); 
131                                                
132        //设置reduce的输出key和value类型 
133        job.setOutputKeyClass(Text.class); 
134        job.setOutputValueClass(Text.class); 
135        job.waitForCompletion(true); 
136        return job.isSuccessful()?0:1; 
137    } 
138                                            
139    public static void main(String[] args) { 
140        try { 
141            int returnCode =  ToolRunner.run(new SecondSortMR(),args); 
142            System.exit(returnCode); 
143        } catch (Exception e) { 
144            // TODO Auto-generated catch block 
145            e.printStackTrace(); 
146        } 
147                                                
148    } 
149}

这样便完成了自定义排序的需求,若相关项目有类似的自定义排序的需求,可参考上文修改自己的工程项目。


微信公众号

潘建锋

关于版权和转载

本文由 潘建锋 创作,采用 署名 4.0 国际 (CC BY 4.0) 国际许可协议进行授权。
本站文章除注明转载/出处外,均为本站原创或翻译,转载时请务必署名,否则,本人将保留一切追究责任的权利。
署名 4.0 国际 (CC BY 4.0)

转载规范

标题:MapReduce实现自定义二次排序
作者:潘建锋
原文:HTTPS://strikefreedom.top/mapreduce-customized-secondary-sort

关于留言和评论

如果您对本文《MapReduce实现自定义二次排序》的内容有任何疑问、补充或纠错,欢迎在下面的评论系统中留言,与作者一起交流进步,谢谢!(~ ̄▽ ̄)~