当前访客身份:游客 [ 登录  | 注册加入尚学堂]
直播

我来了!

拥有积分:4038
尚学堂雄起!!威武。。。

博客分类

笔记中心

课题中心

提问中心

答题中心

解答题中心

storm starter学习(一)

我来了! 发表于 2年前 (2014-10-27 21:39:32)  |  评论(0)  |  阅读次数(509)| 0 人收藏此文章,   我要收藏   

    官方提供的storm starter示例中,有很多应用的例子,对storm的应用场景理解很有帮助。本文结合源码来进行功能分解,记录一下,作为记忆索引吧。

    先来看一个比较简单的示例:WordCountTopology,原版代码该示例是为了说明多语言适配而做的应用场景,主要功能是随机生成一些String,将这些String划分分组,统计各单词出现数量。后来修改了一下,去掉了py调用的地方。使用java来进行词组划分。

    先来看一下Topology:

public static void main(String[] args) throws Exception {
	TopologyBuilder builder = new TopologyBuilder();
	builder.setSpout("spout", new RandomSentenceSpout(), 5); // 数据源
	builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); // 单词划分
	builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",new Fields("word")); // 统计单词出现个数
	Config conf = new Config();
	conf.setDebug(true);

	if (args != null && args.length > 0) { 
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar("wordCount", conf,
					builder.createTopology());
	} else {
	    conf.setMaxTaskParallelism(3);
	    LocalCluster cluster = new LocalCluster();
	    cluster.submitTopology("word-count", conf, builder.createTopology());

	    Thread.sleep(10000);
	    cluster.shutdown();
	}
}

    RandomSentenceSpout功能很简单,随机生成字符串到tuple中,key为word。

    再看一下bolt,该示例中使用了两类分组方式shuffleGrouping(随机分组),fieldsGrouping(按字段分组)。使用shuffleGrouping来进行单词划分,为了保证单词统计时都在一个bolt中进行,使用fieldsGrouping来进行word划分统计,运行时会看到相同key值的tuple会分配到同一线程上。

public static class SplitSentence implements IBasicBolt {
	public void prepare(Map conf, TopologyContext context) {
	}

	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String sentence = tuple.getString(0);
		for (String word : sentence.split(" ")) { // 将Spout接收到的tuple按空格进行分解,产生单词数据流
			collector.emit(new Values(word));
		}

	}

	public void cleanup() {
	}

	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word")); // 定义key值
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}
    单词统计bolt:
public static class WordCount extends BaseBasicBolt {
	Map<String, Integer> counts = new HashMap<String, Integer>();

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String word = tuple.getString(0);
		Integer count = counts.get(word);
		if (count == null)
			count = 0;
		count++;
		counts.put(word, count);
		collector.emit(new Values(word, count)); // 输出结果word + word number
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count")); 
	}
}

    总结:

    通常情况下,为了保证数据可靠性与完整性,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,相当于自动处理了prepare方法和collector.emit.ack(inputTuple);

分享到:0
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
声明:博客文章版权属于原创作者,受法律保护。如果侵犯了您的权利,请联系管理员,我们将及时删除!
(邮箱:webmaster#sxt.cn(#换为@))
北京总部地址:北京市海淀区西三旗桥东建材城西路85号神州科技园B座三层尚学堂 咨询电话:400-009-1906 010-56233821
Copyright 2007-2015 北京尚学堂科技有限公司 京ICP备13018289号-1 京公网安备11010802015183