  • Querying the Kafka offset of data for a specified time

    The class below uses the legacy SimpleConsumer API: it looks up the leader broker for each partition of a topic, then sends an OffsetRequest to fetch the latest offset written before a given timestamp.

    package com.unimas.test;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.TreeMap;
    
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
    
    public class KafkaOffsetSearch {
        public Map<String, String> getPartitonAndOffset() {
            int port = 6667;                              // broker port
            long timestamp = 1459209600000L;              // timestamp to search for, in milliseconds
            String topic = "topic1";                      // topic to query
            List<String> seeds = new ArrayList<String>(); // Kafka broker addresses
            seeds.add("11.11.184.172");
            seeds.add("11.11.184.174");
            seeds.add("11.11.184.183");
            seeds.add("11.11.184.167");
            seeds.add("11.11.184.177");
    
            // look up the leader broker for every partition of the topic
            TreeMap<Integer, PartitionMetadata> metadatas = findLeader(seeds, port, topic);

            Map<String, String> map = new HashMap<String, String>();
            for (Entry<Integer, PartitionMetadata> entry : metadatas.entrySet()) {
                int partition = entry.getKey();
                String leadBroker = entry.getValue().leader().host();
                String clientName = "Client_" + topic + "_" + partition;
                SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
                // latest offset written before the requested timestamp
                long readOffset = getLastOffset(consumer, topic, partition, timestamp, clientName);
                map.put(String.valueOf(partition), String.valueOf(readOffset));
                System.out.println(partition + ":" + readOffset);
                consumer.close();
            }
            
            return map;
    
        }
    
    
    
        /**
         * Returns the largest offset written before {@code whichTime} (at log-segment
         * granularity) by sending an OffsetRequest to the partition's leader broker.
         */
        public static long getLastOffset(SimpleConsumer consumer, String topic,
                int partition, long whichTime, String clientName) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
                    partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
            requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
                    whichTime, 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
                    clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);
    
            if (response.hasError()) {
                System.out.println("Error fetching offset data from the broker. Reason: "
                        + response.errorCode(topic, partition));
                return 0;
            }
            long[] offsets = response.offsets(topic, partition);
            return offsets[0];
        }
    
        /**
         * Queries the seed brokers for the topic's metadata and returns each partition's
         * metadata (including the current leader), keyed by partition id.
         */
        private TreeMap<Integer, PartitionMetadata> findLeader(List<String> a_seedBrokers,
                int a_port, String a_topic) {
            TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
            for (String seed : a_seedBrokers) {
                SimpleConsumer consumer = null;
                try {
                    consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
                            "leaderLookup"+new Date().getTime());
                    List<String> topics = Collections.singletonList(a_topic);
                    TopicMetadataRequest req = new TopicMetadataRequest(topics);
                    kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
    
                    List<TopicMetadata> metaData = resp.topicsMetadata();
                    for (TopicMetadata item : metaData) {
                        for (PartitionMetadata part : item.partitionsMetadata()) {
                            map.put(part.partitionId(), part);
                        }
                    }
                } catch (Exception e) {
                    System.out.println("Error communicating with Broker [" + seed
                            + "] to find Leader for [" + a_topic + ", ] Reason: " + e);
                } finally {
                    if (consumer != null)
                        consumer.close();
                }
            }
            return map;
        }
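
        // Hypothetical usage example (not part of the original post): print the
        // offset of every partition at the configured timestamp.
        public static void main(String[] args) {
            Map<String, String> offsets = new KafkaOffsetSearch().getPartitonAndOffset();
            System.out.println(offsets);
        }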
    
    }
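
    For comparison, here is a minimal sketch of the same lookup with the modern consumer API (kafka-clients 0.10.1 or newer), which replaces the legacy SimpleConsumer/OffsetRequest approach above. The broker address, topic, and class name are placeholders, the cluster must use the 0.10+ message format so that records carry timestamps, and the semantics differ slightly: offsetsForTimes returns, per partition, the earliest offset whose timestamp is greater than or equal to the requested time, whereas the old OffsetRequest resolves timestamps only at log-segment granularity.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetsForTimesExample {

        public static void main(String[] args) {
            String topic = "topic1";             // topic to query (placeholder)
            long timestamp = 1459209600000L;     // timestamp to query, in milliseconds

            Properties props = new Properties();
            props.put("bootstrap.servers", "11.11.184.172:6667");  // any reachable broker (placeholder)
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // build a partition -> timestamp query for every partition of the topic
                Map<TopicPartition, Long> query = new HashMap<>();
                for (PartitionInfo p : consumer.partitionsFor(topic)) {
                    query.put(new TopicPartition(topic, p.partition()), timestamp);
                }
                // earliest offset with timestamp >= the requested time, per partition
                // (the value is null when no such message exists)
                Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
                for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : result.entrySet()) {
                    OffsetAndTimestamp ot = e.getValue();
                    System.out.println(e.getKey().partition() + ":" + (ot == null ? "none" : ot.offset()));
                }
            }
        }
    }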
  • Original post: https://www.cnblogs.com/ygwx/p/5337587.html