zoukankan      html  css  js  c++  java
  • Kafka入门之生产者消费者测试

    目录:

    kafka启动脚本以及关闭脚本

    1. 同一个生产者同一个Topic,两个相同的消费者相同的Group

    2. 同一个生产者同一个Topic,两个消费者不同Group

     3. 两个生产者同一个Topic,生产不同的消息,一个消费者

    运行的前提是有kafka,并启动kafka,这里我写了个kafka启动脚本:

    #!/bin/sh
    #创建启动脚本
    #启动zookeeper
    /user/kafka_2.11-2.0.0/bin/zookeeper-server-start.sh /user/kafka_2.11-2.0.0/config/zookeeper.properties &
    sleep 3 #等3秒后执行
    
    #启动kafka
    /user/kafka_2.11-2.0.0/bin/kafka-server-start.sh /user/kafka_2.11-2.0.0/config/server.properties &

     kafka关闭脚本:

    #!/bin/sh
    #创建关闭脚本
    #关闭kafka
    /user/kafka_2.11-2.0.0/bin/kafka-server-stop.sh /user/kafka_2.11-2.0.0/config/server.properties &
    sleep 3 #等3秒后执行
    
    #关闭zookeeper
    /user/kafka_2.11-2.0.0/bin/zookeeper-server-stop.sh /user/kafka_2.11-2.0.0/config/zookeeper.properties &

    1. 同一个生产者同一个Topic,两个相同的消费者相同的Group

    新建一个生产者TestKafkaProducer,需要引入kafka的lib中的jar包,主要包括两个类,如下所示:

    kafka生产者:

    package com.zc.kafka.producer.main;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    /**
     * Kafka生产者
     * 先启动生产者,发送消息到broker,这里简单发送了10条从0-9的消息,再启动消费者,控制台输出如下:
     */
    public class SimpleKafkaProducer {
    
        private static long i = 0;
        
        public void send(String str) {
            // TODO Auto-generated method stub
    
            Properties props = new Properties();
    
            //broker地址
            props.put("bootstrap.servers", "localhost:9092");
    
            //请求时候需要验证
            props.put("acks", "all");
    
            //请求失败时候需要重试
            props.put("retries", 0);
    
            //内存缓存区大小
            props.put("buffer.memory", 33554432);
    
            //指定消息key序列化方式
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
    
            //指定消息本身的序列化方式
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
    
            //for (int i = 0; i < 10; i++) {  //i < 10
                // 生产一条消息的时间有点长
                //producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
                //System.out.println(i);
            //}
            // 这里的“test“是topic
            producer.send(new ProducerRecord<>("test", String.valueOf(i), str));
            i++;
            System.out.println("Message sent successfully");
            producer.close();
        }
    
    }

    生产数据:

    package com.zc.kafka.producer.test;
    
    import com.zc.kafka.producer.main.SimpleKafkaProducer;
    
    public class TestSimpleKafkaProducer {
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            long i=0;
            SimpleKafkaProducer skp = new SimpleKafkaProducer();
            while(true) {
                skp.send("Hello: "+ String.valueOf(i));
                i++;
                try {
                    Thread.sleep(10000);  //ms
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    
    }

    新建两个消费者,引入kafka中lib中的jar包,分别是TestKafkaConsumer和TestKafkaConsumer2,他们有一个相同的类,如下所示:

    消息消费者:

    package com.zc.kafka.consumer.main;
    
    import java.util.Collections;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    /**
     * kafka消费者
     */
    public class SimpleKafkaConsumer {
    
        @SuppressWarnings({ "deprecation", "resource" })
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            Properties props = new Properties();
    
            props.put("bootstrap.servers", "localhost:9092");
            //每个消费者分配独立的组号,这里的“test”是group
            props.put("group.id", "test");
    
            //如果value合法,则自动提交偏移量
            props.put("enable.auto.commit", "true");
    
            //设置多久一次更新被消费消息的偏移量
            props.put("auto.commit.interval.ms", "1000");
    
            //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
            props.put("session.timeout.ms", "30000");
            
            //
            //props.put("auto.offset.reset", "earliest");
    
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            consumer.subscribe(Collections.singletonList("test"));  //核心函数1:订阅topic
    
            System.out.println("Subscribed to topic " + "test");
            //int i = 0;
    
            while (true) {
                //System.out.println(i++);
                //核心函数2:long poll,一次拉取回来多个消息
                /* 读取数据,读取超时时间为100ms */
                ConsumerRecords<String, String> records = consumer.poll(100);  
                //System.out.println(records.count());
                for (ConsumerRecord<String, String> record : records)
                    // print the offset,key and value for the consumer records.
                    System.out.printf("offset = %d, key = %s, value = %s
    ",
                            record.offset(), record.key(), record.value());
            }
        }
    
    }

    启动生产者,并启动两个消费者。(我的生产者和两个消费者都在同一主机上)

    结果是:

    第一个启动的消费者消费消息,第二个消费者没有消费消息;我关闭掉第一个消费者,第二个消费者就会消费消息; (因为Group相同)

    同时只会有一个消费者在消费消息,并且消费消息没有重叠。

    消费者1:

    Subscribed to topic test
    offset = 4451, key = 25, value = Hello: 25
    offset = 4452, key = 26, value = Hello: 26
    offset = 4453, key = 27, value = Hello: 27
    offset = 4454, key = 28, value = Hello: 28
    offset = 4455, key = 29, value = Hello: 29
    offset = 4456, key = 30, value = Hello: 30
    offset = 4457, key = 31, value = Hello: 31
    offset = 4458, key = 32, value = Hello: 32

    消费者2:

    Subscribed to topic test
    offset = 4459, key = 33, value = Hello: 33
    offset = 4460, key = 34, value = Hello: 34
    offset = 4461, key = 35, value = Hello: 35
    offset = 4462, key = 36, value = Hello: 36

    2. 同一个生产者同一个Topic,两个消费者不同Group

    这里只是修改了TestKafkaConsumer2的源码,修改了组,具体如下所示:

    package com.zc.kafka.consumer.main;
    
    import java.util.Collections;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    /**
     * kafka消费者
     */
    public class SimpleKafkaConsumer {
    
        @SuppressWarnings({ "deprecation", "resource" })
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            Properties props = new Properties();
    
            props.put("bootstrap.servers", "localhost:9092");
            //每个消费者分配独立的组号
            props.put("group.id", "Consumer2");  //修改了组
    
            //如果value合法,则自动提交偏移量
            props.put("enable.auto.commit", "true");
    
            //设置多久一次更新被消费消息的偏移量
            props.put("auto.commit.interval.ms", "1000");
    
            //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
            props.put("session.timeout.ms", "30000");
            
            //
            //props.put("auto.offset.reset", "earliest");
    
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            consumer.subscribe(Collections.singletonList("test"));  //核心函数1:订阅topic
    
            System.out.println("Subscribed to topic " + "test");
            //int i = 0;
    
            while (true) {
                //System.out.println(i++);
                //核心函数2:long poll,一次拉取回来多个消息
                /* 读取数据,读取超时时间为100ms */
                ConsumerRecords<String, String> records = consumer.poll(100);  
                //System.out.println(records.count());
                for (ConsumerRecord<String, String> record : records)
                    // print the offset,key and value for the consumer records.
                    System.out.printf("offset = %d, key = %s, value = %s
    ",
                            record.offset(), record.key(), record.value());
            }
        }
    
    }

    启动生产者,并启动两个消费者。(我的生产者和两个消费者都在同一主机上)

    结果是:

    第一个启动的消费者消费消息,第二个消费者也再消费消息;(因为Group不相同)

    同时两个消费者都在消费消息,并且消费消息重叠。

    消费者1:

    Subscribed to topic test
    offset = 4463, key = 0, value = Hello: 0
    offset = 4464, key = 1, value = Hello: 1
    offset = 4465, key = 2, value = Hello: 2
    offset = 4466, key = 3, value = Hello: 3
    offset = 4467, key = 4, value = Hello: 4
    offset = 4468, key = 5, value = Hello: 5
    offset = 4469, key = 6, value = Hello: 6
    offset = 4470, key = 7, value = Hello: 7
    offset = 4471, key = 8, value = Hello: 8
    offset = 4472, key = 9, value = Hello: 9

    消费者2:

    Subscribed to topic test
    offset = 4466, key = 3, value = Hello: 3
    offset = 4467, key = 4, value = Hello: 4
    offset = 4468, key = 5, value = Hello: 5
    offset = 4469, key = 6, value = Hello: 6
    offset = 4470, key = 7, value = Hello: 7
    offset = 4471, key = 8, value = Hello: 8
    offset = 4472, key = 9, value = Hello: 9

    3. 两个生产者同一个Topic,生产不同的消息,一个消费者

    生产的数据和第一个生产者不同:

    package com.zc.kafka.producer.test;
    
    import com.zc.kafka.producer.main.SimpleKafkaProducer;
    
    public class TestSimpleKafkaProducer2 {
    
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            long i=0;
            SimpleKafkaProducer skp = new SimpleKafkaProducer();
            while(true) {
                skp.send("Kafka: "+ String.valueOf(i));   //生产的数据不同
                i++;
                try {
                    Thread.sleep(10000);  //ms
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    
    }

    启动两个生产者,并启动消费者。(我的生产者和消费者都在同一主机上)

    结果是:

    消费者同时收到了两个生产者的消息; (因为Topic相同)

    消费者:

    Subscribed to topic test
    offset = 4473, key = 0, value = Hello: 0
    offset = 4474, key = 0, value = Kafka: 0
    offset = 4475, key = 1, value = Hello: 1
    offset = 4476, key = 1, value = Kafka: 1
    offset = 4477, key = 2, value = Hello: 2
    offset = 4478, key = 2, value = Kafka: 2
    offset = 4479, key = 3, value = Hello: 3
    offset = 4480, key = 3, value = Kafka: 3
    offset = 4481, key = 4, value = Hello: 4
    offset = 4482, key = 4, value = Kafka: 4
  • 相关阅读:
    Linux入门
    Linux和VMware
    vue中select的使用以及select设置默认选中
    Django ModelFrom组件
    Django登录(含随机生成图片验证码)注册实例
    Django组件---Django请求生命周期和中间件
    Python3之使用Crypto
    ORM大结局
    ORM多表查询下
    Pycharn破解补丁激活
  • 原文地址:https://www.cnblogs.com/zhangchao0515/p/9519151.html
Copyright © 2011-2022 走看看