  • Implementing a Kafka consumer in Java

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class KafkaConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            // ZooKeeper connection string used by the old high-level consumer
            props.put("zookeeper.connect", "m6:2181,m7:2181,m8:2181");
            // Consumer group id
            props.put("group.id", "1111");
            // smallest: start consuming from the earliest available offset
            props.put("auto.offset.reset", "smallest");
            ConsumerConfig conf = new ConsumerConfig(props);
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(conf);
            // Map of topic -> number of streams (threads) to consume it with
            Map<String, Integer> topicStreams = new HashMap<String, Integer>();
            topicStreams.put("test", 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreamsMap = consumer.createMessageStreams(topicStreams);
            List<KafkaStream<byte[], byte[]>> messageStreams = messageStreamsMap.get("test");
            // Start one thread per stream; each thread blocks on its stream's iterator
            for (final KafkaStream<byte[], byte[]> kafkaStream : messageStreams) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                            String msg = new String(mm.message());
                            System.out.println(msg);
                        }
                    }
                }).start();
            }
        }
    }
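
    To feed test messages into the "test" topic consumed above, a small producer built on the matching old (0.8.x) producer API can be used. This is a minimal sketch: the broker list (m6:9092 and so on), the class name KafkaProducerDemo, and the "hello kafka" payload are assumptions for illustration, not part of the original post.

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class KafkaProducerDemo {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Broker list (assumed hosts/ports; adjust to your cluster)
            props.put("metadata.broker.list", "m6:9092,m7:9092,m8:9092");
            // Encode message values as strings
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
            // Send a single message to the "test" topic consumed above
            producer.send(new KeyedMessage<String, String>("test", "hello kafka"));
            producer.close();
        }
    }

    The next listing repeats the same consumer pattern as a complete source file with its package declaration, this time constructing the connector directly and consuming the "test2" topic with two streams.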
    
    package cn.bigdata.kafka;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.javaapi.consumer.ZookeeperConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class KafkaConsumerDemo {

        public static void main(String[] args) {
            Properties props = new Properties();
            // ZooKeeper connection string
            props.put("zookeeper.connect", "m1:2181,m2:2181,m3:2181");
            // Consumer group id
            props.put("group.id", "22w2");
            // smallest: start from the earliest offset
            // largest:  start from the latest offset
            props.put("auto.offset.reset", "smallest");
            ConsumerConfig conf = new ConsumerConfig(props);
            // Instantiated directly here; Consumer.createJavaConsumerConnector(conf) is the usual factory
            ConsumerConnector consumer = new ZookeeperConsumerConnector(conf);
            Map<String, Integer> topicStreams = new HashMap<String, Integer>();
            // The value is the number of streams to return; matching the topic's partition count is a reasonable choice
            topicStreams.put("test2", 2);
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreamsMap = consumer.createMessageStreams(topicStreams);
            List<KafkaStream<byte[], byte[]>> messageStreams = messageStreamsMap.get("test2");
            // One consuming thread per stream
            for (final KafkaStream<byte[], byte[]> kafkaStream : messageStreams) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                            String msg = new String(mm.message());
                            System.out.println(msg);
                        }
                    }
                }).start();
            }
        }
    }
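
    Both listings above use the old ZooKeeper-based high-level consumer, which was deprecated and later removed from Kafka. For reference, here is a minimal sketch of the same consumer written against the newer org.apache.kafka.clients.consumer API; the broker addresses (m1:9092 and so on) and the class name NewApiConsumerDemo are assumptions, and "earliest" plays the role of the old "smallest".

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewApiConsumerDemo {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Brokers are addressed directly; no ZooKeeper connection is needed (assumed ports)
            props.put("bootstrap.servers", "m1:9092,m2:9092,m3:9092");
            props.put("group.id", "22w2");
            // "earliest" is the new-API equivalent of the old "smallest"
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("test2"));
            while (true) {
                // Poll blocks for up to 100 ms waiting for new records
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }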
    
  • Original post: https://www.cnblogs.com/heml/p/6074044.html