MapReduce Top K Algorithm

Posted by 我来了! on 2015-01-01 18:13:40

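1. Requirements Analysis

Each input line is an order record with four comma-separated fields, named in the comment line below. The job must find the N largest payment values across all input files.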


#orderid,userid,payment,productid
[root@x00 hd]# cat seventeen_a.txt
1,9819,100,121
2,8918,2000,111
3,2813,1234,22
4,9100,10,1101
5,3210,490,111
6,1298,28,1211
7,1010,281,90
8,1818,9000,20
[root@x00 hd]# cat seventeen_b.txt
100,3333,10,100
101,9321,1000,293
102,3881,701,20
103,6791,910,30
104,8888,11,39


Expected result for Top N with N = 5 (rank, payment):
1 9000
2 2000
3 1234
4 1000
5 910
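
The expected result above can be checked without Hadoop. The following is a minimal sketch in plain Java, assuming the contents of the two sample files are inlined as strings; the class name TopNCheck and the method topN are hypothetical helpers, not part of the original job. It simply sorts every payment in descending order and keeps the first N.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: checks the expected Top N result without Hadoop.
class TopNCheck {

    // Returns the n largest payments (third CSV field), largest first.
    static List<Integer> topN(String[] lines, int n) {
        List<Integer> payments = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            if (fields.length == 4) {   // skip malformed lines, as the mapper does
                payments.add(Integer.parseInt(fields[2]));
            }
        }
        payments.sort(Collections.reverseOrder());
        return new ArrayList<>(payments.subList(0, Math.min(n, payments.size())));
    }

    public static void main(String[] args) {
        // Contents of seventeen_a.txt and seventeen_b.txt, inlined.
        String[] lines = {
            "1,9819,100,121", "2,8918,2000,111", "3,2813,1234,22",
            "4,9100,10,1101", "5,3210,490,111", "6,1298,28,1211",
            "7,1010,281,90", "8,1818,9000,20",
            "100,3333,10,100", "101,9321,1000,293", "102,3881,701,20",
            "103,6791,910,30", "104,8888,11,39"
        };
        List<Integer> top = topN(lines, 5);
        // Print one "rank payment" line per result.
        for (int rank = 1; rank <= top.size(); rank++) {
            System.out.println(rank + " " + top.get(rank - 1));
        }
    }
}
```

Sorting the whole data set only works when it fits in memory on one machine, which is why the job keeps just N candidates per task.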


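import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    private int len;    // N, the number of values to keep
    private int[] top;  // top[0] is a scratch slot; top[1..len] hold the N largest values, ascending

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Example line: 1,9819,100,121
        if (line.length() > 0) {
            // Split on commas.
            String[] splited = line.split(",");
            if (splited.length == 4) {
                int payment = Integer.parseInt(splited[2]);
                add(payment);
            }
        }
    }

    // Overwrite the smallest retained value, then re-sort so that
    // the new smallest value moves back to top[0].
    private void add(int payment) {
        top[0] = payment;
        Arrays.sort(top);
    }

    // Emit this mapper's local Top N once all of its input has been processed.
    // Unused slots are emitted as 0, so this assumes payments are positive.
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (int i = 1; i < len + 1; i++) {
            context.write(new IntWritable(top[i]), new IntWritable(top[i]));
        }
    }

    // Initialization: read N from the job configuration (default 10).
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len + 1];
    }
}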
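
The add method relies on a small array trick that is easier to follow outside Hadoop. Here is a minimal stand-alone sketch of it, assuming non-negative payments; the class TopNArrayDemo and its method largestFirst are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;

// Hypothetical stand-alone demo of the "scratch slot + sort" technique.
class TopNArrayDemo {
    // top[0] is a scratch slot; top[1..n] hold the n largest values, ascending.
    private final int[] top;

    TopNArrayDemo(int n) {
        top = new int[n + 1];  // zero-filled, so this assumes non-negative payments
    }

    void add(int payment) {
        top[0] = payment;      // overwrite the value that is already outside the top n
        Arrays.sort(top);      // the smallest of the n + 1 values moves back to slot 0
    }

    // Copies top[n..1] into a new array, largest value first.
    int[] largestFirst() {
        int n = top.length - 1;
        int[] result = new int[n];
        for (int i = 0; i < n; i++) {
            result[i] = top[n - i];
        }
        return result;
    }

    public static void main(String[] args) {
        TopNArrayDemo demo = new TopNArrayDemo(3);
        for (int payment : new int[] {100, 2000, 1234, 10, 490}) {
            demo.add(payment);
        }
        System.out.println(Arrays.toString(demo.largestFirst())); // prints [2000, 1234, 490]
    }
}
```

Each call to add sorts N + 1 elements, which is cheap for small N but grows with N.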


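import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    private int len;    // N, the number of values to keep
    private int[] top;  // top[0] is a scratch slot; top[1..len] hold the N largest values, ascending

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // reduce() is called once per distinct key, so add the payment once per
        // value; otherwise equal payments would be collapsed into a single entry.
        for (IntWritable value : values) {
            add(value.get());
        }
    }

    // Same technique as in the mapper: overwrite the scratch slot, then re-sort.
    private void add(int payment) {
        top[0] = payment;
        Arrays.sort(top);
    }

    // Initialization: read N from the job configuration (default 10).
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        len = context.getConfiguration().getInt("N", 10);
        top = new int[len + 1];
    }

    // Write "rank payment" pairs, largest payment first.
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (int i = len; i > 0; i--) {
            context.write(new IntWritable(len - i + 1), new IntWritable(top[i]));
        }
    }
}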
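
The job is correct because any value in the global Top N must also be in the Top N of the split that contains it, so merging the per-split candidates loses nothing. The sketch below simulates the two stages locally. It is only an illustration: the class TwoStageTopN is a hypothetical name, and it swaps the array-and-sort approach of the listings for a bounded min-heap (java.util.PriorityQueue), which avoids re-sorting on every insert.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical local simulation of the two-stage Top N; not part of the original job.
class TwoStageTopN {

    // Keeps the n largest values seen, using a min-heap of at most n elements.
    static List<Integer> topN(Iterable<Integer> values, int n) {
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        for (int v : values) {
            heap.offer(v);
            if (heap.size() > n) {
                heap.poll();  // drop the current smallest candidate
            }
        }
        List<Integer> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        // Payment columns of seventeen_a.txt and seventeen_b.txt.
        List<Integer> splitA = Arrays.asList(100, 2000, 1234, 10, 490, 28, 281, 9000);
        List<Integer> splitB = Arrays.asList(10, 1000, 701, 910, 11);
        int n = 5;

        // "Map" stage: each split keeps only its own top n.
        List<Integer> candidates = new ArrayList<>(topN(splitA, n));
        candidates.addAll(topN(splitB, n));

        // "Reduce" stage: top n of the merged candidates.
        System.out.println(topN(candidates, n)); // prints [9000, 2000, 1234, 1000, 910]
    }
}
```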


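import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobMain {

    /**
     * @param args args[0] = input path, args[1] = output path, args[2] = N
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // Pass N to the mapper and reducer through the job configuration.
        configuration.setInt("N", Integer.parseInt(args[2]));

        // On Hadoop 2.x and later this constructor is deprecated;
        // Job.getInstance(configuration, "topn_job") is the replacement.
        Job job = new Job(configuration, "topn_job");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(TopNMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(TopNReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // The global Top N is only correct when a single reducer sees every candidate.
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(configuration);
        // Delete any existing output directory so the job can be re-run.
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}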


Run: hadoop jar four.jar com.hadoop.six.JobMain /user/six /user/six/out 5
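
The command passes three positional arguments: the input path, the output path, and N. JobMain reads them without any checks, so a missing or non-numeric N fails with a raw exception. One possible way to validate them up front is sketched below; the class TopNArgs and its parse method are hypothetical and do not appear in the original post.

```java
// Hypothetical argument check for the three positional arguments of the job.
class TopNArgs {
    final String input;
    final String output;
    final int n;

    private TopNArgs(String input, String output, int n) {
        this.input = input;
        this.output = output;
        this.n = n;
    }

    // Validates <input> <output> <N> and fails with a readable message otherwise.
    static TopNArgs parse(String[] args) {
        if (args.length != 3) {
            throw new IllegalArgumentException("usage: JobMain <input> <output> <N>");
        }
        int n;
        try {
            n = Integer.parseInt(args[2]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("N must be an integer: " + args[2], e);
        }
        if (n <= 0) {
            throw new IllegalArgumentException("N must be positive: " + n);
        }
        return new TopNArgs(args[0], args[1], n);
    }

    public static void main(String[] args) {
        TopNArgs parsed = parse(new String[] {"/user/six", "/user/six/out", "5"});
        System.out.println(parsed.input + " -> " + parsed.output + ", N=" + parsed.n);
    }
}
```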



