zoukankan      html  css  js  c++  java
  • kafka java代码实现消费者

    public class KafkaConsumer {
    
    	
    	public static void main(String[] args) {
    		Properties props = new Properties();
    		props.put("zookeeper.connect", "m6:2181,m7:2181,m8:2181");
    		props.put("group.id", "1111");
    		props.put("auto.offset.reset", "smallest");
    		ConsumerConfig conf = new ConsumerConfig(props);
    		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(conf);
    		Map<String, Integer> topicStrams = new HashMap<String, Integer>();
    		topicStrams.put("test", 1);
    		Map<String, List<KafkaStream<byte[], byte[]>>> messageStreamsMap = consumer.createMessageStreams(topicStrams);
    		List<KafkaStream<byte[], byte[]>> messageStreams = messageStreamsMap.get("test");
    		for(final KafkaStream<byte[], byte[]> kafkaStream : messageStreams){
    			new Thread(new Runnable() {
    				@Override
    				public void run() {
    					for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
    						String msg = new String(mm.message());
    						System.out.println(msg);
    					}
    				}
    			
    			}).start();
    		
    		}
    	}
    }
    
    package cn.bigdata.kafka;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.javaapi.consumer.ZookeeperConsumerConnector;
    import kafka.message.MessageAndMetadata;
    
    public class KafkaConsumerDemo {
    	public static void main(String[] args) {
            Properties props = new Properties();
            // zookeeper地址
            props.put("zookeeper.connect", "m1:2181,m2:2181,m3:2181");
            // 消费者组id
            props.put("group.id", "22w2");
    		// smallest : 从头消费
    		// largest : 从最后消费
            props.put("auto.offset.reset", "smallest");
            ConsumerConfig conf = new ConsumerConfig(props);
            ConsumerConnector consumer = new ZookeeperConsumerConnector(conf);
            Map<String, Integer> topicStrams = new HashMap<String, Integer>();
            // 第二个数字是返回几个流,topic几个分区就陪几个流比较合理
            topicStrams.put("test2", 2);
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreamsMap = consumer.createMessageStreams(topicStrams);
            List<KafkaStream<byte[], byte[]>> messageStreams = messageStreamsMap.get("test2");
            for(final KafkaStream<byte[], byte[]> kafkaStream : messageStreams){
                new Thread(new Runnable() {
                    public void run() {
                        for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
                            String msg = new String(mm.message());
                            System.out.println(msg);
                        }
                    }
                 
                }).start();
             
            }
        }
    }
    

      

      

  • 相关阅读:
    [转]回车和换行
    计算机常见缩略词备忘录
    Linux多线程编程阅读链接
    字符串匹配KMP算法
    k8s测试集群部署
    搭建Vmware Harbor 镜像仓库
    GitLab搭建
    Gerrit2安装配置
    linux文件系统问题:wrong fs type, bad option, bad superblock
    Docker容器内不能联网的6种解决方案
  • 原文地址:https://www.cnblogs.com/heml/p/6074044.html
Copyright © 2011-2022 走看看