当前访客身份:游客 [ 登录  | 注册加入尚学堂]
直播

我来了!

拥有积分:3961
尚学堂雄起!!威武。。。

博客分类

笔记中心

课题中心

提问中心

答题中心

解答题中心

Storm事务型(transactional)spout介绍(一)

我来了! 发表于 2年前 (2014-10-29 11:44:25)  |  评论(0)  |  阅读次数(619)| 0 人收藏此文章,   我要收藏   

针对目前主流的消息中间件都有分区的概念,Storm专门提供了backtype.storm.transactional.partitioned包。这里主要介绍一下IPartitionedTransactionalSpout IOpaquePartitionedTransactionalSpout 这两个接口。如果使用Storm 0.8之后版本的Trident新特性,对应的有storm.trident.spout包下面的IPartitionedTridentSpoutIOpaquePartitionedTridentSpout接口,用法和普通的事务型spout类似。

IPartitionedTransactionalSpoutIOpaquePartitionedTransactionalSpout都是把tuple封装成batch进行处理,同时可以保证每一个tuple都被完整地处理,都支持消息重发。为了支持事务性,它们为每一个批次(batch)提供一个唯一的事务IDtransaction idtxid),txid是顺序递增的,而且保证对批次的处理是强有序的,即必须完整处理完txid=1才能再接着处理txid=2

这里主要介绍一下二者的区别以及用法。IPartitionedTransactionalSpout的每一个tuple都会绑定在固定的批次中。无论一个tuple重发多少次,它都在同一个批次里面,都有同样的事务ID;一个tuple不会出现在两个以上的批次里。一个批次无论重发多少次,它也只有一个唯一且相同的事务ID,不会改变。这也就是说,一个批次无论重发多少次,它所包含的内容都是完全一致的。

但是IPartitionedTransactionalSpout会有一个问题,虽然这种问题非常罕见:假设一批消息在被bolt消费过程中失败了,需要spout重发,此时如果正巧遇到消息发送中间件故障,例如某一个分区不可读,spout为了保证重发时每一批次包含的tuple一致,它只能等待消息中间件恢复,也就是卡在那里无法再继续发送给bolt消息了,直至消息中间件恢复。IOpaquePartitionedTransactionalSpout可以解决这个问题,后面会详细介绍IOpaquePartitionedTransactionalSpout。通常情况下,IPartitionedTransactionalSpout已经可以满足大部分的事务型需求了。

以统计文本中词的出现次数为例:
假设目前统计的结果为:
man => [count=3]
dog => [count=4]
apple => [count=10]

新的一批次消息为:
["man"]
["man"]
["dog"]

也许这时候应该理所当然的得到新的统计结果:
man => [count=5]
dog => [count=5]
apple => [count=10]

其实没这么简单,也许该批次消费在写数据库时或其他地方出现了异常,会导致spout重发该批次消息,最终会导致重复计算。所以事务型应用最重要的一点是要判断一批消息是新的还是已来过的。IPartitionedTransactionalSpout通过为每个批次赋予一个唯一的事务ID来解决此问题。我们在统计词的出现次数时,除了保存count,还应该一起保存事务ID。例如假设目前的统计结果为:

man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

新的一批次txid=3的消息为:
["man"]
["man"]
["dog"]

计算逻辑应该是:如果保存的txid和发来的txid相等,说明是已处理过的消息,对这样的消息不做处理。如果保存txid和发过来的txid不等,说明是新的消息,对这样的消息做正常的计算。保证这个计算逻辑正确的基础正是txid的顺序递增以及处理批次的强有序特征。

那么这一批次处理后的结果应该是:

man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

IPartitionedTransactionalSpout接口有两个嵌套类:

IPartitionedTransactionalSpout.Coordinator 
IPartitionedTransactionalSpout.Emitter <X

查看下面的文档可以很容易理解两个嵌套类的使用方法:

http://nathanmarz.github.io/storm/doc-0.8.1/index.html

重点解释一下IPartitionedTransactionalSpout.Emitter <X,这里面的X通常是我们自定义的一个存储在Zookeeper中的对象,通过backtype.storm.Config.registerSerialization方法进行注册。每批次X都会在ZK中更新一次。一般我们使用这个对象来保存中间件发送消息的偏移量。例如我们定义了一个这样的对象:

public class BatchMeta {
   public long offset; //
本批次的偏移量    
   public long nextOffset; //
下一批次的偏移量    
}

IPartitionedTransactionalSpout.Emitter <X>类的两个主要方法是:

X emitPartitionBatchNew(TransactionAttempt tx,
                       BatchOutputCollector collector,
                       int partition,
                       X lastPartitionMeta)

 

void emitPartitionBatch(TransactionAttempt tx,
                       BatchOutputCollector collector,
                       int partition,
                       X partitionMeta)

emitPartitionBatchNew用于发送一批新的消息,而如果这批消息消费失败了,emitPartitionBatch负责重发这批消息。

类型X可以使用我们自定义的BatchMetaemitPartitionBatchNew在处理发送逻辑时,要记录两个偏移量,一个是本批次开始的偏移量,存储在BatchMeta.offset中;一个是下一批次开始的偏移量,存储在BatchMeta.nextOffset中。emitPartitionBatchNew的第4个参数lastPartitionMeta其实就是emitPartitionBatchNew上一批次返回并保存在ZK中的BatchMetaemitPartitionBatchNew使用lastPartitionMeta.nextOffset做为本批次的开始偏移量。

emitPartitionBatch在重发消息时,第4个参数partitionMeta也是emitPartitionBatchNew本批次返回并保存在ZK中的BatchMetaemitPartitionBatch使用partitionMeta.offset做为本批次的开始偏移量,进行重发消息。
分享到:0
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
声明:博客文章版权属于原创作者,受法律保护。如果侵犯了您的权利,请联系管理员,我们将及时删除!
(邮箱:webmaster#sxt.cn(#换为@))
北京总部地址:北京市海淀区西三旗桥东建材城西路85号神州科技园B座三层尚学堂 咨询电话:400-009-1906 010-56233821
Copyright 2007-2015 北京尚学堂科技有限公司 京ICP备13018289号-1 京公网安备11010802015183