当前访客身份:游客 [ 登录  | 注册加入尚学堂]
直播

我来了!

拥有积分:3868
尚学堂雄起!!威武。。。

博客分类

笔记中心

课题中心

提问中心

答题中心

解答题中心

一致性事务(3)

我来了! 发表于 2年前 (2014-10-29 11:00:08)  |  评论(0)  |  阅读次数(399)| 0 人收藏此文章,   我要收藏   

下面是UpdateGlobalCount类的定义:

public static class UpdateGlobalCount extends BaseTransactionalBolt 
 
implements ICommitter { 
 
        TransactionAttempt _attempt; 
 
        BatchOutputCollector _collector; 
 
        int _sum = 0; 
 
        @Override 
 
        public void prepare(Map conf, TopologyContext context, 
 
BatchOutputCollector collector, TransactionAttempt attempt) { 
 
            _collector = collector; 
 
            _attempt = attempt; 
 
        } 
 
        @Override 
 
        public void execute(Tuple tuple) { 
 
            _sum+=tuple.getInteger(1); 
 
        } 
 
        @Override 
 
        public void finishBatch() { 
 
            Value val = DATABASE.get(GLOBAL_COUNT_KEY); 
 
            Value newval; 
 
            if(val == null || !val.txid.equals(_attempt.getTransactionId())) { 
 
                newval = new Value(); 
 
                newval.txid = _attempt.getTransactionId(); 
 
                if(val==null) { 
 
                    newval.count = _sum; 
 
                } else { 
 
                    newval.count = _sum + val.count; 
 
                } 
 
                DATABASE.put(GLOBAL_COUNT_KEY, newval); 
 
            } else { 
 
                newval = val; 
 
            } 
 
            _collector.emit(new Values(_attempt, newval.count)); 
 
        } 
 
        @Override 
 
        public void declareOutputFields(OutputFieldsDeclarer declarer) { 
 
            declarer.declare(new Fields(“id“, “sum“)); 
 
        } 
 
} 


UpdateGlobalCount实现了ICommitter接口,所以storm只会在commit阶段执行finishBatch方法。而execute方法可以在任何阶段完成。

在UpdateGlobalCount的finishBatch方法中,将当前的transaction id与数据库中存储的id做比较。如果相同,则忽略这个batch;如果不同,则把这个batch的计算结果加到总结果中,并更新数据库。

Transactional Topolgy运行示意图如下:

transactional topology

下面总结一下Transactional Topology的一些特性

  •  Transactional Topology将事务性机制都封装好了,其内部使用CoordinateBolt来保证一个batch中的tuple被处理完。
  •  TransactionalSpout只能有一个,它将所有tuple分为一个一个的batch,而且保证同一个batch的transaction id始终一样。
  •  BatchBolt处理batch在一起的tuples。对于每一个tuple调用execute方法,而在整个batch处理完成的时候调用finishBatch方法。
  •  如果BatchBolt被标记成Committer,则只能在commit阶段调用finishBolt方法。一个batch的commit阶 段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。
  •  Transactional Topology隐藏了anchor/ack框架,它提供一个不同的机制来fail一个batch,从而使得这个batch被replay。

5.2 Trident介绍

Trident是Storm之上的高级抽象,提供了joins,grouping,aggregations,fuctions和filters等接口。如果你使用过Pig或Cascading,对这些接口就不会陌生。

Trident将stream中的tuples分成batches进行处理,API封装了对这些batches的处理过程,保证tuple只被处理一次。处理batches中间结果存储在TridentState对象中。

Trident事务性原理这里不详细介绍,有兴趣的读者请自行查阅资料。

参考:http://xumingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/

http://xumingming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bolt/

https://github.com/nathanmarz/storm/wiki/Trident-tutorial

分享到:0
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
声明:博客文章版权属于原创作者,受法律保护。如果侵犯了您的权利,请联系管理员,我们将及时删除!
(邮箱:webmaster#sxt.cn(#换为@))
北京总部地址:北京市海淀区西三旗桥东建材城西路85号神州科技园B座三层尚学堂 咨询电话:400-009-1906 010-56233821
Copyright 2007-2015 北京尚学堂科技有限公司 京ICP备13018289号-1 京公网安备11010802015183