zoukankan      html  css  js  c++  java
  • spark记录(19)SparkStreaming之从kafkaBroker和zookeeper获取offset,和使用zookeeper管理offset

    一、从kafkaBroker获取offset

    /**
     * Reads the latest (log-end) offset of every partition of a topic directly from the Kafka brokers.
     * Kafka must be running before this is executed.
     *
     * @author root
     */
    public class GetTopicOffsetFromKafkaBroker {

        /** Prints the latest offset of each partition of "mytopic". */
        public static void main(String[] args) {
            Map<TopicAndPartition, Long> topicOffsets =
                    getTopicOffsets("node1:9092,node2:9092,node3:9092", "mytopic");
            for (Entry<TopicAndPartition, Long> entry : topicOffsets.entrySet()) {
                TopicAndPartition topicAndPartition = entry.getKey();
                Long offset = entry.getValue();
                String topic = topicAndPartition.topic();
                int partition = topicAndPartition.partition();
                System.out.println("topic = " + topic + ",partition = " + partition + ",offset = " + offset);
            }
        }

        /**
         * Asks the Kafka cluster for the offset up to which producers have written in each partition
         * of {@code topic} (the latest offset).
         *
         * <p>Every broker in the list is queried. Partitions with no leader are skipped, and so are
         * responses that report an error or contain no offset.
         *
         * @param KafkaBrokerServer comma-separated list of {@code host:port} broker addresses
         * @param topic the topic to inspect
         * @return map from topic-partition to its latest offset; partitions that could not be read
         *     are absent
         */
        public static Map<TopicAndPartition, Long> getTopicOffsets(String KafkaBrokerServer, String topic) {
            Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();

            for (String broker : KafkaBrokerServer.split(",")) {
                String[] hostAndPort = broker.trim().split(":");
                SimpleConsumer simpleConsumer = new SimpleConsumer(
                        hostAndPort[0], Integer.parseInt(hostAndPort[1]), 64 * 10000, 1024, "consumer");
                // Always release the broker connection, even if a request below throws.
                try {
                    TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic));
                    TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);

                    for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
                        for (PartitionMetadata part : metadata.partitionsMetadata()) {
                            if (part.leader() == null) {
                                // No leader is currently elected for this partition; nothing to ask.
                                continue;
                            }
                            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId());

                            // Only offsets[0] (the latest offset) is used, so request a single offset.
                            PartitionOffsetRequestInfo partitionOffsetRequestInfo =
                                    new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1);
                            OffsetRequest offsetRequest = new OffsetRequest(
                                    ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo),
                                    kafka.api.OffsetRequest.CurrentVersion(),
                                    simpleConsumer.clientId());
                            OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);

                            // A broker that does not lead this partition answers with an error, so in
                            // practice only the leader's answer is recorded. NOTE(review): confirm for
                            // the Kafka version in use.
                            if (!offsetResponse.hasError()) {
                                long[] offsets = offsetResponse.offsets(topic, part.partitionId());
                                if (offsets.length > 0) {
                                    retVals.put(topicAndPartition, offsets[0]);
                                }
                            }
                        }
                    }
                } finally {
                    simpleConsumer.close();
                }
            }
            return retVals;
        }
    }

    二、从zookeeper获取offset

    /**
     * Reads the offsets that a consumer group has committed to ZooKeeper for a topic.
     */
    public class GetTopicOffsetFromZookeeper {

        /**
         * Reads the committed offsets stored under {@code /consumers/<groupID>/offsets/<topic>}.
         * Each child node's name is parsed as a partition id, and its data is parsed as a number
         * (the committed offset) with Jackson's {@code ObjectMapper}.
         *
         * <p>Errors are printed and not rethrown. The caller then receives whatever entries were
         * read before the failure, possibly none.
         *
         * @param zkServers ZooKeeper connect string, e.g. {@code host1:2181,host2:2181}
         * @param groupID consumer group id
         * @param topic topic name
         * @return map from topic-partition to committed offset; empty when the node does not exist
         */
        public static Map<TopicAndPartition, Long> getConsumerOffsets(String zkServers, String groupID, String topic) {
            Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();

            ObjectMapper objectMapper = new ObjectMapper();
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                    .connectString(zkServers).connectionTimeoutMs(1000)
                    .sessionTimeoutMs(10000).retryPolicy(new RetryUntilElapsed(1000, 1000)).build();

            curatorFramework.start();
            // Close the client in finally so it is released even if something escapes the catch.
            try {
                String nodePath = "/consumers/" + groupID + "/offsets/" + topic;
                if (curatorFramework.checkExists().forPath(nodePath) != null) {
                    List<String> partitions = curatorFramework.getChildren().forPath(nodePath);
                    for (String partition : partitions) {
                        int partitionId = Integer.parseInt(partition);
                        Long offset = objectMapper.readValue(
                                curatorFramework.getData().forPath(nodePath + "/" + partition), Long.class);
                        retVals.put(new TopicAndPartition(topic, partitionId), offset);
                    }
                }
            } catch (Exception e) {
                // NOTE(review): best-effort behaviour is kept. A ZooKeeper failure yields an empty or
                // partial map, which callers may treat as "no committed offsets".
                e.printStackTrace();
            } finally {
                curatorFramework.close();
            }

            return retVals;
        }

        /** Prints the committed offsets of group "zhy" for "mytopic". */
        public static void main(String[] args) {
            Map<TopicAndPartition, Long> consumerOffsets =
                    getConsumerOffsets("node3:2181,node4:2181,node5:2181", "zhy", "mytopic");
            Set<Entry<TopicAndPartition, Long>> entrySet = consumerOffsets.entrySet();
            for (Entry<TopicAndPartition, Long> entry : entrySet) {
                TopicAndPartition topicAndPartition = entry.getKey();
                String topic = topicAndPartition.topic();
                int partition = topicAndPartition.partition();
                Long offset = entry.getValue();
                System.out.println("topic = " + topic + ",partition = " + partition + ",offset = " + offset);
            }
        }
    }

    三、使用zookeeper管理offset

    public class UseZookeeperManageOffset {
        /** log4j logger; log records are attributed to UseZookeeperManageOffset. */
        static final Logger logger = Logger.getLogger(UseZookeeperManageOffset.class);

        /**
         * Chooses a start offset for every partition of "mytopic" and launches the Spark Streaming job.
         *
         * <p>The latest offsets are read from the brokers first. Any offset that consumer group
         * "zhy" has committed to ZooKeeper then overrides the broker value for that partition. A
         * group with no committed offsets therefore starts at the newest messages.
         */
        public static void main(String[] args) {
            // Load the log4j configuration before anything is logged.
            ProjectUtil.LoadLogConfig();
            logger.info("project is starting...");

            // Latest offset in each partition, taken from the Kafka brokers.
            Map<TopicAndPartition, Long> startOffsets =
                    GetTopicOffsetFromKafkaBroker.getTopicOffsets("node1:9092,node2:9092,node3:9092", "mytopic");

            // Offsets already consumed by group "zhy", taken from ZooKeeper.
            Map<TopicAndPartition, Long> committedOffsets =
                    GetTopicOffsetFromZookeeper.getConsumerOffsets("node3:2181,node4:2181,node5:2181", "zhy", "mytopic");

            // Committed offsets win. Without them, the broker's latest offsets are kept.
            boolean groupHasCommitted = committedOffsets != null && !committedOffsets.isEmpty();
            if (groupHasCommitted) {
                startOffsets.putAll(committedOffsets);
            }

            // To replay the topic from the beginning, enable this loop to reset every start offset to 0:
    //        for(Map.Entry<TopicAndPartition, Long> item:startOffsets.entrySet()){
    //          item.setValue(0l);
    //        }

            // Build the streaming job from the chosen offsets and block until it terminates.
            JavaStreamingContext streamingContext = SparkStreamingDirect.getStreamingContext(startOffsets, "zhy");
            streamingContext.start();
            streamingContext.awaitTermination();
            streamingContext.close();
        }
    }
  • 相关阅读:
    【原创】(四)Linux进程调度-组调度及带宽控制
    【原创】(三)Linux进程调度器-进程切换
    【原创】(一)Linux进程调度器-基础
    【原创】(十六)Linux内存管理之CMA
    【原创】(十五)Linux内存管理之RMAP
    【原创】(十四)Linux内存管理之page fault处理
    我是如何学习和工作的(3)
    亲人的离去、爱情与婚姻
    工作机会少有时反而是一件好事
    Hong Kong Azure / .NET club first meetup
  • 原文地址:https://www.cnblogs.com/kpsmile/p/10490283.html
Copyright © 2011-2022 走看看