当前访客身份:游客 [ 登录  | 注册加入尚学堂]
直播

我来了!

拥有积分:3958
尚学堂雄起!!威武。。。

博客分类

笔记中心

课题中心

提问中心

答题中心

解答题中心

分布式消息订阅发布系统Apache Kafka本机环境搭建和简单测试

我来了! 发表于 2年前 (2014-11-10 16:40:38)  |  评论(0)  |  阅读次数(1032)| 0 人收藏此文章,   我要收藏   

kafka是什么

http://blog.csdn.net/yfkiss/article/details/17348693

http://blog.csdn.net/yfkiss/article/details/17348693

一言以蔽之:分布式、高吞吐量 的 订阅、发布 消息系统

kafka的四方:producer     broker      consumer                  由 zookeeper协调转发

安装搭建:  http://blog.csdn.net/yfkiss/article/details/17381351

本机环境描述

本机ip:10.10.113.120

操作系统:Linux bogon 2.6.32-358.23.2.el6.i686 #1 SMP Wed Oct 16 17:21:31 UTC 2013 i686 i686 i386 GNU/Linux,CentOS release 6.4 (Final)

节点情况:zookeeper-1个 ,broker-3个

测试点:

  • 添加 topic 
  • producer 产生消息, 消费者 消费的
  • 当非leader broker挂掉时 生产消费情况
  • 一个leader broker 挂掉时 生产消费情况

关键步骤

1、下载编译好的kafka包    我试过该包提供的 window下bat处理文件,总是提示 classnotfound异常。于是转到centos下。

2、jdk安装什么的自行解决

3、解压后的目录结构

drwxr-xr-x. 3 xxx xxx  4096 Jun 13 10:01 bin
drwxr-xr-x. 2 xxx xxx  4096 Jun 12 21:30 config
drwxr-xr-x. 2 xxx xxx  4096 Apr 23 03:26 libs
-rw-rw-r--. 1 xxx xxx 11358 Apr 23 02:37 LICENSE 
-rw-rw-r--. 1 xxx xxx   162 Apr 23 02:37 NOTICE
4、kafka会用到hostname,所以 需要修改操作系统的hostname,否者后面执行kafka的shell命令时会报 unknownhostname的异常
centos的hostname修改方式:
#1 root权限
hostname bogon
#2 修改/etc/hosts
127.0.0.1       localhost.localdomain localhost
::1             localhost6.localdomain6 localhost6

10.10.113.120 bogon bogon
#3 修改/etc/sysconfig/network
NETWORKING=yes
NETWORKING_IPV6=no
HOSTNAME=bogon
5、 jvm参数中-XX标识的是实验性参数,kafka用了很多用来优化运行的jvm参数,而你安装的 jdk所带的jvm不一定支持这些参数,比如: -XX:+UseCompressedOops
如果你遇到
Unrecognized VM option '+UseCompressedOops'
的错误,请在bin/kafka-run-class.sh中移除相关参数

上述两个步骤比较重要。

6、配置broker server的参数 config/server1.properties

# 关键的四个 配置
broker.id=1
port=9092
host.name=10.10.113.120

zookeeper.connect=10.10.113.120:2181
server2.properties 

# 关键的四个 配置
broker.id=2
port=9093
host.name=10.10.113.120

zookeeper.connect=10.10.113.120:2181
server3.properties 

# 关键的四个 配置
broker.id=3
port=9094
host.name=10.10.113.120

zookeeper.connect=10.10.113.120:2181

测试实验

在解压包的bin同级目录,编写启动zookeeper 和 broker server 的startall.sh,赋予执行权限

#!/bin/bash
#start zookeeper server
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
#start 3 broker servers 
JMX_PORT=9997 bin/kafka-server-start.sh config/server1.properties &
JMX_PORT=9998 bin/kafka-server-start.sh config/server2.properties &
JMX_PORT=9999 bin/kafka-server-start.sh config/server3.properties &
运行 startall.sh,进行测试实验,执行脚本的命令行的使用方法和用 --help显示,每次更新命令行参数都有些改变

1、添加3个topic

bin/kafka-topics.sh --create --topic funckXXX1 --partitions 1 --replication-factor 3  --zookeeper 10.10.113.120:2181
bin/kafka-topics.sh --create --topic funckXXX2 --partitions 1 --replication-factor 3  --zookeeper 10.10.113.120:2181
bin/kafka-topics.sh --create --topic funckXXX3 --partitions 1 --replication-factor 3  --zookeeper 10.10.113.120:2181
查看已添加的topic
bin/kafka-topics.sh --create --describe --zookeeper 10.10.113.120:2181
如,主broker 为 Leader 2
Topic:funckXXX1 PartitionCount:1        ReplicationFactor:3     Configs:
Topic: funckXXX1        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
Topic: funckXXX2 PartitionCount:1        ReplicationFactor:3     Configs:
Topic: funckXXX2        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
Topic: funckXXX3 PartitionCount:1        ReplicationFactor:3     Configs:
Topic: funckXXX3        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
说明:
 partiton: partion id,由于此处只有一个partition,因此partition id 为0 leader:
当前负责读写的lead broker id relicas:
当前partition的所有replication broker list isr:
relicas的子集,只包含出于活动状态的broker

2、启动消费者控制台

看看某个topic的消费情况

bin/kafka-console-consumer.sh --topic fuckXXX1 --zookeeper 10.10.113.120:2181 --from-beginning

3、添加消息到 top观察 消费者的消费情况

启动生产者控制台

bin/kafka-console-producer.sh --topic fuckXXX1 --broker-list 10.10.113.120:9092,10.10.113.120:9093,10.10.113.120:9094

在其中输入一些消息 回车,如:

[bogon@bogon kafaka0811]$ bin/kafka-console-producer.sh --topic fuckXXX1 --broker-list 10.10.113.120:9092,110.113.120:9093,10.10.113.120:9094
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
message1
message2
message3   



那么在消费者端应该有动作:

[bogon@bogon kafaka0811]$ bin/kafka-console-consumer.sh --zookeeper 10.10.113.120:2181 --from-beginning --topic fuckXXX1
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-06-13 15:55:16,203] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)
[2014-06-13 15:56:12,871] INFO Topic creation {"version":1,"partitions":{"1":[3],"0":[2]}} (kafka.admin.AdminUtils$)
[2014-06-13 15:56:12,879] INFO [KafkaApi-3] Auto creation of topic fuckXXX1 with 2 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
[2014-06-13 15:56:12,954] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [fuckXXX1,0] (kafka.server.ReplicaFetcherManager)
[2014-06-13 15:56:12,958] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [fuckXXX1,1] (kafka.server.ReplicaFetcherManager)
[2014-06-13 15:56:12,973] INFO Completed load of log fuckXXX1-1 with log end offset 0 (kafka.log.Log)
[2014-06-13 15:56:12,985] INFO Created log for partition [fuckXXX1,1] in /tmp/kafka-broke3-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-06-13 15:56:12,986] WARN Partition [fuckXXX1,1] on broker 3: No checkpointed highwatermark is found for partition [fuckXXX1,1] (kafka.cluster.Partition)
[2014-06-13 15:56:12,997] INFO Completed load of log fuckXXX1-0 with log end offset 0 (kafka.log.Log)
[2014-06-13 15:56:13,017] INFO Created log for partition [fuckXXX1,0] in /tmp/kafka-broke2-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-06-13 15:56:13,021] WARN Partition [fuckXXX1,0] on broker 2: No checkpointed highwatermark is found for partition [fuckXXX1,0] (kafka.cluster.Partition)
[2014-06-13 15:56:13,059] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)
[2014-06-13 15:56:13,288] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)
[2014-06-13 15:56:13,914] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)
message1
message2
message3

4、杀掉非leaderbroker 观察

查看所有java进程   : ps axf | grep java

然后kill掉除Leader 标识之外的任一  broker的进程

pkill -9 -f server1.properties



此时topic信息为:

[bogon@bogon kafaka0811]$ bin/kafka-topics.sh --describe  --zookeeper 10.10.113.120:2181

Topic:fuckXXX1  PartitionCount:2        ReplicationFactor:1     Configs:
Topic: fuckXXX1 Partition: 0    Leader: 2       Replicas: 2     Isr: 2
Topic: fuckXXX1 Partition: 1    Leader: 3       Replicas: 3     Isr: 3
Topic:funckXXX1 PartitionCount:1        ReplicationFactor:3     Configs:
Topic: funckXXX1        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3
Topic:funckXXX2 PartitionCount:1        ReplicationFactor:3     Configs:
Topic: funckXXX2        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3
Topic:funckXXX3 PartitionCount:1        ReplicationFactor:3     Configs:
Topic: funckXXX3        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3



此时生产和消费消息依旧正常,请自己发布消息

5、杀掉leader broker 观察

杀掉主broker的进程,然后连续观察 topic详情

pkill -9 -f server2.properties
topic为,可看出 Leader 已经改变为3了

[bogon@bogon kafaka0811]$ bin/kafka-topics.sh --describe  --zookeeper 10.10.113.120:2181
Topic:fuckXXX1  PartitionCount:2        ReplicationFactor:1     Configs:
Topic: fuckXXX1 Partition: 0    Leader: -1      Replicas: 2     Isr: 
Topic: fuckXXX1 Partition: 1    Leader: 3       Replicas: 3     Isr: 3
Topic:funckXXX1 PartitionCount:1        ReplicationFactor:3     Configs:
Topic: funckXXX1        Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3
Topic:funckXXX2 PartitionCount:1        ReplicationFactor:3     Configs:
Topic: funckXXX2        Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3
Topic:funckXXX3 PartitionCount:1        ReplicationFactor:3     Configs:
Topic: funckXXX3        Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3



此时生产和消费消息依旧正常,请自己发布消息

TODO:生产者消费者客户端功能编写

分享到:0
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
声明:博客文章版权属于原创作者,受法律保护。如果侵犯了您的权利,请联系管理员,我们将及时删除!
(邮箱:webmaster#sxt.cn(#换为@))
北京总部地址:北京市海淀区西三旗桥东建材城西路85号神州科技园B座三层尚学堂 咨询电话:400-009-1906 010-56233821
Copyright 2007-2015 北京尚学堂科技有限公司 京ICP备13018289号-1 京公网安备11010802015183