zoukankan      html  css  js  c++  java
  • kafka生产者消费者示例代码

    生产者:

     package com.unimas.test;
     
    import java.util.Properties;
     
    import kafka.javaapi.producer.Producer;
     import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
     
     /**
      * Demo producer built on the legacy {@code kafka.javaapi.producer.Producer} API.
      *
      * <p>Publishes 100 string messages to the {@code ANALYSIS-RM-SCREEN} topic,
      * pausing 500 ms before each send, and closes the producer when done.
      */
     public class ProducerDemo {
         /**
          * Configures the producer, sends the demo messages and releases the producer.
          *
          * @param args unused
          * @throws Exception if the thread is interrupted while sleeping, or if any
          *         failure raised while sending propagates out of the loop
          */
         public static void main(String[] args) throws Exception {
             Properties props = new Properties();
             // NOTE(review): brokers are located through metadata.broker.list below;
             // zk.connect is presumably a leftover setting - confirm before removing it.
             props.put("zk.connect", "11.11.165.81:2181,11.11.165.82:2181,11.11.165.83:2181");
             props.put("metadata.broker.list", "11.11.165.81:6667,11.11.165.82:6667,11.11.165.83:6667");
             props.put("serializer.class", "kafka.serializer.StringEncoder");
             ProducerConfig config = new ProducerConfig(props);
             Producer<String, String> producer = new Producer<String, String>(config);

             try {
                 // Send the business messages. In a real application the payload would
                 // come from a file, an in-memory database or a socket port.
                 for (int i = 1; i <= 100; i++) {
                     Thread.sleep(500);
                     producer.send(new KeyedMessage<String, String>(
                             "ANALYSIS-RM-SCREEN",
                             "kafka is a perfect message queue system ----" + i));
                 }
             } finally {
                 // Always close the producer so its broker connections are released,
                 // even when a send fails or the sleep is interrupted.
                 producer.close();
             }
         }
     }

    消费者:

     package com.unimas.test;
     
     import java.util.HashMap;
    import java.util.List;
     import java.util.Map;
     import java.util.Properties;
     
     import kafka.consumer.Consumer;
     import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    
    /**
     * Demo consumer built on the legacy high-level consumer API
     * ({@code kafka.javaapi.consumer.ConsumerConnector}).
     *
     * <p>Requests {@link #STREAM_COUNT} streams for the {@code ANALYSIS-RM-SCREEN}
     * topic and starts one thread per stream that prints every received message
     * to standard output.
     */
    public class ConsumerDemo {
        private static final String TOPIC = "ANALYSIS-RM-SCREEN";

        /** Number of streams requested for the topic; one reader thread is started per stream. */
        private static final int STREAM_COUNT = 2;

        /**
         * Charset used to decode message payloads. Fully qualified so the block does
         * not depend on an extra import. NOTE(review): assumes the producer side
         * writes UTF-8 (the StringEncoder default) - confirm if the encoder is reconfigured.
         */
        private static final java.nio.charset.Charset UTF_8 =
                java.nio.charset.Charset.forName("UTF-8");

        /**
         * Connects to ZooKeeper, creates the message streams and starts the reader threads.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "f14cp-kf1-02:2181,f14cp-kf1-03:2181,f14cp-kf1-04:2181");
            props.put("group.id", "2222");
            props.put("auto.offset.reset", "smallest");

            ConsumerConfig config = new ConsumerConfig(props);
            final ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);

            // Shut the connector down when the JVM exits so the consumer does not
            // simply vanish from its group without releasing its resources.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    consumer.shutdown();
                }
            }));

            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(TOPIC, STREAM_COUNT);

            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

            for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                            // Decode with an explicit charset instead of the platform default.
                            String msg = new String(mm.message(), UTF_8);
                            System.out.println(msg);
                        }
                    }
                }).start();
            }
        }
    }
  • 相关阅读:
    迭代器接口
    实现Promise
    学学springboot吧!!!!
    了解一下连接池!!!!
    Tomcat version 6.0 only supports J2EE 1.2, 1.3, 1.4, and Java EE 5 Web modules ???报错!!!
    request和response的区别????
    百度也太神奇了吧
    这个svn啊,真的是有点看不懂
    BootStrap???确实厉害
    突然看到原来除了jar包还有war包啊?????
  • 原文地址:https://www.cnblogs.com/ygwx/p/5075803.html
Copyright © 2011-2022 走看看