zoukankan      html  css  js  c++  java
  • kafka java动态获取topic并动态创建消费者

    1.获取所有topic

    package com.example.demo;
    import java.io.IOException;
    import java.util.List;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    public class zookeeper {
    
    	 public static void main(String[] args) {
    	        String connectString = "172.16.10.211:2181";
    	        int sessionTimeout = 4000;
    	        Watcher watcher = new Watcher() {
    	            public void process(WatchedEvent event) {
    	            }
    	        };
    	        try {
    	            ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
    	            List<String> list = zooKeeper.getChildren("/brokers/topics", false);
    	            int len = list.size();
    	            for(int i = 1;i < len;i++){
    	            	System.out.println(list.get(i));
                  //此处动态生成消费者 //JavaKafkaConsumerHighAPI example = new JavaKafkaConsumerHighAPI(list.get(i), 1); //new Thread(example).start(); } } catch (IOException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }

 2. 参考 http://www.cnblogs.com/liuming1992/p/6432626.html 生成消费者，这里进行了小小的改造：

    package com.example.demo;
    import kafka.consumer.*;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Simple Kafka consumer built on the old (0.8.x) high-level consumer API.
     *
     * <p>{@link #run()} requests {@code numThreads} streams for a single topic and hands each stream
     * to a {@link ConsumerKafkaStreamProcesser} running on a fixed-size thread pool.
     * {@link #shutdown()} closes the connector first and then stops the pool.
     *
     * <p>Created by gerry on 12/21.
     */
    public class JavaKafkaConsumerHighAPI implements Runnable {
        /** ZooKeeper connection string used by the two-argument constructor. */
        private static final String DEFAULT_ZOOKEEPER = "172.16.10.211:2181";

        /** Consumer group id used by the two-argument constructor. */
        private static final String DEFAULT_GROUP_ID = "test-consumer-group";

        /** Kafka consumer connector. */
        private final ConsumerConnector consumer;

        /** Kafka topic name. */
        private final String topic;

        /** Number of streams (and worker threads); typically the topic's partition count. */
        private final int numThreads;

        /**
         * Worker pool. Assigned in {@link #run()} and read in {@link #shutdown()}, which are normally
         * invoked from different threads, so the field is {@code volatile}.
         */
        private volatile ExecutorService executorPool;

        /**
         * Creates a consumer connected to the default ZooKeeper address and consumer group.
         *
         * @param topic      Kafka topic to consume
         * @param numThreads number of streams/worker threads, usually the topic's partition count
         */
        public JavaKafkaConsumerHighAPI(String topic, int numThreads) {
            this(topic, numThreads, DEFAULT_ZOOKEEPER, DEFAULT_GROUP_ID);
        }

        /**
         * Creates a consumer.
         *
         * @param topic      Kafka topic to consume
         * @param numThreads number of streams/worker threads, usually the topic's partition count
         * @param zookeeper  ZooKeeper connection string used by Kafka
         * @param groupId    consumer group id this consumer belongs to
         */
        public JavaKafkaConsumerHighAPI(String topic, int numThreads, String zookeeper, String groupId) {
            // 1. Create the Kafka connector
            this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
            // 2. Store the settings
            this.topic = topic;
            this.numThreads = numThreads;
        }

        @Override
        public void run() {
            // 1. Request numThreads streams for the topic
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(this.topic, this.numThreads);

            // 2. Decoders for message keys and values
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

            // 3. Open the streams. Key: topic; value: the streams requested for it in topicCountMap.
            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    this.consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

            // 4. Streams for our topic
            List<KafkaStream<String, String>> streams = consumerMap.get(this.topic);

            // 5. Thread pool with one thread per requested stream
            ExecutorService pool = Executors.newFixedThreadPool(this.numThreads);
            this.executorPool = pool;

            // 6. Hand each stream to its own processor
            int threadNumber = 0;
            for (final KafkaStream<String, String> stream : streams) {
                pool.submit(new ConsumerKafkaStreamProcesser(stream, threadNumber, this.topic));
                threadNumber++;
            }
        }

        /**
         * Stops consuming. Shuts the connector down first, which ends the streams
         * (stream.hasNext returns false), then shuts the pool down and waits up to five seconds.
         */
        public void shutdown() {
            // 1. Close the Kafka connection
            if (this.consumer != null) {
                this.consumer.shutdown();
            }

            // 2. Shut the pool down and wait for the workers to finish
            ExecutorService pool = this.executorPool;
            if (pool != null) {
                pool.shutdown();
                try {
                    if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                        System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly!!");
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt for whoever called shutdown().
                    Thread.currentThread().interrupt();
                    System.out.println("Interrupted during shutdown, exiting uncleanly!!");
                }
            }
        }

        /**
         * Builds the {@link ConsumerConfig} for the given ZooKeeper connection and group id.
         *
         * @param zookeeper ZooKeeper connection string, e.g.<br/>
         *                  hadoop-senior01.ibeifeng.com:2181,hadoop-senior02.ibeifeng.com:2181/kafka
         * @param groupId   consumer group id; consumers sharing a group id are load-balanced
         * @return the consumer configuration
         */
        private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
            Properties prop = new Properties();
            prop.put("group.id", groupId);
            prop.put("zookeeper.connect", zookeeper);
            // NOTE(review): a 400 ms session timeout is unusually small; ZooKeeper servers usually
            // enforce a minimum and may negotiate it upward — confirm the intended value.
            prop.put("zookeeper.session.timeout.ms", "400");
            prop.put("zookeeper.sync.time.ms", "200");
            prop.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(prop);
        }


        /**
         * Worker that drains one Kafka stream and prints every message it receives.
         */
        public static class ConsumerKafkaStreamProcesser implements Runnable {
            /** Stream to drain. */
            private final KafkaStream<String, String> stream;
            /** Index of this worker, used in the output. */
            private final int threadNumber;
            /** Topic the stream belongs to, used in the output. */
            private final String topic;

            public ConsumerKafkaStreamProcesser(KafkaStream<String, String> stream, int threadNumber, String topic) {
                this.stream = stream;
                this.threadNumber = threadNumber;
                this.topic = topic;
            }

            @Override
            public void run() {
                ConsumerIterator<String, String> iter = this.stream.iterator();
                // hasNext blocks until a message arrives or the connector is shut down.
                while (iter.hasNext()) {
                    MessageAndMetadata<String, String> value = iter.next();
                    System.out.println(this.threadNumber + "____" + value.offset() + "_____" + topic + "____" + value.message());
                }
                System.out.println("Shutdown Thread:" + this.threadNumber);
            }
        }
    }
    

 3. pom.xml 依赖（旧版 Kafka 0.8.x 高级消费者 API）

    <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.11</artifactId>
         <version>0.8.2.1</version>
    </dependency>

 4. 改进版：一个消费者同时订阅多个 topic，并按 topic 前缀把消息写入 Elasticsearch 的不同索引

    package com.example.text;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.example.es.Es;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    /**
     * Consumes several topics at once with the old (0.8.x) high-level consumer API.
     *
     * <p>For every topic in {@code topicCountMap} the requested number of streams is opened, and each
     * stream is processed by a {@link HanldMessageThread} on one shared fixed-size thread pool.
     * If setup fails, the connector and the pool are shut down and the error is logged.
     */
    public class KafkaConsumer implements Runnable {

        private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

        /** Topic name -> number of streams (worker threads) to open for it. */
        private final Map<String, Integer> topicCountMap;
        /** Properties used to build the {@link ConsumerConfig}. */
        private final Properties props;

        public KafkaConsumer(Map<String, Integer> topicCountMap, Properties props) {
            this.topicCountMap = topicCountMap;
            this.props = props;
        }

        @Override
        public void run() {
            ConsumerConnector consumer = null;
            ExecutorService executor = null;
            try {
                consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                Map<String, List<KafkaStream<byte[], byte[]>>> msgStreams = consumer.createMessageStreams(topicCountMap);
                // A single pool for all topics, sized to the total stream count, so that no pool is
                // leaked and the error path below can shut every worker down.
                executor = Executors.newFixedThreadPool(Math.max(1, totalStreamCount()));
                for (String topic : topicCountMap.keySet()) {
                    List<KafkaStream<byte[], byte[]>> msgStreamList = msgStreams.get(topic);
                    for (int i = 0; i < msgStreamList.size(); i++) {
                        executor.submit(new HanldMessageThread(msgStreamList.get(i), i, topic));
                    }
                }
            } catch (Exception e) {
                shutdown(consumer, executor);
                // Pass the exception itself so the stack trace is logged, not just the message.
                logger.error(e.getMessage(), e);
            }
        }

        /** Sums the requested stream counts over all topics. */
        private int totalStreamCount() {
            int total = 0;
            for (Integer count : topicCountMap.values()) {
                total += count;
            }
            return total;
        }

        /** Shuts down the connector and the pool (either may be null) and waits up to five seconds. */
        private static void shutdown(ConsumerConnector consumer, ExecutorService executor) {
            if (consumer != null) {
                consumer.shutdown();
            }
            if (executor == null) {
                return;
            }
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    logger.error("Timed out waiting for consumer threads to shutdown, exiting uncleanly");
                }
            } catch (InterruptedException e1) {
                Thread.currentThread().interrupt();
                logger.error("Interrupted during shutdown, exiting uncleanly");
            }
        }

    }
    
    /**
     * 具体处理message的线程
     * 
     * @author Administrator
     *
     */
    /**
     * Drains one Kafka stream and writes every message to Elasticsearch via {@code Es.setData}.
     *
     * <p>The target index is chosen from the topic name prefix; see {@link #indexFor(String)}.
     *
     * @author Administrator
     */
    class HanldMessageThread implements Runnable {

        private final KafkaStream<byte[], byte[]> kafkaStream;
        /** Index of this stream within its topic; not used by the processing logic. */
        private final int num;
        private final String topic;

        public HanldMessageThread(KafkaStream<byte[], byte[]> kafkaStream, int num, String topic) {
            super();
            this.kafkaStream = kafkaStream;
            this.num = num;
            this.topic = topic;
        }

        @Override
        public void run() {
            // The index depends only on the topic, so resolve it once rather than per message.
            String index = indexFor(topic);
            ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
            while (iterator.hasNext()) {
                // Decode explicitly as UTF-8; new String(byte[]) would use the platform default charset.
                String message = new String(iterator.next().message(), StandardCharsets.UTF_8);
                Es.setData(message, index, topic);
            }
        }

        /**
         * Maps a topic name to its Elasticsearch index.
         *
         * @param topic Kafka topic name
         * @return {@code "xrs_db"} for xrs/meitan/qiyexinxi topics, {@code "pholcus_news_v1"} for
         *         search topics, otherwise {@code "pholcus_db"}
         */
        static String indexFor(String topic) {
            if (topic.startsWith("xrs") || topic.startsWith("meitan") || topic.startsWith("qiyexinxi")) {
                return "xrs_db";
            }
            if (topic.startsWith("search")) {
                return "pholcus_news_v1";
            }
            return "pholcus_db";
        }

    }
    
    private static Properties props;

    static {
        // Consumer settings for the 0.8.x high-level consumer: ZooKeeper address, consumer group,
        // ZooKeeper session/sync timeouts and offset auto-commit interval (all times in ms).
        Properties consumerProps = new Properties();
        consumerProps.setProperty("zookeeper.connect", "172.16.10.211:2181");
        consumerProps.setProperty("group.id", "test-consumer-group");
        consumerProps.setProperty("zookeeper.session.timeout.ms", "400");
        consumerProps.setProperty("zookeeper.sync.time.ms", "200");
        consumerProps.setProperty("auto.commit.interval.ms", "1000");
        props = consumerProps;
    }

    // Topic name -> number of streams to open; presumably filled with the topics read from ZooKeeper.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    
    欢迎指正,交流沟通,共同进步!对您有帮助的话点下推荐~~
  • 相关阅读:
    [已解决] MAVEN安装代码到本地库,安装jar, source, javadoc的方式
    [已解决]Eclipse 插件Maven在使用 add dependency,找不到包,解决办法
    [已解决] windows 下 git 免输密码
    [已解决] windows 80端口被占用
    [已解决]Tomcat启动报 java.net.BindException: Address already in use: JVM_Bind
    [已解决] java.net.InetAddress.getHostName() 阻塞问题
    [已解决] 日常开发中禁用Tomcat自动重启
    [已解决] MyBatis 中bind用法
    [转]SOCKET通信中TCP、UDP数据包大小的确定
    使用beautifulsoup与requests爬取数据
  • 原文地址:https://www.cnblogs.com/gaoyawei/p/7723974.html
Copyright © 2011-2022 走看看