  • Designing a Kafka High Level Consumer

    Original: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

    Why use the High Level Consumer

    1. In some scenarios we want to read messages with multiple threads and do not care about the order in which messages are consumed from Kafka; all we care about is that every message gets consumed. The High Level Consumer is the abstraction for exactly this kind of consumption.

    2. Messages are consumed in units of a Consumer Group, and each Consumer Group can contain multiple consumers. Each consumer is one thread, and each partition of a topic can only be read by one consumer at a time. For every partition, the Consumer Group keeps the latest offset, stored in ZooKeeper, so under normal operation no message is consumed twice.

    3. Because a consumer's offsets are not pushed to ZooKeeper in real time (the commit interval is set through configuration), a consumer that crashes unexpectedly may read some messages again after it restarts; one way to shrink that window is sketched below.
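
    A minimal sketch of one way to shrink that window (a hypothetical helper, not part of the example further down): besides lowering auto.commit.interval.ms, the connector's offsets can be committed explicitly once a batch has been processed, so that after a crash at most the last uncommitted batch is re-read. This assumes the kafka.javaapi.consumer API from the 0.8.2 dependencies listed below.

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    // Hypothetical helper: commits offsets to ZooKeeper explicitly every
    // 100 messages instead of relying only on auto.commit.interval.ms.
    public class CommittingConsumerLoop implements Runnable {
        private final ConsumerConnector connector;
        private final KafkaStream<byte[], byte[]> stream;

        public CommittingConsumerLoop(ConsumerConnector connector,
                                      KafkaStream<byte[], byte[]> stream) {
            this.connector = connector;
            this.stream = stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            int processed = 0;
            while (it.hasNext()) {
                byte[] payload = it.next().message();  // application-specific handling goes here
                System.out.println(new String(payload));
                if (++processed % 100 == 0) {
                    connector.commitOffsets();         // sync this group's offsets to ZooKeeper
                }
            }
        }
    }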

    Designing the High Level Consumer

    The High Level Consumer can, and should, be used in a multi-threaded environment. The number of threads in the threading model (which is also the number of consumers in the group) depends on the number of partitions of the topic. Some rules:

    1. If you provide more threads than there are partitions, some threads will never receive a message.
    2. If you provide fewer threads than partitions, some threads will receive messages from more than one partition.
    3. When a thread receives messages from several partitions, there is no ordering guarantee across partitions: it may get 5 messages from partition 3, then 6 from partition 4, and then another 10 from partition 3.
    4. Adding more threads makes Kafka rebalance, which can change the mapping between partitions and threads.
    5. Stopping a consumer (or a broker) abruptly can cause messages to be read again. To avoid this, give the consumer time to sync its offsets to ZooKeeper before shutting down, for example with Thread.sleep(10000); a slightly more graceful alternative is sketched right after this list.
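
    As a more deterministic alternative to a fixed sleep, the shutdown can first stop the connector (so the stream iterators stop blocking and the consumer loops exit) and then wait a bounded time for the worker threads to finish. The helper below is only a sketch; it assumes the ConsumerConnector and ExecutorService used in the example that follows.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    import kafka.javaapi.consumer.ConsumerConnector;

    // Hypothetical shutdown helper: stop the connector first, then drain the worker threads.
    public final class ShutdownHelper {
        public static void shutdown(ConsumerConnector consumer, ExecutorService executor) {
            if (consumer != null) {
                consumer.shutdown();        // stream iterators stop blocking; consumer loops exit
            }
            if (executor != null) {
                executor.shutdown();        // accept no new tasks; let running workers finish
                try {
                    // wait a bounded amount of time for the consumer threads to exit
                    if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        System.out.println("Timed out waiting for consumer threads to shut down");
                    }
                } catch (InterruptedException e) {
                    System.out.println("Interrupted during shutdown");
                }
            }
        }
    }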

    Example

    Maven dependencies

          <!-- Kafka message dependencies -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.8.2.0</version>
            </dependency>


    The consumer thread


    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.message.MessageAndMetadata;

    public class ConsumerThread implements Runnable {
        private KafkaStream<byte[], byte[]> kafkaStream;
        // thread number, used only for logging
        private int threadNumber;

        public ConsumerThread(KafkaStream<byte[], byte[]> kafkaStream, int threadNumber) {
            this.threadNumber = threadNumber;
            this.kafkaStream = kafkaStream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
            // this loop keeps reading from Kafka until the consumer is shut down
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> metaData = it.next();
                StringBuilder sb = new StringBuilder();   // one builder per message, not reused across iterations
                sb.append("Thread: ").append(threadNumber).append(" ");
                sb.append("Part: ").append(metaData.partition()).append(" ");
                sb.append("Key: ").append(metaData.key() == null ? "null" : new String(metaData.key())).append(" ");
                sb.append("Message: ").append(new String(metaData.message()));
                System.out.println(sb.toString());
            }
            System.out.println("Shutting down Thread: " + threadNumber);
        }
    }
    


    The rest of the program


    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
     
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
    public class ConsumerGroupExample {
        private final ConsumerConnector consumer;
        private final String topic;
        private  ExecutorService executor;
     
        public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig(a_zookeeper, a_groupId));
            this.topic = a_topic;
        }
     
        public void shutdown() {
            if (consumer != null) consumer.shutdown();
            if (executor != null) executor.shutdown();
        }
     
        public void run(int a_numThreads) {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(a_numThreads));
            // the returned map contains each requested topic and its list of KafkaStreams
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
     
            // create a Java thread pool
            executor = Executors.newFixedThreadPool(a_numThreads);
     
            // create consumer threads to consume the messages
            int threadNumber = 0;
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new ConsumerThread(stream, threadNumber));
                threadNumber++;
            }
        }
     
        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            // the ZooKeeper cluster to connect to; it stores each partition's consumer offset
            props.put("zookeeper.connect", a_zookeeper);
            // the consumer group id
            props.put("group.id", a_groupId);
            // how long Kafka waits for a ZooKeeper response, in milliseconds
            props.put("zookeeper.session.timeout.ms", "400");
            // how many milliseconds a ZooKeeper follower may lag behind the leader
            props.put("zookeeper.sync.time.ms", "200");
            // how often the consumer commits its offsets to ZooKeeper, in milliseconds
            props.put("auto.commit.interval.ms", "1000");
     
            return new ConsumerConfig(props);
        }
     
        public static void main(String[] args) {
            String zooKeeper = args[0];
            String groupId = args[1];
            String topic = args[2];
            int threads = Integer.parseInt(args[3]);
     
            ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
            example.run(threads);
            // because offsets are not pushed to ZooKeeper in real time (the interval is configurable),
            // shutting down immediately could cause duplicate reads later; sleep so the consumer can sync its offsets to ZooKeeper
            try {
                Thread.sleep(10000);
            } catch (InterruptedException ie) {
     
            }
            example.shutdown();
        }
    }
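
    To try the example, compile both classes and run the driver with the four arguments that main expects. The values below (a local ZooKeeper address, a group id, a topic name and four threads) are placeholders, as is the classpath:

    java -cp <classpath-with-kafka-jars> ConsumerGroupExample localhost:2181 group1 test 4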


  • Original post: https://www.cnblogs.com/mengfanrong/p/5093267.html