zoukankan      html  css  js  c++  java
  • Consumer Group Example

     

    面向kafka编程

    Consumer Group Example

    https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

    The ‘zookeeper.session.timeout.ms’ is how many milliseconds Kafka will wait for ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.

    The ‘zookeeper.sync.time.ms’ is the number of milliseconds a ZooKeeper ‘follower’ can be behind the master before an error occurs.

     https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

    http://kafka.apache.org/documentation.html

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", a_zookeeper);
            props.put("group.id", a_groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    

    【多对多】

    package com.test.groups;
     
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
     
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
    public class ConsumerGroupExample {
        private final ConsumerConnector consumer;
        private final String topic;
        private  ExecutorService executor;
     
        public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig(a_zookeeper, a_groupId));
            this.topic = a_topic;
        }
     
        public void shutdown() {
            if (consumer != null) consumer.shutdown();
            if (executor != null) executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted during shutdown, exiting uncleanly");
            }
       }
     
        public void run(int a_numThreads) {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(a_numThreads));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
     
            // now launch all the threads
            //
            executor = Executors.newFixedThreadPool(a_numThreads);
     
            // now create an object to consume the messages
            //
            int threadNumber = 0;
            for (final KafkaStream stream : streams) {
                executor.submit(new ConsumerTest(stream, threadNumber));
                threadNumber++;
            }
        }
     
        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", a_zookeeper);
            props.put("group.id", a_groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
     
            return new ConsumerConfig(props);
        }
     
        public static void main(String[] args) {
            String zooKeeper = args[0];
            String groupId = args[1];
            String topic = args[2];
            int threads = Integer.parseInt(args[3]);
     
            ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
            example.run(threads);
     
            try {
                Thread.sleep(10000);
            } catch (InterruptedException ie) {
     
            }
            example.shutdown();
        }
    }
    
    group.id A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy. string
  • 相关阅读:
    elasticsearch的安装
    default_scope and unscoped
    RSpec + Spork + Autotest 给Rails 3添加快速自动化测试
    ubuntu收过带个winmail.dat的邮件
    网站链接
    github
    js笔记
    mba首页js
    mba精锐视角js
    mongodb常用命令
  • 原文地址:https://www.cnblogs.com/rsapaper/p/7660631.html
Copyright © 2011-2022 走看看