Using Flume, Kafka, and Storm Together in a Pseudo-Distributed Environment

Posted by 我来了! on 2014-11-12 15:04:52
Contents:
  I. What are Flume, Kafka, and Storm, and how do you install them?
  II. How do you use Flume, Kafka, and Storm together?
    1) How does the integration work?
    2) Integrating Flume with Kafka
    3) Integrating Kafka with Storm
    4) Integrating Flume, Kafka, and Storm
 
  I. What are Flume, Kafka, and Storm, and how do you install them?
  For an introduction to Flume, see "Installing, Deploying, and Using Flume 1.5.0".
  For an introduction to Kafka, see "Distributed Cluster Installation of Kafka 2.9.2 with a Java API Demo".
  For an introduction to Storm, see "Building a Storm 0.9.2 Distributed Cluster on Ubuntu 12.04".
  The examples below use the configurations from those three articles.
 
  II. How do you use Flume, Kafka, and Storm together?
    1) How does the integration work?
  If you have read the introductions to Flume, Kafka, and Storm carefully, you already know how each of them exchanges messages with the outside world.
  In the examples that follow, we mainly rewrite Flume's sink so that it calls a Kafka producer to publish messages; on the Storm side, we implement the IRichSpout interface in a spout that calls a Kafka consumer to receive messages, then pass them through a few custom bolts that print the processed content.
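The flow just described can be sketched in miniature (plain Java, no Flume/Kafka/Storm APIs; all names here are illustrative, and the queue merely plays Kafka's role as the broker between the sink and the spout):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// A toy model of the whole data path: the "sink" puts a message on a queue
// standing in for Kafka, the "spout" takes it off, and two processing steps
// append markers the way Bolt1 and Bolt2 do later in this article.
public class PipelineSketch {

    static String runPipeline(String message) throws InterruptedException {
        BlockingQueue<String> broker = new LinkedBlockingQueue<String>();
        broker.put(message);          // Flume sink -> Kafka producer
        String msg = broker.take();   // Storm spout <- Kafka consumer
        msg = msg + "bolt1";          // first bolt appends a marker
        msg = msg + "bolt2";          // second bolt appends another
        return msg;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPipeline("hello idoall.org syslog"));
    }
}
```

The real components add durability, partitioning, and acking on top, but the hand-off shape is the same.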
 
    2) Integrating Flume with Kafka
     # Copy the Kafka jars that Flume needs into Flume's lib directory.
 
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib
     # Write sink.java, export it from Eclipse as a jar, and put the jar in flume-1.5.0-bin/lib. The project must reference flume-ng-configuration-1.5.0.jar, flume-ng-sdk-1.5.0.jar, flume-ng-core-1.5.0.jar, zkclient-0.3.jar, and commons-logging-1.1.1.jar; all of these can be found under the Flume directory (use find if you cannot locate one).
 
package idoall.cloud.flume.sink;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;


public class KafkaSink extends AbstractSink implements Configurable {
    private static final Log logger = LogFactory.getLog(KafkaSink.class);

    private String topic;
    private Producer<String, String> producer;

    public void configure(Context context) {
        topic = "idoall_testTopic";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "idoall.cloud.kafka.Partitionertest");
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka");
        props.setProperty("num.partitions", "4");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        logger.info("KafkaSink initialized.");
    }

    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                tx.rollback();
                return Status.BACKOFF;
            }
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("flume sent message to kafka: " + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("Flume KafkaSinkException:", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
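The heart of process() is the take/commit/rollback contract: an event is only removed from the channel when the send succeeds; on failure it is rolled back and retried later. The snippet below isolates that contract with stand-in classes (FakeChannel and the Status enum are illustrative, not Flume's):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// A stripped-down illustration of the transactional pattern in process():
// take an event, commit on success, roll back (returning the event to the
// channel) on failure or when the channel is empty.
public class SinkTransactionSketch {
    enum Status { READY, BACKOFF }

    static class FakeChannel {
        private final Queue<String> events = new ArrayDeque<String>();
        private String inFlight;
        void put(String e) { events.add(e); }
        String take() { inFlight = events.poll(); return inFlight; }
        void rollback() {                       // undelivered event goes back
            if (inFlight != null) { events.add(inFlight); inFlight = null; }
        }
        void commit() { inFlight = null; }      // event is now delivered
    }

    static Status process(FakeChannel channel, boolean sendFails) {
        String e = channel.take();
        if (e == null) { channel.rollback(); return Status.BACKOFF; } // nothing to do
        if (sendFails) { channel.rollback(); return Status.BACKOFF; } // retry later
        channel.commit();                                             // delivered
        return Status.READY;
    }

    public static void main(String[] args) {
        FakeChannel ch = new FakeChannel();
        ch.put("event-1");
        System.out.println(process(ch, true));   // send fails: event is kept
        System.out.println(process(ch, false));  // same event delivered now
        System.out.println(process(ch, false));  // channel empty
    }
}
```

This is why a failed Kafka send in the real sink does not lose the event: the rollback leaves it in the channel for the next process() call.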
     # On m1, configure the agent that connects flume to kafka
 
root@m1:/home/hadoop/flume-1.5.0-bin# vi /home/hadoop/flume-1.5.0-bin/conf/kafka.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
 
# Describe the sink
a1.sinks.k1.type = idoall.cloud.flume.sink.KafkaSink
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
     # Start kafka on m1, m2, s1, and s2 (for kafka installation, configuration, and startup, see "Distributed Cluster Installation of Kafka 2.9.2 with a Java API Demo"), then start a console consumer on s1.
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties &
     # Start flume on m1
 
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
# Only part of the log output is shown below
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink initialized.
14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: {source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/19 11:36:34 INFO node.Application: Starting Channel c1
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/19 11:36:34 INFO node.Application: Starting Sink k1
14/08/19 11:36:34 INFO node.Application: Starting Source r1
14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
     # Open another window on m1 and send a test syslog message to flume
root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140
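What the `nc` one-liner does can also be written out in Java: open a TCP connection and write one line. To keep the example self-contained and runnable, a throwaway ServerSocket on an ephemeral port stands in for the flume syslog source; against the real agent you would connect to localhost:5140 instead.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Java equivalent of: echo "payload" | nc localhost 5140
// (with a local stand-in server so the round trip can be observed).
public class SyslogSendSketch {

    static String sendAndReceive(String payload) throws Exception {
        ServerSocket server = new ServerSocket(0);  // ephemeral port for the demo
        Socket client = new Socket("localhost", server.getLocalPort());
        PrintWriter out = new PrintWriter(client.getOutputStream(), true);
        out.println(payload);                       // write one line, like nc does

        Socket accepted = server.accept();
        BufferedReader in = new BufferedReader(
                new InputStreamReader(accepted.getInputStream()));
        String line = in.readLine();                // what the syslog source would see
        client.close();
        accepted.close();
        server.close();
        return line;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("received: " + sendAndReceive("hello idoall.org syslog"));
    }
}
```

Note that a bare line like this is not a well-formed syslog message, which is why the flume log below reports "Event created from Invalid Syslog data" while still delivering the body.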
     # The last lines in the flume window on m1 show that Flume has forwarded the message to kafka
 
14/08/19 11:38:05 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/19 11:38:05 INFO client.ClientUtils$: Fetching metadata from broker id:3,host:s2,port:9092 with correlation id 0 for 1 topic(s) Set(idoall_testTopic)
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to s2:9092 for producing
14/08/19 11:38:05 INFO producer.SyncProducer: Disconnecting from s2:9092
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to m1:9092 for producing
14/08/19 11:38:05 INFO sink.KafkaSink: flume sent message to kafka: hello idoall.org syslog
     # The kafka consumer opened earlier on s1 also shows the message sent from Flume, so the flume-to-kafka link is working.
 
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic flume-kafka-storm-001 --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-08-11 14:22:12,165] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [flume-kafka-storm-001,1] (kafka.server.ReplicaFetcherManager)
[2014-08-11 14:22:12,218] WARN [KafkaApi-3] Produce request with correlation id 2 from client on partition [flume-kafka-storm-001,1] failed due to Topic flume-kafka-storm-001 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-08-11 14:22:12,223] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 0 (kafka.log.Log)
[2014-08-11 14:22:12,250] INFO Created log for partition [flume-kafka-storm-001,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-08-11 14:22:12,267] WARN Partition [flume-kafka-storm-001,1] on broker 3: No checkpointed highwatermark is found for partition [flume-kafka-storm-001,1] (kafka.cluster.Partition)
[2014-08-11 14:22:12,375] INFO Closing socket connection to /192.168.1.50. (kafka.network.Processor)
hello idoall.org syslog
    3) Integrating Kafka with Storm
     # First we write the code in Eclipse. Before coding, configure maven; the pom.xml is as follows:
 
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
  <modelVersion>4.0.0</modelVersion> 
  <groupId>idoall.cloud</groupId> 
  <artifactId>idoall.cloud</artifactId> 
  <version>0.0.1-SNAPSHOT</version> 
  <packaging>jar</packaging> 
  <name>idoall.cloud</name> 
  <url>http://maven.apache.org</url> 
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties> 
  <repositories>
    <repository>
      <id>github-releases</id> 
      <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
    </repository> 
    <repository>
      <id>clojars.org</id> 
      <url>http://clojars.org/repo</url>
    </repository>
  </repositories> 
  <dependencies>
    <dependency>
      <groupId>junit</groupId> 
      <artifactId>junit</artifactId> 
      <version>4.11</version> 
      <scope>test</scope>
    </dependency> 
    <dependency>
      <groupId>com.sksamuel.kafka</groupId> 
      <artifactId>kafka_2.10</artifactId> 
      <version>0.8.0-beta1</version>
    </dependency> 
    <dependency>
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>1.2.14</version>
    </dependency> 
    <dependency>
      <groupId>storm</groupId> 
      <artifactId>storm</artifactId> 
      <version>0.9.0.1</version> 
      <!-- keep storm out of the jar-with-dependencies --> 
      <scope>provided</scope>
    </dependency> 
    <dependency>
      <groupId>commons-collections</groupId> 
      <artifactId>commons-collections</artifactId> 
      <version>3.2.1</version>
    </dependency>
  </dependencies>
</project>
     # Write KafkaSpouttest.java
 
package idoall.cloud.storm;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class KafkaSpouttest implements IRichSpout {

    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;

    public KafkaSpouttest() {
    }

    public KafkaSpouttest(String topic) {
        this.topic = topic;
    }

    public void nextTuple() {
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void ack(Object msgId) {
    }

    public void activate() {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

        Map<String, Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, 1);

        System.out.println("*********Results********topic:" + topic);

        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String value = new String(it.next().message());
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
            Date curDate = new Date(System.currentTimeMillis()); // current time
            String str = formatter.format(curDate);

            System.out.println("storm received message from kafka ------->" + value);

            collector.emit(new Values(value, 1, str), value);
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // zookeeper connection string
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
        // consumer group id
        props.put("group.id", "1");
        // kafka stores the group's consumed offsets in zookeeper, but that
        // information is not updated in real time; it is committed at this interval
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    public void close() {
    }

    public void deactivate() {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "id", "time"));
    }

    public Map<String, Object> getComponentConfiguration() {
        System.out.println("getComponentConfiguration called");
        topic = "idoall_testTopic";
        return null;
    }
}
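Worth noting: this spout consumes inside activate() in an endless loop, so the calling thread never returns, and nextTuple() stays empty. It works for this demo, but a common alternative pattern (sketched below with plain Java, no Storm or Kafka classes) is to consume on a background thread into a queue and emit from nextTuple(), which Storm calls repeatedly and expects to return quickly:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the queue-draining spout pattern: a background "consumer"
// thread fills a queue; nextTuple() takes at most one message per call.
public class QueueSpoutSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

    // Stand-in for the Kafka consumer loop, running on its own thread.
    Thread startConsumer(final String[] messages) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                for (String m : messages) queue.offer(m);
            }
        });
        t.start();
        return t;
    }

    // What nextTuple() would do: emit one queued message, or nothing.
    String nextTuple() {
        return queue.poll(); // non-blocking; null means "nothing to emit yet"
    }

    public static void main(String[] args) throws InterruptedException {
        QueueSpoutSketch spout = new QueueSpoutSketch();
        spout.startConsumer(new String[] { "msg-1", "msg-2" }).join();
        System.out.println(spout.nextTuple());
        System.out.println(spout.nextTuple());
        System.out.println(spout.nextTuple()); // queue drained: null
    }
}
```

With this shape, deactivate() and close() can also stop the consumer thread cleanly instead of being empty stubs.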
     # Write KafkaTopologytest.java
 
package idoall.cloud.storm;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class KafkaTopologytest {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new KafkaSpouttest(""), 1);
        builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
        builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1", new Fields("word"));

        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 1);
        conf.put(Config.TOPOLOGY_DEBUG, true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());

        Utils.sleep(1000 * 60 * 5); // local cluster test ...
        cluster.shutdown();
    }

    public static class Bolt1 extends BaseBasicBolt {

        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                int id = input.getInteger(1);
                String time = input.getString(2);
                msg = msg + "bolt1";
                System.out.println("message processed (pass 1) -------[arg0]:" + msg + "---[arg1]:" + id + "---[arg2]:" + time + "------->" + msg);
                if (msg != null) {
                    collector.emit(new Values(msg));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static class Bolt2 extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String msg = tuple.getString(0);
            msg = msg + "bolt2";
            System.out.println("message processed (pass 2) ---------->" + msg);
            collector.emit(new Values(msg, 1));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
}
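The two groupings in the topology behave differently: shuffleGrouping spreads tuples across bolt1's tasks at random, while fieldsGrouping("bolt1", new Fields("word")) routes tuples with the same "word" value to the same bolt2 task, which is what would let Bolt2 keep per-word counts in its local Map. The hash-and-mod scheme below illustrates the idea in miniature; Storm's actual partitioning internals may differ.

```java
// Miniature illustration of fields grouping: the target task is derived
// from the grouped field's value, so equal values always co-locate.
public class GroupingSketch {

    static int taskFor(String word, int numTasks) {
        return Math.abs(word.hashCode() % numTasks); // same word -> same task
    }

    public static void main(String[] args) {
        int tasks = 2; // matches builder.setBolt("bolt2", new Bolt2(), 2)
        String[] words = { "hello", "world", "hello" };
        for (String w : words) {
            // both occurrences of "hello" print the same task number
            System.out.println(w + " -> task " + taskFor(w, tasks));
        }
    }
}
```

Shuffle grouping, by contrast, would pick a task without looking at the tuple's fields at all.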
     # Test the kafka-storm integration
  Open two windows (or use two machines; below I use m2 and s1): run the kafka console producer on m2 and the console consumer on s1 (if it is still open from before, reuse it). First verify that kafka itself works.
  As shown below, I type "hello welcome idoall.org" into the producer on m2, and the consumer on s1 receives the same message, so kafka and its messaging are working normally.
 
  Message sent from m2:
 
root@m2:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-producer.sh --broker-list m1:9092 --sync --topic idoall_testTopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello welcome idoall.org
  Message received on s1:
 
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic idoall_testTopic --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello welcome idoall.org
     # Now run KafkaTopologytest.java in Eclipse; the console shows the same message just sent through kafka from m2, so kafka and storm are also connected.
 
# There is a lot of output; only the important part is shown:
*********Results********topic:idoall_testTopic
storm received message from kafka ------->hello welcome idoall.org
5268 [Thread-24-spout] INFO backtype.storm.daemon.task - Emitting: spout default [hello welcome idoall.org, 1, 2014年08月19日 11:21:15 051]
message processed (pass 1) -------[arg0]:hello welcome idoall.orgbolt1---[arg1]:1---[arg2]:2014年08月19日 11:21:15 051------->hello welcome idoall.orgbolt1
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2000523200413433507=6673316475127546409}, [hello welcome idoall.org, 1, 2014年08月19日 11:21:15 051]
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 default [hello welcome idoall.orgbolt1]
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2000523200413433507 4983764025617316501]
5269 [Thread-20-bolt2] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2000523200413433507=1852530874180384956}, [hello welcome idoall.orgbolt1]
message processed (pass 2) ---------->hello welcome idoall.orgbolt1bolt2
5270 [Thread-20-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 default [hello welcome idoall.orgbolt1bolt2, 1]
    4) Integrating Flume, Kafka, and Storm
  The two examples above show that flume and kafka can already communicate, and so can kafka and storm; all that remains to connect the three is to package the storm code into a jar and deploy it to storm.
  If you are not familiar with installing, configuring, and deploying Storm, see "Building a Storm 0.9.2 Distributed Cluster on Ubuntu 12.04".
 
     # Copy the kafka-related jars into storm's lib directory. (As explained above, the kafka-storm integration rewrites storm's spout to receive and print messages through kafka's Consumer, so these jars are required.)
 
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/zookeeper-3.4.5/dist-maven/zookeeper-3.4.5.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/zkclient-0.3.jar /home/hadoop/storm-0.9.2-incubating/lib
     # Start storm nimbus on m1
 
root@m1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm nimbus &
     # Start storm supervisor on s1 and s2
 
root@s1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm supervisor &
     # Start the storm ui on m1
 
root@m1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm ui &
     # Export the project from Eclipse as a jar, copy it to any directory, and run it with storm
 
root@m1:/home/hadoop/storm-0.9.2-incubating# ll
total 25768
drwxr-xr-x 11 root   root       4096 Aug 19 11:53 ./
drwxr-xr-x 46 hadoop hadoop     4096 Aug 17 15:06 ../
drwxr-xr-x  2 root   root       4096 Aug  1 14:38 bin/
-rw-r--r--  1    502 staff     34239 Jun 13 08:46 CHANGELOG.md
drwxr-xr-x  2 root   root       4096 Aug  2 12:31 conf/
-rw-r--r--  1    502 staff       538 Mar 13 11:17 DISCLAIMER
drwxr-xr-x  3    502 staff      4096 May  6 03:13 examples/
drwxr-xr-x  3 root   root       4096 Aug  1 14:38 external/
-rw-r--r--  1 root   root   26252342 Aug 19 11:36 idoall.cloud.jar
drwxr-xr-x  3 root   root       4096 Aug  2 12:51 ldir/
drwxr-xr-x  2 root   root       4096 Aug 19 11:53 lib/
-rw-r--r--  1    502 staff     22822 Jun 12 04:07 LICENSE
drwxr-xr-x  2 root   root       4096 Aug  1 14:38 logback/
drwxr-xr-x  2 root   root       4096 Aug  1 15:07 logs/
-rw-r--r--  1    502 staff       981 Jun 11 01:10 NOTICE
drwxr-xr-x  5 root   root       4096 Aug  1 14:38 public/
-rw-r--r--  1    502 staff      7445 Jun 10 02:24 README.markdown
-rw-r--r--  1    502 staff        17 Jun 17 00:22 RELEASE
-rw-r--r--  1    502 staff      3581 May 30 00:20 SECURITY.md
root@m1:/home/hadoop/storm-0.9.2-incubating# /home/hadoop/storm-0.9.2-incubating/bin/storm jar idoall.cloud.jar idoall.cloud.storm.KafkaTopologytest
     # Send a message through flume and check whether storm receives it

   Message sent through flume:
 
root@m1:/home/hadoop# echo "flume->kafka->storm message" | nc localhost 5140                      
root@m1:/home/hadoop#
   Output shown in storm:
 
# There is a lot of output; only the important part is shown
storm received message from kafka ------->flume->kafka->storm message
174218 [Thread-16-spout] INFO  backtype.storm.daemon.task - Emitting: spout default [flume->kafka->storm message, 1, 2014年08月19日 12:06:39 360]
174220 [Thread-10-bolt1] INFO  backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2345821945306343027=-7738131487327750388}, [flume->kafka->storm message, 1, 2014年08月19日 12:06:39 360]
message processed (pass 1) -------[arg0]:flume->kafka->storm messagebolt1---[arg1]:1---[arg2]:2014年08月19日 12:06:39 360------->flume->kafka->storm messagebolt1
174221 [Thread-10-bolt1] INFO  backtype.storm.daemon.task - Emitting: bolt1 default [flume->kafka->storm messagebolt1]
174221 [Thread-10-bolt1] INFO  backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2345821945306343027 -2191137958679040397]
174222 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: __ack_ack, id: {}, [-2345821945306343027 -2191137958679040397]
174222 [Thread-12-bolt2] INFO  backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2345821945306343027=8433871885621516671}, [flume->kafka->storm messagebolt1]
message processed (pass 2) ---------->flume->kafka->storm messagebolt1bolt2
174223 [Thread-12-bolt2] INFO  backtype.storm.daemon.task - Emitting: bolt2 default [flume->kafka->storm messagebolt1bolt2, 1]
174223 [Thread-12-bolt2] INFO  backtype.storm.daemon.task - Emitting: bolt2 __ack_ack [-2345821945306343027 8433871885621516671]
174224 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: bolt2:4, stream: __ack_ack, id: {}, [-2345821945306343027 8433871885621516671]
174228 [Thread-16-spout] INFO  backtype.storm.daemon.task - Emitting: spout __ack_init [-2345821945306343027 -7738131487327750388 6]
174228 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: spout:6, stream: __ack_init, id: {}, [-2345821945306343027 -7738131487327750388 6]
174228 [Thread-20-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 6; __acker __ack_ack [-2345821945306343027]
    With the examples above, we have connected flume, kafka, and storm end to end. Combined with the earlier articles "Installing, Deploying, and Using Flume 1.5.0 (with distributed, hadoop 2.2.0, and hbase 0.96 examples)" and "Cross-Language Calls with Golang, PHP, Python, and Java on Thrift 0.9.1", this should cover most of the problems you will meet in real-time big-data computation and cross-language integration. I hope this series of articles has been helpful.
