zoukankan      html  css  js  c++  java
  • java 与 CDH kafka集成

    本文主要是通过在网上找到的例子进行演示:
    一、说明
         开发环境如下:
         idea + jdk 1.8 + maven
         maven 中引用的架包如下:
      

    二、 生产者

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    
    /**
     * Created by liuxiaolei on 2018/9/29.
     */
    public class SimpleKafkaProducer {
        private static KafkaProducer<String, String> producer;
        private final static String TOPIC = "adienTest2";
        public SimpleKafkaProducer(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.103:9092,192.168.0.105:9092,192.168.0.107:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //设置分区类,根据key进行数据分区
            producer = new KafkaProducer<String, String>(props);
        }
        public void produce(){
            for (int i = 50;i<100;i++){
                String key = String.valueOf(i);
                String data = "hello kafka message:"+key;
                producer.send(new ProducerRecord<String, String>(TOPIC,key,data));
                System.out.println(data);
            }
            producer.close();
        }
    
        public static void main(String[] args) {
            new SimpleKafkaProducer().produce();
        }
    }
    View Code

    三、消费者

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.log4j.Logger;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Created by liuxiaolei on 2018/9/29.
     */
    public class SimpleKafkaConsumer {
        private static KafkaConsumer<String, String> consumer;
        private final static String TOPIC = "adienTest2";
        public SimpleKafkaConsumer(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.103:9092,192.168.0.105:9092,192.168.0.107:9092");
            //每个消费者分配独立的组号
            props.put("group.id", "test2");
            //如果value合法,则自动提交偏移量
            props.put("enable.auto.commit", "true");
            //设置多久一次更新被消费消息的偏移量
            props.put("auto.commit.interval.ms", "1000");
            //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
            props.put("session.timeout.ms", "30000");
            //自动重置offset
            props.put("auto.offset.reset","earliest");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<String, String>(props);
        }
    
        public void consume(){
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records){
                    System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value());
                    System.out.println();
                }
            }
        }
    
        public static void main(String[] args) {
    
            new SimpleKafkaConsumer().consume();
        }
    }
    View Code

    四、结果

      生产者插入的数据,如下:

        

      消费者消费的数据,如下:

        

  • 相关阅读:
    4.2Python数据处理篇之Matplotlib系列(二)---plt.scatter()散点图
    4.1Python数据处理篇之Matplotlib系列(一)---初识Matplotlib
    3.8Python数据处理篇之Numpy系列(八)---Numpy的梯度函数
    3.7Python数据处理篇之Numpy系列(七)---Numpy的统计函数
    3.6Python数据处理篇之Numpy系列(六)---Numpy随机函数
    3.5Python数据处理篇之Numpy系列(五)---numpy文件的存取
    3.4Python数据处理篇之Numpy系列(四)---ndarray 数组的运算
    3.3Python数据处理篇之Numpy系列(三)---数组的索引与切片
    3.2Python数据处理篇之Numpy系列(二)--- ndarray数组的创建与变换
    3.1Python数据处理篇之Numpy系列(一)---ndarray对象的属性与numpy的数据类型
  • 原文地址:https://www.cnblogs.com/xiqing/p/9759123.html
Copyright © 2011-2022 走看看