当前访客身份:游客 [ 登录  | 注册加入尚学堂]
直播

我来了!

拥有积分:3947
尚学堂雄起!!威武。。。

博客分类

笔记中心

课题中心

提问中心

答题中心

解答题中心

storm集群 + kafka单机性能测试

我来了! 发表于 2年前 (2014-10-27 21:52:46)  |  评论(0)  |  阅读次数(5065)| 0 人收藏此文章,   我要收藏   

    storm与kafka单机功能整合很顺利,但是到了storm集群环境和数据处理性能时则出现了一些问题,现将测试过程和问题简单记录如下:

    性能指标:每分钟处理至少100万的信息(csv格式,100bytes左右),信息解析后持久化到DB中。

    架构设计:flume读取文件缓存到kafka队列后消费到storm中

    问题:

    一、storm集群任务调度时出现如下问题,具体日志见下:

2014-09-24 16:47:38 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-computer7-62/ip:6706... [8]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-computer7-62/ip:6706, [id: 0x0b596170, /ip:34836 => computer7-62/ip:6706]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-computer7-60/ip:6706
2014-09-24 16:47:38 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-computer7-60/ip:6706
java.nio.channels.ClosedChannelException: null
        at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:649) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:117) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:632) [netty-3.2.2.Final.jar:na]
        at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:611) [netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:578) [netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259) [netty-3.2.2.Final.jar:na]
        at backtype.storm.messaging.netty.Client.flushRequest(Client.java:328) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.messaging.netty.Client.close(Client.java:272) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_24]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.6.0_24]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24]
        at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.5.1.jar:na]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:284) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:250) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] failed to send requests to computer7-60/ip:6706: 
java.nio.channels.ClosedChannelException: null
        at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:649) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:117) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:632) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:611) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:578) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259) ~[netty-3.2.2.Final.jar:na]
        at backtype.storm.messaging.netty.Client.flushRequest(Client.java:328) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.messaging.netty.Client.close(Client.java:272) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_24]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.6.0_24]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24]
        at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.5.1.jar:na]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:284) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:250) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-computer7-60/ip:6706..., timeout: 600000ms, pendings: 0
2014-09-24 16:47:38 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more
        at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927$fn__5928.invoke(worker.clj:322) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927.invoke(worker.clj:320) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        ... 6 common frames omitted
2014-09-24 16:47:38 b.s.m.n.Client [INFO] New Netty Client, connect to computer7-60, 6706, config: , buffer_size: 52



    解决方式:问题比较低级,解决过程也是比较曲折,最终发现hosts文件配置错误(机器名中的小写L写成了数字1),导致worker节点间数据通讯出现问题,影响任务调度。

二、kafka性能瓶颈

    kafka与storm整合时数据处理性能不是很好,未达到预期要求。一开始怀疑是kafkaspout代码问题,但是storm external中已经将其收录进来,感觉问题应该不是出在这里。后来看了一下kafkaspout实现,找到了可能的性能瓶颈点。kafka在设计时,为了增加并发访问及处理性能,在topic中加入了partitions属性,也就是将数据打散,提高并发与处理性能。由于队列信息offset是在客户端维护,kafkaspout在解决并发互斥时采用task与partitions一一对应的方式来解决互斥访问。topology在使用时,kafkaspout的并发度可以根据具体topic的partitions属性来设定。这样通过增加topic partitions和并发度(8),达到了预期的处理性能。

    由此联想,之前遇到的flume缓存到kafka队列的问题也可能是partitions设定方式问题导致,后续再测试验证一下。

    

分享到:0
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
声明:博客文章版权属于原创作者,受法律保护。如果侵犯了您的权利,请联系管理员,我们将及时删除!
(邮箱:webmaster#sxt.cn(#换为@))
北京总部地址:北京市海淀区西三旗桥东建材城西路85号神州科技园B座三层尚学堂 咨询电话:400-009-1906 010-56233821
Copyright 2007-2015 北京尚学堂科技有限公司 京ICP备13018289号-1 京公网安备11010802015183