zoukankan      html  css  js  c++  java
  • java 与 CDH kafka集成

    本文主要是通过在网上找到的例子进行演示:
    一、说明
         开发环境如下:
         idea + jdk 1.8 + maven
         maven 中引用的架包如下:
      

    二、 生产者

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    
    /**
     * Created by liuxiaolei on 2018/9/29.
     */
    public class SimpleKafkaProducer {
        private static KafkaProducer<String, String> producer;
        private final static String TOPIC = "adienTest2";
        public SimpleKafkaProducer(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.103:9092,192.168.0.105:9092,192.168.0.107:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //设置分区类,根据key进行数据分区
            producer = new KafkaProducer<String, String>(props);
        }
        public void produce(){
            for (int i = 50;i<100;i++){
                String key = String.valueOf(i);
                String data = "hello kafka message:"+key;
                producer.send(new ProducerRecord<String, String>(TOPIC,key,data));
                System.out.println(data);
            }
            producer.close();
        }
    
        public static void main(String[] args) {
            new SimpleKafkaProducer().produce();
        }
    }
    View Code

    三、消费者

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.log4j.Logger;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Created by liuxiaolei on 2018/9/29.
     */
    public class SimpleKafkaConsumer {
        private static KafkaConsumer<String, String> consumer;
        private final static String TOPIC = "adienTest2";
        public SimpleKafkaConsumer(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.103:9092,192.168.0.105:9092,192.168.0.107:9092");
            //每个消费者分配独立的组号
            props.put("group.id", "test2");
            //如果value合法,则自动提交偏移量
            props.put("enable.auto.commit", "true");
            //设置多久一次更新被消费消息的偏移量
            props.put("auto.commit.interval.ms", "1000");
            //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
            props.put("session.timeout.ms", "30000");
            //自动重置offset
            props.put("auto.offset.reset","earliest");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<String, String>(props);
        }
    
        public void consume(){
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records){
                    System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value());
                    System.out.println();
                }
            }
        }
    
        public static void main(String[] args) {
    
            new SimpleKafkaConsumer().consume();
        }
    }
    View Code

    四、结果

      生产者插入的数据,如下:

        

      消费者消费的数据,如下:

        

  • 相关阅读:
    JS-得到屏幕宽高、页面宽高
    CSS3-border-radius 属性
    从30岁到35岁:为你的生命多积累一些厚度【转载】
    HTML5-IOS WEB APP应用程序(IOS META)
    HTML-Meta中的viewport指令
    EasyUI-window包含一个iframe,在iframe中如何关闭window
    JS-为句柄添加监听函数
    EasyUI-EasyUI框架入门学习
    Linux下的C编程
    ***经典笔试题
  • 原文地址:https://www.cnblogs.com/xiqing/p/9759123.html
Copyright © 2011-2022 走看看