  • Kafka producer and consumer demo (Part 3)

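    In the previous parts we set up the cluster and configured the Kafka brokers. Now let's write code in which a producer client sends a message to Kafka and a consumer reads it back. Don't rush into the individual configuration parameters yet; first get a feel for the overall flow. We will study and discuss the details one by one in later parts.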

    1. Add the Maven dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.1</version>
    </dependency>

    2. Create the topic

    bin/kafka-topics.sh --create --topic hello_kafka --partitions 3 --replication-factor 3 --zookeeper 192.168.1.11:2181/kafka 
    # Creates the topic hello_kafka under the /kafka chroot of the ZooKeeper ensemble, with 3 partitions and a replication factor of 3

    3. Producer code

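    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ProducerDemo {

        private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class);

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            // 1. Load the configuration
            Properties prop = loadProperties();

            // 2. Create the producer; try-with-resources closes it when main() finishes
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
                String sendContent = "hello_kafka";
                // No key is given, so the producer chooses the partition
                ProducerRecord<String, String> record = new ProducerRecord<>("hello_kafka", sendContent);

                // 3. send() is asynchronous; get() blocks until the broker acknowledges the record
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata recordMetadata = future.get();

                LOG.info("sent data: {}, offset: {}", sendContent, recordMetadata.offset());
            }
        }

        // Producer configuration
        public static Properties loadProperties() {
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092");
            prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            return prop;
        }
    }

    // 2019-08-05 23:17:22|INFO |com.zzh.hello.ProducerDemo 39|sent data: hello_kafka, offset: 1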
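
    The loadProperties() method above hard-codes its settings. As a minimal sketch, assuming you would rather keep them in a properties file, the same keys can be parsed with the JDK's java.util.Properties. The class name ProducerConfigLoader, the method fromText, and the inline sample text are hypothetical and exist only for illustration; they are not part of the original demo.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of the original demo: parses producer settings
// from properties-formatted text instead of hard-coding them.
public class ProducerConfigLoader {

    // Parses "key=value" lines into a Properties object.
    public static Properties fromText(String text) {
        Properties prop = new Properties();
        try (StringReader reader = new StringReader(text)) {
            prop.load(reader);
        } catch (IOException e) {
            // load() declares IOException even though an in-memory reader should not fail
            throw new IllegalStateException("could not parse producer settings", e);
        }
        return prop;
    }

    public static void main(String[] args) {
        // Same keys and values as loadProperties() in ProducerDemo
        String text = String.join("\n",
            "bootstrap.servers=192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092",
            "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer=org.apache.kafka.common.serialization.StringSerializer");
        Properties prop = fromText(text);
        System.out.println(prop.getProperty("bootstrap.servers"));
    }
}
```

    In real use a file reader would take the place of the StringReader, and the returned Properties object could be passed to new KafkaProducer<>(prop) exactly as before.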

    4. Consumer code

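    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ConsumerDemo {

        private static final Logger LOG = LoggerFactory.getLogger(ConsumerDemo.class);

        public static void main(String[] args) {
            // 1. Load the configuration
            Properties prop = loadProperties();

            // 2. Create the consumer; try-with-resources closes it if the loop ends with an exception
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
                // 3. Subscribe to the topic
                consumer.subscribe(Collections.singletonList("hello_kafka"));

                // 4. Poll for records in an endless loop
                for (;;) {
                    // poll(long) matches the kafka-clients 0.11.0.1 dependency declared above;
                    // poll(Duration) only exists from kafka-clients 2.0.0 onward
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    records.forEach(item ->
                        LOG.info("===============> offset:{},value:{}", item.offset(), item.value()));
                }
            }
        }

        // Consumer configuration
        private static Properties loadProperties() {
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092");
            prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put("group.id", "hello_2");
            prop.put("client.id", "demo-consumer-client");
            // earliest or latest; applies only when the group has no valid committed offset
            prop.put("auto.offset.reset", "earliest");

            return prop;
        }
    }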
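
    The auto.offset.reset setting in the consumer configuration only matters when the consumer group has no usable committed offset for a partition. The sketch below is a simplified model of that rule, not Kafka's actual implementation; the class OffsetResetModel and the method startOffset are hypothetical names. It picks the position from which a consumer would start reading a single partition.

```java
import java.util.OptionalLong;

// Simplified model (not Kafka source code) of where a consumer starts reading a partition.
public class OffsetResetModel {

    // committed: the group's committed offset, empty if the group has never committed
    // beginning: first offset still available; end: offset after the last stored record
    public static long startOffset(OptionalLong committed, String autoOffsetReset,
                                   long beginning, long end) {
        // A committed offset inside the available range always wins
        if (committed.isPresent()
                && committed.getAsLong() >= beginning
                && committed.getAsLong() <= end) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return beginning;   // read everything still stored in the partition
            case "latest":
                return end;         // read only records produced from now on
            default:
                // with "none" the real consumer raises an error instead of picking a position
                throw new IllegalStateException(
                    "no valid offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // A new group and a partition holding offsets 0..1 (end offset 2)
        System.out.println(startOffset(OptionalLong.empty(), "earliest", 0, 2)); // prints 0
        System.out.println(startOffset(OptionalLong.empty(), "latest", 0, 2));   // prints 2
        // The group already committed offset 1, so the reset policy is ignored
        System.out.println(startOffset(OptionalLong.of(1), "latest", 0, 2));     // prints 1
    }
}
```

    Assuming the group hello_2 has not committed any offsets yet, earliest therefore lets the consumer also read records that were sent before it started.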
  • Original article: https://www.cnblogs.com/MrRightZhao/p/11161334.html