zoukankan      html  css  js  c++  java
  • kafka java项目测试使用

    引入依赖

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.1.0</version>
    </dependency>

    生产者

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Properties;
    
    /**
     * Minimal Kafka producer demo: sends one numbered string message per second
     * to the {@code eagle} topic on the broker at {@code 192.168.91.128:9092}.
     *
     * <p>Implements {@link AutoCloseable} so the underlying client can be released
     * with try-with-resources.
     */
    public class Producer implements AutoCloseable {

        /** Underlying Kafka client; created once in the constructor and reused for every send. */
        private final KafkaProducer<String, String> kafkaProducer;

        /** Sequence number appended to each outgoing message; incremented per call to {@link #produce()}. */
        int i = 0;

        /** Prefix of every message value. */
        String msg = "bb hh ";

        /**
         * Creates the Kafka client with String key/value serializers.
         */
        public Producer() {
            Properties map = new Properties();
            map.put("bootstrap.servers", "192.168.91.128:9092");
            map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            kafkaProducer = new KafkaProducer<>(map);
        }

        /**
         * Sends the next message ({@code msg + i}) to the {@code eagle} topic and
         * increments the sequence number.
         *
         * <p>The send is asynchronous; the outcome is reported from the completion
         * callback. Exceptions thrown synchronously by {@code send} are printed and
         * not propagated.
         */
        public void produce() {
            ProducerRecord<String, String> record = new ProducerRecord<>("eagle", msg + i);
            i++;
            try {
                kafkaProducer.send(record, (recordMetadata, e) -> {
                    // The callback runs for failures as well; a non-null exception
                    // means the record was NOT delivered.
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("send success");
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Closes the underlying Kafka client.
         */
        @Override
        public void close() {
            kafkaProducer.close();
        }

        /**
         * Sends one message per second until the thread is interrupted, then closes
         * the producer.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            boolean interrupted = false;
            try (Producer producer = new Producer()) {
                while (true) {
                    producer.produce();
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Stop producing; the interrupt flag is restored only after
                        // the producer has been closed (see finally below).
                        interrupted = true;
                        break;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    消费者

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * Minimal Kafka consumer demo: subscribes to the {@code eagle} topic as group
     * {@code local-test-1} and prints every record it receives.
     */
    public class Consumer {
        /** Underlying Kafka client; created and subscribed in the constructor. */
        KafkaConsumer<String, String> kafkaConsumer;

        /**
         * Creates the Kafka client with String key/value deserializers and
         * subscribes it to the {@code eagle} topic.
         */
        public Consumer() {
            Properties map = new Properties();
            map.put("bootstrap.servers", "192.168.91.128:9092");
            map.put("group.id", "local-test-1");
            map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaConsumer = new KafkaConsumer<>(map);
            kafkaConsumer.subscribe(Collections.singleton("eagle"));
        }

        /**
         * Polls the topic in an endless loop and prints key, value, partition,
         * topic, offset and timestamp of each record.
         *
         * <p>The loop only ends when polling throws; the exception is printed and
         * the consumer is closed before this method returns, so the instance must
         * not be reused afterwards.
         */
        public void consumer() {
            System.out.println("wait for consume...");
            try {
                while (true) {
                    // poll(long) takes a timeout in milliseconds (Kafka 1.1.0 client API).
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("key = " + record.key());
                        System.out.println("value = " + record.value());
                        System.out.println("partition = " + record.partition());
                        System.out.println("topic = " + record.topic());
                        System.out.println("offset = " + record.offset());
                        System.out.println("timestamp = " + record.timestamp());
                        System.out.println();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Always release the client's resources once the loop has ended.
                kafkaConsumer.close();
            }
        }

        /**
         * Starts the consumer and blocks in the poll loop.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Consumer consumer = new Consumer();
            consumer.consumer();
        }
    }

    如果连接kafka超时:

    修改kafka的config/server.properties文件中的如下内容:

    advertised.listeners=PLAINTEXT://IP地址:9092
    

    使用ifconfig或者ip a指令获取安装机器的ip地址,假如获取到的测试机器的IP地址为192.168.91.128,就将上述位置的配置参数修改为如下的内容:

    advertised.listeners=PLAINTEXT://192.168.91.128:9092
    

    修改完成后保存退出,并重新启动zk和kafka。



    参考:https://www.jianshu.com/p/94349568533c

  • 相关阅读:
    c++深拷贝与浅拷贝
    c++构造函数的explicit
    c++虚函数和虚函数表
    c++重载、重写、隐藏(重定义)
    c++传值、传指针、传引用
    ASP.Net Core API 学习の中间件
    WPF中String Format的用法
    ASP.Net Core API 全局处理异常
    989. Add to Array-Form of Integer
    1014. Best Sightseeing Pair
  • 原文地址:https://www.cnblogs.com/51python/p/10870337.html
Copyright © 2011-2022 走看看