zoukankan      html  css  js  c++  java
  • 漫游Kafka实战篇之客户端API

    Kafka Producer APIs

    旧版的Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. class Producer {  
    2.       
    3.   /* 将消息发送到指定分区 */    
    4.   public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);  
    5.   
    6.   /* 批量发送一批消息 */    
    7.   public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);  
    8.   
    9.   /* 关闭producer */    
    10.   public void close();  
    11.   
    12. }  

    新版的Producer API提供了以下功能:

    1. 可以将多个消息缓存到本地队列里,然后异步的批量发送到broker,可以通过参数producer.type=async做到。缓存的大小可以通过一些参数指定:queue.timebatch.size。一个后台线程((kafka.producer.async.ProducerSendThread)从队列中取出数据并让kafka.producer.EventHandler将消息发送到broker,也可以通过参数event.handler定制handler,在producer端处理数据的不同的阶段注册处理器,比如可以对这一过程进行日志追踪,或进行一些监控。只需实现kafka.producer.async.CallbackHandler接口,并在callback.handler中配置。
    2. 自己编写Encoder来序列化消息,只需实现下面这个接口。默认的Encoder是kafka.serializer.DefaultEncoder
      [java] view plaincopy在CODE上查看代码片派生到我的代码片
       
      1. interface Encoder<T> {  
      2.   public Message toMessage(T data);  
      3. }  
    3. 提供了基于Zookeeper的broker自动感知能力,可以通过参数zk.connect实现。如果不使用Zookeeper,也可以使用broker.list参数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。
    4. 通过分区函数kafka.producer.Partitioner类对消息分区
      [java] view plaincopy在CODE上查看代码片派生到我的代码片
       
      1. interface Partitioner<T> {  
      2.    int partition(T key, int numPartitions);  
      3. }  
      分区函数有两个参数:key和可用的分区数量,从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.如果key是null,就随机的选择一个。可以通过参数partitioner.class定制分区函数。

    新的api完整实例如下:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. import java.util.*;  
    2.    
    3. import kafka.javaapi.producer.Producer;  
    4. import kafka.producer.KeyedMessage;  
    5. import kafka.producer.ProducerConfig;  
    6.    
    7. public class TestProducer {  
    8.     public static void main(String[] args) {  
    9.         long events = Long.parseLong(args[0]);  
    10.         Random rnd = new Random();  
    11.    
    12.         Properties props = new Properties();  
    13.         props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");  
    14.         props.put("serializer.class", "kafka.serializer.StringEncoder");  
    15.         props.put("partitioner.class", "example.producer.SimplePartitioner");  
    16.         props.put("request.required.acks", "1");  
    17.    
    18.         ProducerConfig config = new ProducerConfig(props);  
    19.    
    20.         Producer<String, String> producer = new Producer<String, String>(config);  
    21.    
    22.         for (long nEvents = 0; nEvents < events; nEvents++) {   
    23.                long runtime = new Date().getTime();    
    24.                String ip = “192.168.2.” + rnd.nextInt(255);   
    25.                String msg = runtime + “,www.example.com,” + ip;   
    26.                KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);  
    27.                producer.send(data);  
    28.         }  
    29.         producer.close();  
    30.     }  
    31. }  


    下面这个是用到的分区函数:

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. import kafka.producer.Partitioner;  
    2. import kafka.utils.VerifiableProperties;  
    3.    
    4. public class SimplePartitioner implements Partitioner<String> {  
    5.     public SimplePartitioner (VerifiableProperties props) {  
    6.    
    7.     }  
    8.    
    9.     public int partition(String key, int a_numPartitions) {  
    10.         int partition = 0;  
    11.         int offset = key.lastIndexOf('.');  
    12.         if (offset > 0) {  
    13.            partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;  
    14.         }  
    15.        return partition;  
    16.   }  
    17.    
    18. }  



    KafKa Consumer APIs

    Consumer API有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。

    高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。

    低级别的API

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. class SimpleConsumer {  
    2.       
    3.   /*向一个broker发送读取请求并得到消息集 */   
    4.   public ByteBufferMessageSet fetch(FetchRequest request);  
    5.   
    6.   /*向一个broker发送读取请求并得到一个相应集 */   
    7.   public MultiFetchResponse multifetch(List<FetchRequest> fetches);  
    8.   
    9.   /** 
    10.    * 得到指定时间之前的offsets 
    11.    * 返回值是offsets列表,以倒序排序 
    12.    * @param time: 时间,毫秒, 
    13.    *              如果指定为OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset. 
    14.    *              如果指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset. 
    15.    */  
    16.   public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);  
    17. }  

    低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如Hadoop consumer这样的离线consumer。

    高级别的API

    [java] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. /* 创建连接 */   
    2. ConsumerConnector connector = Consumer.create(consumerConfig);  
    3.   
    4. interface ConsumerConnector {  
    5.       
    6.   /** 
    7.    * 这个方法可以得到一个流的列表,每个流都是MessageAndMetadata的迭代,通过MessageAndMetadata可以拿到消息和其他的元数据(目前之后topic)   
    8.    *  Input: a map of <topic, #streams> 
    9.    *  Output: a map of <topic, list of message streams> 
    10.    */  
    11.   public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);   
    12.   
    13.   /**     
    14.    * 你也可以得到一个流的列表,它包含了符合TopicFiler的消息的迭代, 
    15.    * 一个TopicFilter是一个封装了白名单或黑名单的正则表达式。 
    16.    */  
    17.   public List<KafkaStream> createMessageStreamsByFilter(  
    18.       TopicFilter topicFilter, int numStreams);  
    19.   
    20.   /* 提交目前消费到的offset */  
    21.   public commitOffsets()  
    22.     
    23.   /* 关闭连接 */  
    24.   public shutdown()  
    25. }  

    这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。

    每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的tipic。

  • 相关阅读:
    Python爬取微博热搜榜,将数据存入数据库
    Python爬取网站文章数据并存到数据库
    在自己的框架系统中使用tp类
    conda环境下pip install 无法安装到指定conda环境中(conda环境的默认pip安装位置)
    jupyter notebook 加入conda虚拟环境(指定conda虚拟环境)
    本地打包好的项目如何运行在docker中
    测试
    SQL Server创建dblink跨库查询
    深入浅析BIO、NIO、AIO
    JavaWeb_html_js_css_javaweb
  • 原文地址:https://www.cnblogs.com/tonychai/p/4437023.html
Copyright © 2011-2022 走看看