  • Kafka producer and consumer Java examples

    1. Producer

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class MyProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("metadata.broker.list", "localhost:9092");
            props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            ProducerConfig config = new ProducerConfig(props);
            // Create the producer object
            Producer<String, String> producer = new Producer<String, String>(config);
            // Build the message
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("mykafka", "test-kafka");
            try {
                int i = 1;
                while (i < 100) {
                    // Send the message
                    producer.send(data);
                    i++;  // increment, otherwise the loop never terminates
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.close();
        }
    }
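      The default partitioner in the 0.8 producer API hashes the message key, so messages that share a key always land on the same partition and keep their relative order. If per-key ordering matters, a small variation of the producer above (a sketch only; the key scheme and message contents are illustrative) passes a key as the second constructor argument of KeyedMessage:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class MyKeyedProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("metadata.broker.list", "localhost:9092");
            props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");

            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
            try {
                for (int i = 0; i < 100; i++) {
                    // Messages with the same key are hashed to the same partition by
                    // the default partitioner, so per-key ordering is preserved.
                    String key = "user-" + (i % 10);
                    producer.send(new KeyedMessage<String, String>("mykafka", key, "message-" + i));
                }
            } finally {
                producer.close();
            }
        }
    }

      With StringEncoder configured as serializer.class, the same encoder is also used for the key unless key.serializer.class is set explicitly.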

    2. Consumer

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class MyConsumer extends Thread {
        // Consumer connection
        private final ConsumerConnector consumer;
        // Topic to consume
        private final String topic;

        public MyConsumer(String topic) {
            consumer = kafka.consumer.Consumer
                    .createJavaConsumerConnector(createConsumerConfig());
            this.topic = topic;
        }

        // Build the consumer configuration
        private static ConsumerConfig createConsumerConfig() {
            Properties props = new Properties();
    //        props.put("zookeeper.connect","localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");
            // ZooKeeper address and port to connect to.
            // The 'zookeeper.connect' string identifies where to find an instance of ZooKeeper in your cluster.
            // Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by this Consumer Group.
            props.put("zookeeper.connect", "localhost:2181");

            // Consumer group id (the 'group.id' string defines the Consumer Group this process is consuming on behalf of).
            props.put("group.id", "0");

            // ZooKeeper session timeout.
            // The 'zookeeper.session.timeout.ms' is how many milliseconds Kafka will wait for
            // ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.
            props.put("zookeeper.session.timeout.ms", "10000");

            // The 'zookeeper.sync.time.ms' is the number of milliseconds a ZooKeeper 'follower' can be behind the master before an error occurs.
            props.put("zookeeper.sync.time.ms", "200");

            // The 'auto.commit.interval.ms' setting is how often updates to the consumed offsets are written to ZooKeeper.
            // Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper, on restart you will get replayed messages.
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }

        @Override
        public void run() {

            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            topicMap.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);

            KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            System.out.println("*********Results********");
            while (true) {
                if (it.hasNext()) {
                    // Print the received message
                    System.err.println(Thread.currentThread() + " get data:" + new String(it.next().message()));
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public static void main(String[] args) {
            MyConsumer consumerThread = new MyConsumer("mykafka");
            consumerThread.start();
        }
    }
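      The consumer above relies on auto-commit: offsets are written to ZooKeeper every auto.commit.interval.ms, so a crash between commits can replay messages on restart. If you want offsets committed only after a message has actually been processed, a common variation (a minimal sketch, assuming the same topic and ZooKeeper address as above) disables auto-commit and calls commitOffsets() explicitly:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ManualCommitConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "0");
            props.put("zookeeper.session.timeout.ms", "10000");
            // Disable time-based auto-commit; offsets are committed explicitly below.
            props.put("auto.commit.enable", "false");

            ConsumerConnector consumer =
                    kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            topicMap.put("mykafka", 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);

            ConsumerIterator<byte[], byte[]> it = streamMap.get("mykafka").get(0).iterator();
            while (it.hasNext()) {
                String message = new String(it.next().message());
                System.out.println("processed: " + message);
                // Commit the consumed offsets to ZooKeeper only after processing succeeded.
                consumer.commitOffsets();
            }
        }
    }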

    3. Consumer with a thread executor

      First, create a Consumer class that handles the messages:

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    public class Consumer implements Runnable {

        private final KafkaStream<byte[], byte[]> stream;
        private final int threadNumber;

        public Consumer(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            threadNumber = a_threadNumber;
            stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            // hasNext() blocks until a message arrives or the connector is shut down
            while (it.hasNext())
                System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
            System.out.println("Shutting down Thread: " + threadNumber);
        }
    }

      Then wire up the multi-threaded consumption:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerGroup {
        private final ConsumerConnector consumer;
        private final String topic;
        private ExecutorService executor;

        public ConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig(a_zookeeper, a_groupId));
            this.topic = a_topic;
        }

        public void shutdown() {
            if (consumer != null) consumer.shutdown();
            if (executor != null) {
                executor.shutdown();
                try {
                    if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                    }
                } catch (InterruptedException e) {
                    System.out.println("Interrupted during shutdown, exiting uncleanly");
                }
            }
        }

        public void run(int a_numThreads) {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, Integer.valueOf(a_numThreads));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

            // now launch all the threads
            executor = Executors.newFixedThreadPool(a_numThreads);

            // now create an object to consume the messages
            int threadNumber = 0;
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new Consumer(stream, threadNumber));
                threadNumber++;
            }
        }

        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", a_zookeeper);
            props.put("group.id", a_groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");

            return new ConsumerConfig(props);
        }

        public static void main(String[] args) {
            String zooKeeper = "localhost:2181";
            String groupId = "0";
            String topic = "mykafka";
            int threads = 2;  // number of consumer threads to start

            ConsumerGroup group = new ConsumerGroup(zooKeeper, groupId, topic);
            group.run(threads);

            try {
                Thread.sleep(10000);
            } catch (InterruptedException ie) {
                // ignore and fall through to shutdown
            }
            group.shutdown();
        }
    }
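      The fixed Thread.sleep(10000) in main is only suitable for a demo. In a long-running process you would normally keep the consumers running and shut them down when the JVM exits; a minimal sketch of that (assuming the ConsumerGroup class above is on the classpath; the class name is illustrative) registers a shutdown hook. Also note that threads beyond the topic's partition count never receive messages:

    public class ConsumerGroupMain {

        public static void main(String[] args) {
            final ConsumerGroup group = new ConsumerGroup("localhost:2181", "0", "mykafka");

            // Make sure the streams and the executor are shut down when the JVM exits.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    group.shutdown();
                }
            });

            // Thread count should not exceed the topic's partition count,
            // otherwise the extra threads stay idle.
            group.run(2);
        }
    }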
  • Original post: https://www.cnblogs.com/gaopeng527/p/4950232.html