zoukankan      html  css  js  c++  java
  • Kafka 生产者和消费者入门代码基础

    这篇博文讲解Kafka 的生产者和消费者实例。

    基础版本一

    生产者

    ProducerFastStart.java
    package com.xingyun.tutorial_1;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * Minimal Kafka producer example: sends a single string message to {@link #topic}
     * and then shuts the producer down.
     */
    public class ProducerFastStart {

        /** Broker address (host:port) used to bootstrap the connection to the cluster. */
        public static final String brokerList="192.168.10.137:9092";

        /**
         * Name of the topic the message is sent to.
         * A producer needs a concrete topic name; a regular-expression style value is not
         * usable here because '*' is not a legal character in a Kafka topic name.
         */
        public static final String topic="topic-demo";

        /**
         * Configures the producer, sends one record and waits for the send to complete.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String args[]){

            // Producer client configuration: key/value serializers and the broker list
            Properties properties=new Properties();
            properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("bootstrap.servers",brokerList);

            // Create the KafkaProducer instance
            KafkaProducer<String,String> producer=new KafkaProducer<String, String>(properties);

            // Build the message to send
            ProducerRecord<String,String> record=new ProducerRecord<String, String>(topic,"hello Kafka!");
            try {
                // send() is asynchronous and only hands back a Future; block on it so that a
                // failed delivery ends up in the catch block instead of being reported as success
                producer.send(record).get();
                System.out.println("send success from producer");
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                // Always release the producer's network resources
                producer.close();
            }
        }
    }
    消费者
    ConsumerFastStart.java
    package com.xingyun.tutorial_1;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * Minimal Kafka consumer example: subscribes to every topic whose name matches
     * {@link #topic} and prints the value of each received record.
     */
    public class ConsumerFastStart {

        /** Comma-separated broker addresses (host:port) used to bootstrap the connection. */
        public static final String brokerList="10.221.148.217:9092 ,10.221.148.217:9093 ,10.221.148.217:9094";

        /**
         * Regular expression matched against topic names.
         * It cannot be used as a literal topic name because '*' is not a legal character
         * in a Kafka topic name, so it has to be subscribed to as a pattern.
         */
        public static final String topic="ODS-PSR-P.*";

        /** Name of the consumer group this client joins. */
        public static final String groupId="group.demo";

        /**
         * Configures the consumer, subscribes by pattern and polls in an endless loop.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String args[]){

            // Consumer client configuration: deserializers, broker list and group name
            Properties properties=new Properties();
            properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("bootstrap.servers",brokerList);
            properties.put("group.id",groupId);

            // Create the consumer client instance
            KafkaConsumer<String,String> consumer=new KafkaConsumer<>(properties);

            try {
                // Subscribe with a compiled pattern; passing the expression as a plain topic
                // name would look for a topic literally called "ODS-PSR-P.*".
                // (Fully qualified so that the file's import list does not have to change.)
                consumer.subscribe(java.util.regex.Pattern.compile(topic));

                // Poll and print messages until the process is stopped or poll() fails
                while (true){
                    ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String,String> record:records){
                        System.out.println("receiver a message from consumer client:"+record.value());
                    }
                }
            } finally {
                // Reached only when the loop is left through an exception; release the client
                consumer.close();
            }
        }
    }

    升级版本一:
    生产者

    KafkaProducerAnalysis.java
    package com.xingyun.tutorial_2;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    /**
     * Producer example that builds its configuration from the {@link ProducerConfig}
     * constants, sends one string message to {@link #topic} and closes the producer.
     */
    public class KafkaProducerAnalysis {

        /** Broker address (host:port) used to bootstrap the connection to the cluster. */
        public static final String brokerList="192.168.10.137:9092";

        /** Name of the topic the message is sent to. */
        public static final String topic="topic-demo";

        /**
         * Sends a single record and reports any delivery failure.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String args[]){

            // Producer client configuration
            Properties properties=initConfig();

            // Create the producer instance
            KafkaProducer<String,String> producer=new KafkaProducer<>(properties);

            // Build the message to send; topic and value are the mandatory fields
            ProducerRecord<String,String> record=new ProducerRecord<>(topic,"hello,Kafka!");

            try {
                // send() is asynchronous: delivery errors are not thrown here but handed to
                // the completion callback, so report them there instead of dropping them
                producer.send(record,(metadata,exception)->{
                    if (exception!=null){
                        exception.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                // Always release the producer's network resources
                producer.close();
            }

        }


        /**
         * Builds the producer client configuration.
         *
         * @return properties holding the broker list, the String key/value serializers
         *         and the client id
         */
        private static Properties initConfig() {
            Properties properties=new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            // Explicit client id for this producer
            properties.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
            return properties;
        }


    }
    消费者
    KafkaConsumerAnalysis.java
    package com.xingyun.tutorial_2;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.Deserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class KafkaConsumerAnalysis {
    
        public static final String brokerList="";
        public static final String topic="topic-demo";
        public static final String groupId="group.demo";
    
        public static final AtomicBoolean isRunning=new AtomicBoolean(true);
    
        public static void main(String[] args){
    
            //配置消费者客户端参数
            Properties properties=initConfig();
    
            //创建相应的消费者实例
            KafkaConsumer<String,String> consumer=new KafkaConsumer<>(properties);
    
            //订阅主题
            consumer.subscribe(Arrays.asList(topic));
    
            try {
                //拉取消息并消费
                while(isRunning.get()){
    
                    ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
    
                    for (ConsumerRecord<String,String> record:records){
    
                       System.out.println("topic="+record.topic()+",partition="+record.partition()+",offset="+record.offset());
                       System.out.println("key="+record.key()+",value="+record.value());
                       //do something to processor record.
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                consumer.close();
            }
        }
    
    
        /**
         * 配置消费者客户端参数
         * */
        private static Properties initConfig() {
            Properties properties=new Properties();
    
    //        properties.put("key.deserializer","org.apache.kafka.common.serialization.Deserializer");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class.getName());
    
    //        properties.put("value.deserializer","org.apache.kafka.common.serialization.Deserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,Deserializer.class.getName());
    
            //指定连接Kafka集群所需的broker地址清单,中间用逗号隔开,默认值为""
    //        properties.put("bootstrap.server",brokerList);
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
    
            //消费者所属的消费组的名称,默认值为""
    //        properties.put("group.id",groupId);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
    
    //        properties.put("client.id","consumer.client.id.demo");
            //指定消费者客户端Id,如果不设置,则自动生成consumer-1,consumer-2
            properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"consumer.client.id.demo");
    
            return properties;
        }
    }
  • 相关阅读:
    比较器 Comparable 与compartor 的区别及理解
    事务特性、事务隔离级别、spring事务传播特性
    分布式文件上传-FastDFS
    spring-cloud 组件总结以及搭建图示 (六)
    springCloud zuul网关(五)
    hashCode与equals 通过面试题一窥究竟
    【原】那年30岁
    【原】Hyper-V虚拟机设置外部网络访问
    【原】win10 .net framework 3.5安装
    【原】做梦有感
  • 原文地址:https://www.cnblogs.com/xingyunblog/p/10536984.html
Copyright © 2011-2022 走看看