当前访客身份:游客 [ 登录  | 注册加入尚学堂]
直播

我来了!

拥有积分:4038
尚学堂雄起!!威武。。。

博客分类

笔记中心

课题中心

提问中心

答题中心

解答题中心

Hadoop 数据去重

我来了! 发表于 2年前 (2015-01-01 18:00:20)  |  评论(0)  |  阅读次数(469)| 0 人收藏此文章,   我要收藏   

                                                                          数据去重

1、原始数据
1)file1:


 


2012-3-1 a


2012-3-2 b


2012-3-3 c


2012-3-4 d


2012-3-5 a


2012-3-6 b


2012-3-7 c


2012-3-3 c 


 


     2)file2:


 


2012-3-1 b


2012-3-2 a


2012-3-3 b


2012-3-4 d


2012-3-5 a


2012-3-6 c


2012-3-7 d


2012-3-3 c 


    数据输出:
    2012-3-1 a


    2012-3-1 b


    2012-3-2 a


    2012-3-2 b


    2012-3-3 b


    2012-3-3 c


    2012-3-4 d


    2012-3-5 a


    2012-3-6 b


    2012-3-6 c


    2012-3-7 c


    2012-3-7 d




2、说明
数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器,
无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key,
而对value-list则没有要求。当reduce接收到一个<key,value-list>时就直接将key复制到输出的key中,并将value设置成空值。





代码测试:

public class DeMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text val = new Text("");
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
if(line.trim().length()>0){
context.write(new Text(line.trim()),val );
}
}
}


public class DeReducer extends Reducer<Text, Text, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {

context.write(key, NullWritable.get());
}
}


public class JobMain {


/**
* @param args
*/
public static void main(String[] args)throws Exception{
Configuration configuration = new Configuration();
Job job= new Job(configuration, "de-job");
job.setJarByClass(JobMain.class);

job.setMapperClass(DeMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setReducerClass(DeReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));

Path outputDir = new Path(args[1]);
FileSystem fs = FileSystem.get(configuration);
if(fs.exists(outputDir)){
fs.delete(outputDir, true);
}
FileOutputFormat.setOutputPath(job, outputDir);
job.setNumReduceTasks(1);

System.exit(job.waitForCompletion(true)?0:1);



}


}



运行结果为:






分享到:0
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
声明:博客文章版权属于原创作者,受法律保护。如果侵犯了您的权利,请联系管理员,我们将及时删除!
(邮箱:webmaster#sxt.cn(#换为@))
北京总部地址:北京市海淀区西三旗桥东建材城西路85号神州科技园B座三层尚学堂 咨询电话:400-009-1906 010-56233821
Copyright 2007-2015 北京尚学堂科技有限公司 京ICP备13018289号-1 京公网安备11010802015183