zoukankan      html  css  js  c++  java
  • java 与 CDH kafka集成

    本文主要是通过在网上找到的例子进行演示:
    一、说明
         开发环境如下:
         idea + jdk 1.8 + maven
         maven 中引用的架包如下:
      

    二、 生产者

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    
    /**
     * Created by liuxiaolei on 2018/9/29.
     */
    public class SimpleKafkaProducer {
        private static KafkaProducer<String, String> producer;
        private final static String TOPIC = "adienTest2";
        public SimpleKafkaProducer(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.103:9092,192.168.0.105:9092,192.168.0.107:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            //设置分区类,根据key进行数据分区
            producer = new KafkaProducer<String, String>(props);
        }
        public void produce(){
            for (int i = 50;i<100;i++){
                String key = String.valueOf(i);
                String data = "hello kafka message:"+key;
                producer.send(new ProducerRecord<String, String>(TOPIC,key,data));
                System.out.println(data);
            }
            producer.close();
        }
    
        public static void main(String[] args) {
            new SimpleKafkaProducer().produce();
        }
    }
    View Code

    三、消费者

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.log4j.Logger;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Created by liuxiaolei on 2018/9/29.
     */
    public class SimpleKafkaConsumer {
        private static KafkaConsumer<String, String> consumer;
        private final static String TOPIC = "adienTest2";
        public SimpleKafkaConsumer(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.103:9092,192.168.0.105:9092,192.168.0.107:9092");
            //每个消费者分配独立的组号
            props.put("group.id", "test2");
            //如果value合法,则自动提交偏移量
            props.put("enable.auto.commit", "true");
            //设置多久一次更新被消费消息的偏移量
            props.put("auto.commit.interval.ms", "1000");
            //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
            props.put("session.timeout.ms", "30000");
            //自动重置offset
            props.put("auto.offset.reset","earliest");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<String, String>(props);
        }
    
        public void consume(){
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records){
                    System.out.printf("offset = %d, key = %s, value = %s",record.offset(), record.key(), record.value());
                    System.out.println();
                }
            }
        }
    
        public static void main(String[] args) {
    
            new SimpleKafkaConsumer().consume();
        }
    }
    View Code

    四、结果

      生产者插入的数据,如下:

        

      消费者消费的数据,如下:

        

  • 相关阅读:
    HTML_项目符号使用图片
    字符串查找和替换接口
    AOP代理分析
    3星|董藩《房地产的逻辑》:应该鼓励开发商多盖房而不是惩罚开发商
    2.5星|郎咸平《拯救世界的经济学》:各发达国家与中国的福利政策、经济干预政策的前世今生
    3.5星|科特勒《营销革命4.0》:打造无缝衔接的线上和线下体验
    3星|《韩国式资本主义》:财阀祸害韩国,韩国需要正义的资本主义
    4星|《特朗普时代的全球化战略》:管理学界和管理者可能严重低估了核心管理实践的价值
    2星|《内容创业》:知识付费行业的公开资料整理汇编
    3星|《身边的博弈》:10年旧书,博弈论科普和习题讲解
  • 原文地址:https://www.cnblogs.com/xiqing/p/9759123.html
Copyright © 2011-2022 走看看