  • Summary of connecting to Kafka from Java

    1. The connection demo is based on the code in this blog post:

    https://blog.csdn.net/weixin_39098944/article/details/108067005

    The main code is as follows:

    (1) Add the dependencies

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.0.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.0.0</version>
    </dependency>

    (2) Producer

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    /**
     * Title: KafkaProducerTest
     * Description: Kafka producer demo
     * Version: 1.0.0
     * @author dengcs
     */
    public class KafkaProducerTest implements Runnable {

        private final KafkaProducer<String, String> producer;
        private final String topic;

        public KafkaProducerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            this.producer = new KafkaProducer<String, String>(props);
            this.topic = topicName;
        }

        @Override
        public void run() {
            int messageNo = 1;
            try {
                for (;;) {
                    String messageStr = "Hello, this is message number " + messageNo;
                    producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
                    // Print every 100th message
                    if (messageNo % 100 == 0) {
                        System.out.println("Sent message: " + messageStr);
                    }
                    // Stop after sending 1000 messages
                    if (messageNo % 1000 == 0) {
                        System.out.println("Successfully sent " + messageNo + " messages");
                        break;
                    }
                    messageNo++;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }

        public static void main(String[] args) {
            KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST");
            Thread thread = new Thread(test);
            thread.start();
        }
    }
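
    The send() call in the demo is fire-and-forget, so a bad broker address only shows up later as a TimeoutException. Below is a minimal sketch, not from the original post, of sending with a completion callback using the same KafkaProducer API; the class name KafkaProducerCallbackSketch and the literal message are illustrative, and the bootstrap servers are the same placeholders as in the demo.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Hypothetical sketch: report send results instead of dropping them silently.
    public class KafkaProducerCallbackSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Same placeholder brokers as the demo above; replace with your own.
            props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            try {
                producer.send(new ProducerRecord<String, String>("KAFKA_TEST", "Message", "hello with callback"),
                        (metadata, exception) -> {
                            if (exception != null) {
                                // Surfaces errors such as TimeoutException as soon as the send fails.
                                exception.printStackTrace();
                            } else {
                                System.out.println("sent: partition=" + metadata.partition()
                                        + ", offset=" + metadata.offset());
                            }
                        });
            } finally {
                // close() flushes outstanding sends, so the callback fires before the program exits.
                producer.close();
            }
        }
    }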

    (3) Consumer

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    /**
     * Title: KafkaConsumerTest
     * Description: Kafka consumer demo
     * Version: 1.0.0
     * @author dengcs
     */
    public class KafkaConsumerTest implements Runnable {

        private final KafkaConsumer<String, String> consumer;
        private ConsumerRecords<String, String> msgList;
        private final String topic;
        private static final String GROUPID = "groupA";

        public KafkaConsumerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
            props.put("group.id", GROUPID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<String, String>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            int messageNo = 1;
            System.out.println("---------Start consuming---------");
            try {
                outer:
                for (;;) {
                    msgList = consumer.poll(1000);
                    if (null != msgList && msgList.count() > 0) {
                        for (ConsumerRecord<String, String> record : msgList) {
                            // Print every 100th message (the printed values are not necessarily in that exact order)
                            if (messageNo % 100 == 0) {
                                System.out.println(messageNo + "=======receive: key = " + record.key()
                                        + ", value = " + record.value() + " offset===" + record.offset());
                            }
                            // Stop after consuming 1000 messages (labeled break so the outer poll loop exits as well)
                            if (messageNo % 1000 == 0) {
                                break outer;
                            }
                            messageNo++;
                        }
                    } else {
                        Thread.sleep(1000);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }

        public static void main(String[] args) {
            KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST");
            Thread thread1 = new Thread(test1);
            thread1.start();
        }
    }
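
    The consumer above relies on enable.auto.commit=true, so offsets are committed on a timer whether or not the records were actually processed. As an alternative not shown in the original post, here is a minimal sketch of committing offsets manually with commitSync(); the class name KafkaManualCommitSketch is illustrative, and the brokers, group and topic are the same placeholders as in the demo.

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Hypothetical sketch: commit offsets only after a batch has been processed.
    public class KafkaManualCommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
            props.put("group.id", "groupA");
            // Disable auto-commit so offsets advance only when we say so.
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("KAFKA_TEST"));
            try {
                for (int i = 0; i < 10; i++) {                // bounded loop, just for the sketch
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("offset=" + record.offset() + ", value=" + record.value());
                    }
                    consumer.commitSync();                    // commit after the whole batch is handled
                }
            } finally {
                consumer.close();
            }
        }
    }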

    2. A problem encountered: org.apache.kafka.common.errors.TimeoutException

    Reference for the fix: https://blog.csdn.net/maoyuanming0806/article/details/80553632/

    Cause: the broker list master:9092,slave1:9092,slave2:9092 in the code above comes from someone else's example; in my own code the list is IP1:9092,IP2:9092,IP3:9092.

    When connecting to Kafka, the client threw a TimeoutException. After switching the log level to debug, I found java.io.IOException: Can't resolve address: izwz9c79fdwp9sb65vpyk3z:9092. In other words, the client was trying to connect to izwz9c79fdwp9sb65vpyk3z:9092 rather than to my ip:port.

    Fix: on Windows, add a host mapping in

    C:\Windows\System32\drivers\etc\hosts

    39.108.61.252 izwz9c79fdwp9sb65vpyk3z
    127.0.0.1 localhost

    Problem solved.
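
    As an aside not covered in the original post, the underlying issue is that the broker advertises its own hostname (izwz9c79fdwp9sb65vpyk3z) back to clients; setting the broker's advertised.listeners to a resolvable IP would avoid editing every client's hosts file. A quick way to confirm the symptom from the client machine is to check whether the advertised hostname resolves at all, as in this small sketch (the class name ResolveCheck is illustrative).

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    // Hypothetical sketch: reproduce the "Can't resolve address" symptom outside of Kafka.
    public class ResolveCheck {
        public static void main(String[] args) {
            String advertisedHost = "izwz9c79fdwp9sb65vpyk3z"; // hostname taken from the debug log above
            try {
                System.out.println(advertisedHost + " -> "
                        + InetAddress.getByName(advertisedHost).getHostAddress());
            } catch (UnknownHostException e) {
                // Same failure the Kafka client logs at debug level.
                System.out.println("Cannot resolve " + advertisedHost
                        + ": add a hosts entry or fix the broker's advertised.listeners");
            }
        }
    }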




  • Original post: https://www.cnblogs.com/GSONG/p/14719035.html