zoukankan      html  css  js  c++  java
  • 漫游Kafka实战篇clientAPI

    原文地址:http://blog.csdn.net/honglei915/article/details/37697655

    Kafka Producer APIs

    旧版的Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口:

    class Producer {
    	
      /* 将消息发送到指定分区 */  
      public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);
    
      /* 批量发送一批消息 */  
      public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
    
      /* 关闭producer */	
      public void close();
    
    }

    新版的Producer API提供了下面功能:
    1. 能够将多个消息缓存到本地队列里。然后异步的批量发送到broker,能够通过參数producer.type=async做到。缓存的大小能够通过一些參数指定:queue.timebatch.size。一个后台线程((kafka.producer.async.ProducerSendThread)从队列中取出数据并让kafka.producer.EventHandler将消息发送到broker,也能够通过參数event.handler定制handler。在producer端处理数据的不同的阶段注冊处理器,比方能够对这一过程进行日志追踪。或进行一些监控。仅仅需实现kafka.producer.async.CallbackHandler接口,并在callback.handler中配置。
    2. 自己编写Encoder来序列化消息,仅仅需实现以下这个接口。默认的Encoder是kafka.serializer.DefaultEncoder
      interface Encoder<T> {
        public Message toMessage(T data);
      }
    3. 提供了基于Zookeeper的broker自己主动感知能力,能够通过參数zk.connect实现。假设不使用Zookeeper。也能够使用broker.list參数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。

    4. 通过分区函数kafka.producer.Partitioner类对消息分区
      interface Partitioner<T> {
         int partition(T key, int numPartitions);
      }
      分区函数有两个參数:key和可用的分区数量。从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.假设key是null,就随机的选择一个。

      能够通过參数partitioner.class定制分区函数。

    新的api完整实比例如以下:

    import java.util.*;
     
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
     
    public class TestProducer {
        public static void main(String[] args) {
            long events = Long.parseLong(args[0]);
            Random rnd = new Random();
     
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("partitioner.class", "example.producer.SimplePartitioner");
            props.put("request.required.acks", "1");
     
            ProducerConfig config = new ProducerConfig(props);
     
            Producer<String, String> producer = new Producer<String, String>(config);
     
            for (long nEvents = 0; nEvents < events; nEvents++) { 
                   long runtime = new Date().getTime();  
                   String ip = “192.168.2.” + rnd.nextInt(255); 
                   String msg = runtime + “,www.example.com,” + ip; 
                   KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
                   producer.send(data);
            }
            producer.close();
        }
    }

    以下这个是用到的分区函数:

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;
     
    public class SimplePartitioner implements Partitioner<String> {
        public SimplePartitioner (VerifiableProperties props) {
     
        }
     
        public int partition(String key, int a_numPartitions) {
            int partition = 0;
            int offset = key.lastIndexOf('.');
            if (offset > 0) {
               partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
            }
           return partition;
      }
     
    }


    KafKa Consumer APIs

    Consumer API有两个级别。低级别的和一个指定的broker保持连接。并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。

    高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还能够自己维护消费状态。并能够通过一些条件指定订阅特定的topic,比方白名单黑名单或者正則表達式。

    低级别的API

    class SimpleConsumer {
    	
      /*向一个broker发送读取请求并得到消息集 */ 
      public ByteBufferMessageSet fetch(FetchRequest request);
    
      /*向一个broker发送读取请求并得到一个对应集 */ 
      public MultiFetchResponse multifetch(List<FetchRequest> fetches);
    
      /**
       * 得到指定时间之前的offsets
       * 返回值是offsets列表。以倒序排序
       * @param time: 时间,毫秒,
       *              假设指定为OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.
       *              假设指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
       */
      public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
    }
    低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比方Hadoop consumer这种离线consumer。


    高级别的API

    /* 创建连接 */ 
    ConsumerConnector connector = Consumer.create(consumerConfig);
    
    interface ConsumerConnector {
    	
      /**
       * 这种方法能够得到一个流的列表。每一个流都是MessageAndMetadata的迭代,通过MessageAndMetadata能够拿到消息和其它的元数据(眼下之后topic)  
       *  Input: a map of <topic, #streams>
       *  Output: a map of <topic, list of message streams>
       */
      public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); 
    
      /**    
       * 你也能够得到一个流的列表,它包括了符合TopicFiler的消息的迭代,
       * 一个TopicFilter是一个封装了白名单或黑名单的正則表達式。
       */
      public List<KafkaStream> createMessageStreamsByFilter(
          TopicFilter topicFilter, int numStreams);
    
      /* 提交眼下消费到的offset */
      public commitOffsets()
      
      /* 关闭连接 */
      public shutdown()
    }

    这个API环绕着由KafkaStream实现的迭代器展开,每一个流代表一系列从一个或多个分区多和broker上汇聚来的消息。每一个流由一个线程处理。所以client能够在创建的时候通过參数指定想要几个流。一个流是多个分区多个broker的合并。可是每一个分区的消息仅仅会流向一流。

    每次通话createMessageStreams会consumer注册到topic上,此consumer和brokers负载平衡将之间调节。

    API每次调用创建激励许多人topic流动,以减少这种调整。createMessageStreamsByFilter方法来注册监听器可以感知一个新雅阁filter的tipic。

  • 相关阅读:
    WHERE col1=val1 AND col2=val2;index exists on col1 and col2, the appropriate rows can be fetched directly
    MySQL 交集 实现方法
    MBProgressHUD的使用
    Xcode4 使用 Organizer 分析 Crash logs(转)
    SimpleXML 使用详细例子
    PHP的XML Parser(转)
    iPhone,iPhone4,iPad程序启动画面的总结 (转)
    Pop3得到的Email 信件格式介绍
    yii总结
    隐藏Tabbar的一些方法
  • 原文地址:https://www.cnblogs.com/zfyouxi/p/4590435.html
Copyright © 2011-2022 走看看