当前访客身份:游客 [ 登录  | 注册加入尚学堂]
直播

我来了!

拥有积分:3866
尚学堂雄起!!威武。。。

博客分类

笔记中心

课题中心

提问中心

答题中心

解答题中心

漫游Kafka实战篇之客户端API

我来了! 发表于 2年前 (2014-11-10 17:05:48)  |  评论(0)  |  阅读次数(534)| 0 人收藏此文章,   我要收藏   

原文地址:http://blog.csdn.net/honglei915/article/details/37697655

Kafka Producer APIs

旧版的Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口:

class Producer {
	
  /* 将消息发送到指定分区 */  
  public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);

  /* 批量发送一批消息 */  
  public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);

  /* 关闭producer */	
  public void close();

}

新版的Producer API提供了以下功能:
  1. 可以将多个消息缓存到本地队列里,然后异步的批量发送到broker,可以通过参数producer.type=async做到。缓存的大小可以通过一些参数指定:queue.timebatch.size。一个后台线程((kafka.producer.async.ProducerSendThread)从队列中取出数据并让kafka.producer.EventHandler将消息发送到broker,也可以通过参数event.handler定制handler,在producer端处理数据的不同的阶段注册处理器,比如可以对这一过程进行日志追踪,或进行一些监控。只需实现kafka.producer.async.CallbackHandler接口,并在callback.handler中配置。
  2. 自己编写Encoder来序列化消息,只需实现下面这个接口。默认的Encoder是kafka.serializer.DefaultEncoder
    interface Encoder<T> {
      public Message toMessage(T data);
    }
  3. 提供了基于Zookeeper的broker自动感知能力,可以通过参数zk.connect实现。如果不使用Zookeeper,也可以使用broker.list参数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。
  4. 通过分区函数kafka.producer.Partitioner类对消息分区
    interface Partitioner<T> {
       int partition(T key, int numPartitions);
    }
    分区函数有两个参数:key和可用的分区数量,从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.如果key是null,就随机的选择一个。可以通过参数partitioner.class定制分区函数。

新的api完整实例如下:

import java.util.*;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = “192.168.2.” + rnd.nextInt(255); 
               String msg = runtime + “,www.example.com,” + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}

下面这个是用到的分区函数:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
 
public class SimplePartitioner implements Partitioner<String> {
    public SimplePartitioner (VerifiableProperties props) {
 
    }
 
    public int partition(String key, int a_numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }
 
}


KafKa Consumer APIs

Consumer API有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。

高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。

低级别的API

class SimpleConsumer {
	
  /*向一个broker发送读取请求并得到消息集 */ 
  public ByteBufferMessageSet fetch(FetchRequest request);

  /*向一个broker发送读取请求并得到一个相应集 */ 
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);

  /**
   * 得到指定时间之前的offsets
   * 返回值是offsets列表,以倒序排序
   * @param time: 时间,毫秒,
   *              如果指定为OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.
   *              如果指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}
低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如Hadoop consumer这样的离线consumer。

高级别的API

/* 创建连接 */ 
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {
	
  /**
   * 这个方法可以得到一个流的列表,每个流都是MessageAndMetadata的迭代,通过MessageAndMetadata可以拿到消息和其他的元数据(目前之后topic)  
   *  Input: a map of <topic, #streams>
   *  Output: a map of <topic, list of message streams>
   */
  public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); 

  /**    
   * 你也可以得到一个流的列表,它包含了符合TopicFiler的消息的迭代,
   * 一个TopicFilter是一个封装了白名单或黑名单的正则表达式。
   */
  public List<KafkaStream> createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);

  /* 提交目前消费到的offset */
  public commitOffsets()
  
  /* 关闭连接 */
  public shutdown()
}

这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。

每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的tipic。

分享到:0
关注微信,跟着我们扩展技术视野。每天推送IT新技术文章,每周聚焦一门新技术。微信二维码如下:
微信公众账号:尚学堂(微信号:bjsxt-java)
声明:博客文章版权属于原创作者,受法律保护。如果侵犯了您的权利,请联系管理员,我们将及时删除!
(邮箱:webmaster#sxt.cn(#换为@))
北京总部地址:北京市海淀区西三旗桥东建材城西路85号神州科技园B座三层尚学堂 咨询电话:400-009-1906 010-56233821
Copyright 2007-2015 北京尚学堂科技有限公司 京ICP备13018289号-1 京公网安备11010802015183