Data Deduplication:
Deduplication just means that every distinct record appears exactly once in the output. In the reduce phase the record itself serves as the key, and nothing about the values-in matters: the input key is written straight through as the output key with an empty value. The overall structure closely follows WordCount:
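For instance, with a hypothetical six-line input (the sample lines here are made up), every repeated line collapses to a single occurrence; note that the output also comes out sorted, since the shuffle orders the keys before they reach the reducer:

DedupIn (hypothetical):

apple
banana
apple
cherry
banana
apple

DedupOut:

apple
banana
cherry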

Tip: the input/output paths are configured inside the program; see main() below.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Dedup {

	public static class Map extends Mapper<Object, Text, Text, Text> {
		private static final Text EMPTY = new Text("");
		// map: emit the entire input line as the key, with an empty value
		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			context.write(value, EMPTY);
		}
	}

	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		// reduce: the shuffle has already grouped identical lines under one
		// key, so writing each key once is all the deduplication needed
		@Override
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			context.write(key, new Text(""));
		}
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// initialize the configuration
		Configuration conf = new Configuration();

		/* The paths are hard-coded here instead of being passed in as program
		 * arguments, so there is no need to fill anything into the Arguments
		 * tab of the Eclipse run configuration; both approaches behave the
		 * same, so use whichever you prefer.
		 */
		// set the input and output paths
		String[] ioArgs = new String[]{"hdfs://localhost:9000/home/xd/hadoop_tmp/DedupIn",
				"hdfs://localhost:9000/home/xd/hadoop_tmp/DedupOut"};

		String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

		if (otherArgs.length != 2) {
			System.err.println("Usage: Data Deduplication <in> <out>");
			System.exit(2);
		}
		// set up the job
		Job job = new Job(conf, "Dedup Job");
		job.setJarByClass(Dedup.class);

		// set the map, combine and reduce classes; the reducer is idempotent,
		// so it can double as a combiner and drop duplicates map-side,
		// shrinking the shuffle
		job.setMapperClass(Map.class);
		job.setCombinerClass(Reduce.class);
		job.setReducerClass(Reduce.class);

		// set the output key/value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// set the input and output paths on the job
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		/* Equivalent version that takes the paths from the command line:
		 * FileInputFormat.addInputPath(job, new Path(args[0]));
		 * FileOutputFormat.setOutputPath(job, new Path(args[1]));
		 */

		job.waitForCompletion(true);

		// print job info
		System.out.println("Job name: " + job.getJobName());
		System.out.println("Job successful: " + (job.isSuccessful() ? "Yes" : "No"));
	}
}
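Since the values are never inspected, a slightly leaner variant (a sketch, not part of the original listing) can emit NullWritable instead of an empty Text, which avoids serializing an empty value for every record:

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// sketch: the same dedup logic with NullWritable values instead of empty Text
public class DedupNull {
	public static class Map extends Mapper<Object, Text, Text, NullWritable> {
		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			context.write(value, NullWritable.get());
		}
	}
	public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
		@Override
		public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}
	}
}

The job setup would then declare job.setOutputValueClass(NullWritable.class) instead of Text.class.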


Data Sorting:

For sorting, most of the work has already happened by the end of the map phase: the mapper emits each number as an IntWritable key, and MapReduce's shuffle sorts keys in ascending order before handing them to the reducer. All the reducer does is tag each value with a running line number (its rank) as it writes the output. Because the mapper emits a 1 for every occurrence and the reducer loops over the values, duplicate numbers each receive their own rank. A sample follows:
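As a concrete illustration, the result listed at the end of this section would come from these thirteen numbers sitting in Sort_in, in whatever order (the ordering below is made up):

23
45
4
56
10
1
4
3
15
7
5
2
11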

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DataSort {

	public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
		private static final IntWritable ONE = new IntWritable(1);
		private static IntWritable data = new IntWritable();
		// map: parse each line as an integer and emit it as the key;
		// the shuffle will sort the keys in ascending order
		@Override
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			data.set(Integer.parseInt(value.toString().trim()));
			context.write(data, ONE);
		}
	}

	public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
		private static IntWritable linenum = new IntWritable(1);
		// reduce: keys arrive sorted; assign a running line number to each
		// occurrence, so duplicate numbers each get their own rank
		@Override
		public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			for (IntWritable val : values) {
				context.write(linenum, key);
				linenum.set(linenum.get() + 1);
			}
		}
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// initialize the configuration
		Configuration conf = new Configuration();

		/* The paths are hard-coded here instead of being passed in as program
		 * arguments, so there is no need to fill anything into the Arguments
		 * tab of the Eclipse run configuration; both approaches behave the
		 * same, so use whichever you prefer.
		 */
		// set the input and output paths
		String[] ioArgs = new String[]{"hdfs://localhost:9000/home/xd/hadoop_tmp/Sort_in",
				"hdfs://localhost:9000/home/xd/hadoop_tmp/Sort_out"};

		String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();

		if (otherArgs.length != 2) {
			System.err.println("Usage: Data Sort <in> <out>");
			System.exit(2);
		}
		// set up the job
		Job job = new Job(conf, "Datasort Job");
		job.setJarByClass(DataSort.class);

		// set the map and reduce classes; note there is no combiner here,
		// since reusing Reduce as a combiner would assign line numbers
		// map-side and corrupt the output
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);

		// set the output key/value types
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);

		// set the input and output paths on the job
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		/* Equivalent version that takes the paths from the command line:
		 * FileInputFormat.addInputPath(job, new Path(args[0]));
		 * FileOutputFormat.setOutputPath(job, new Path(args[1]));
		 */
		job.waitForCompletion(true);

		// print job info
		System.out.println("Job name: " + job.getJobName());
		System.out.println("Job successful: " + (job.isSuccessful() ? "Yes" : "No"));
	}
}
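One caveat: the globally sorted output relies on the job running with a single reduce task, the default. With several reducers, each output file would only be sorted internally. A custom range partitioner would restore the total order across files; the following is a minimal sketch, assuming all keys are non-negative and below a known bound (100 here, an invented figure):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// sketch: send each key range to its own reducer so that concatenating the
// reducers' output files in order yields a globally sorted result;
// assumes keys lie in [0, 100)
public class RangePartitioner extends Partitioner<IntWritable, IntWritable> {
	@Override
	public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
		int width = Math.max(1, 100 / numPartitions); // bucket width; 100 is the assumed key bound
		return Math.min(key.get() / width, numPartitions - 1);
	}
}

It would be wired in with job.setPartitionerClass(RangePartitioner.class) alongside job.setNumReduceTasks(n).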

Result:

 1    1
 2    2
 3    3
 4    4
 5    4
 6    5
 7    7
 8    10
 9    11
10    15
11    23
12    45
13    56
