  • Apache Kafka (4): Accessing Kafka from Java

    1. Producer

    1.1. Basic Producer

    First, set up the dependencies with Maven. The Kafka build on our server is 2.12-2.3.0 (Scala 2.12, Kafka 2.3.0), so the pom.xml is:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.github.tang</groupId>
        <artifactId>kafka-beginner</artifactId>
        <version>1.0</version>

        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.3.0</version>
            </dependency>

            <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.26</version>
            </dependency>

        </dependencies>

    </project>

    Then create a Producer:

    package com.github.tang.kafka.tutorial1;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class ProducerDemo {

        private static String bootstrapServers = "server_xxx:9092";

        public static void main(String[] args) {

            /**
             * create Producer properties
             *
             * All properties are listed in the official documentation:
             * https://kafka.apache.org/documentation/#producerconfigs
             */
            Properties properties = new Properties();
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // create the producer
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

            // create a producer record
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("first_topic", "message from java");

            // send data - asynchronous
            /**
             * send() is asynchronous: the record is only queued, not sent immediately.
             * If the program terminated right after send() returned, the record might
             * never reach the Kafka topic and the consumer would never see it.
             * That is why we call flush() below.
             */
            producer.send(record);

            // flush() blocks until all buffered records have been sent
            producer.flush();
            producer.close();
        }
    }

    Run this program and you can see the message arrive in the console consumer.
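
    As an aside, send() returns a Future<RecordMetadata>, so in a quick test you can also block on a single send instead of calling flush(). A minimal sketch, reusing the producer and record from above, with the extra imports noted in the comments:

    // extra imports needed: org.apache.kafka.clients.producer.RecordMetadata,
    //                       java.util.concurrent.ExecutionException
    try {
        // block until the broker has acknowledged this one record (fine for tests, bad for throughput)
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("Record written to partition " + metadata.partition()
                + " at offset " + metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }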

    1.2. Producer with a Callback

    A Callback is executed after every record is sent (successfully or not). For example:

    First, instantiate a logger:

    // create a logger
    final Logger logger = LoggerFactory.getLogger(ProducerDemoCallback.class);

    Then send with a Callback:

    /**
     * send data with a Callback
     */
    for (int i = 0; i < 10; i++) {
        // create a producer record
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("first_topic", "message from java" + Integer.toString(i));

        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // executes every time a record is successfully sent or an exception is thrown
                if (e == null) {
                    // the record was sent successfully
                    logger.info("Received new metadata. \n" +
                            "Topic: " + recordMetadata.topic() + "\n" +
                            "Partition: " + recordMetadata.partition() + "\n" +
                            "Offset: " + recordMetadata.offset() + "\n" +
                            "Timestamp: " + recordMetadata.timestamp());
                } else {
                    logger.error("Error while producing", e);
                }
            }
        });
    }

    Part of the output:

    [kafka-producer-network-thread | producer-1] INFO com.github.tang.kafka.tutorial1.ProducerDemoCallback - Received new metadata.

    Topic: first_topic

    Partition: 2

    Offset: 21

    Timestamp: 1565501879059

    [kafka-producer-network-thread | producer-1] INFO com.github.tang.kafka.tutorial1.ProducerDemoCallback - Received new metadata.

    Topic: first_topic

    Partition: 2

    Offset: 22

    Timestamp: 1565501879075

    1.3. Sending records with a key

    The examples above send records without a key, so messages are distributed to the partitions in a round-robin fashion. To send keyed records, simply use the ProducerRecord constructor overload that takes a key:

    String key = "id_" + Integer.toString(i);

    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>(topic, key, "message from java" + Integer.toString(i));
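
    Because the default partitioner hashes the key, all records with the same key go to the same partition (as long as the topic's partition count does not change). A small sketch that makes this visible, reusing the producer and logger from the earlier examples and logging the key next to the partition reported in the callback:

    for (int i = 0; i < 10; i++) {
        final String key = "id_" + Integer.toString(i);
        ProducerRecord<String, String> keyedRecord =
                new ProducerRecord<String, String>("first_topic", key, "message from java" + Integer.toString(i));

        // Callback is a functional interface, so a lambda works here as well
        producer.send(keyedRecord, (recordMetadata, e) -> {
            if (e == null) {
                logger.info("Key: " + key + " -> Partition: " + recordMetadata.partition());
            } else {
                logger.error("Error while producing", e);
            }
        });
    }
    producer.flush();

    Run the program twice: every id_N should be reported with the same partition on both runs.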

    2. Consumer

    2.1. Basic Consumer

    Here is a basic consumer example:

    package com.github.tang.kafka.tutorial1;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    public class ConsumerDemo {
        private static String bootstrapServers = "server:9092";
        private static String groupId = "my-forth-app";
        private static String topic = "first_topic";

        public static void main(String[] args) {
            Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);

            /**
             * create Consumer properties
             *
             * All properties are listed in the official documentation:
             * https://kafka.apache.org/documentation/#consumerconfigs
             */
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            // create consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

            // subscribe consumer to our topic(s)
            consumer.subscribe(Arrays.asList(topic));

            // poll for new data
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    logger.info("Key: " + record.key() + "\t" + "Value: " + record.value() + "\t" +
                            "Topic: " + record.topic() + "\t" + "Partition: " + record.partition());
                }
            }
        }
    }

    Part of the output is shown in a screenshot (omitted here).

    From the output you can see that (with auto.offset.reset set to earliest) the consumer reads one partition to the end before moving on to the next partition.

    2.2. Consumer balancing

    As mentioned before, the consumers within a consumer group balance the load among themselves automatically. Below, we start one consumer and then start a second one.

    The first consumer's log is shown in a screenshot (omitted here).

    After the second consumer joins, the partitions are reassigned: the first consumer goes from owning three partitions (0, 1, 2) to owning a single partition (2).

    The second consumer's log is likewise a screenshot (omitted here).

    It shows that after joining, the second consumer takes over reading from two partitions (0 and 1).
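
    To watch these reassignments from inside the application rather than by scanning the logs, one option (a sketch, assuming the same consumer and logger as in 2.1) is to pass a ConsumerRebalanceListener when subscribing:

    // extra imports needed: org.apache.kafka.clients.consumer.ConsumerRebalanceListener,
    //                       org.apache.kafka.common.TopicPartition, java.util.Collection
    consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.info("Partitions revoked: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.info("Partitions assigned: " + partitions);
        }
    });

    When the second consumer joins the group, the first consumer should log a revocation of its three partitions followed by an assignment of only one, matching the behaviour described above.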

    2.3. Consumer with threads

    package com.github.tang.kafka.tutorial1;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class ConsumerDemoWithThreads {

        private static Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThreads.class);

        public static void main(String[] args) {
            String bootstrapServers = "server:9092";
            String groupId = "my-fifth-app";
            String topic = "first_topic";

            // latch for dealing with multiple threads
            CountDownLatch latch = new CountDownLatch(1);

            ConsumerRunnable consumerRunnable = new ConsumerRunnable(latch,
                    bootstrapServers,
                    groupId,
                    topic);

            Thread myConsumerThread = new Thread(consumerRunnable);
            myConsumerThread.start();

            // add a shutdown hook
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                logger.info("Caught shutdown hook");
                consumerRunnable.shutdown();

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                logger.info("Application has exited");
            }));

            try {
                latch.await();
            } catch (InterruptedException e) {
                logger.error("Application got interrupted", e);
            } finally {
                logger.info("Application is closing");
            }
        }

        private static class ConsumerRunnable implements Runnable {

            private CountDownLatch latch;
            private KafkaConsumer<String, String> consumer;
            private String bootstrapServers;
            private String topic;
            private String groupId;

            public ConsumerRunnable(CountDownLatch latch,
                                    String bootstrapServers,
                                    String groupId,
                                    String topic) {
                this.latch = latch;
                this.bootstrapServers = bootstrapServers;
                this.topic = topic;
                this.groupId = groupId;
            }

            @Override
            public void run() {

                Properties properties = new Properties();
                properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

                consumer = new KafkaConsumer<String, String>(properties);
                consumer.subscribe(Arrays.asList(topic));

                // poll for new data
                try {
                    while (true) {
                        ConsumerRecords<String, String> records =
                                consumer.poll(Duration.ofMillis(100));

                        for (ConsumerRecord<String, String> record : records) {
                            logger.info("Key: " + record.key() + "\t" + "Value: " + record.value());
                            logger.info("Partition: " + record.partition() + "\t" + "Offset: " + record.offset());
                        }
                    }
                } catch (WakeupException e) {
                    logger.info("Received shutdown signal!");
                } finally {
                    consumer.close();

                    // tell our main code we're done with the consumer
                    latch.countDown();
                }
            }

            public void shutdown() {
                // wakeup() is a special method that interrupts consumer.poll();
                // it makes poll() throw a WakeupException
                consumer.wakeup();
            }
        }
    }

    2.4. Consumer with Assign and Seek

    A consumer can use assign() to attach itself to a specific partition of a topic, and then use seek() to start reading records from a given offset. This approach is typically used to replay data or to fetch one specific record.

    Building on the previous example, modify part of the run() method as follows:

    // assign and seek are mostly used to replay data or fetch a specific message
    // requires an extra import: org.apache.kafka.common.TopicPartition

    // assign
    TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
    long offsetToReadFrom = 15L;
    consumer.assign(Arrays.asList(partitionToReadFrom));

    // seek
    consumer.seek(partitionToReadFrom, offsetToReadFrom);

    int numberOfMessagesToRead = 5;
    boolean keepOnReading = true;
    int numberOfMessagesReadSoFar = 0;

    // poll for new data
    try {
        while (keepOnReading) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                numberOfMessagesReadSoFar += 1;

                logger.info("Key: " + record.key() + "\t" + "Value: " + record.value());
                logger.info("Partition: " + record.partition() + "\t" + "Offset: " + record.offset());

                if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
                    keepOnReading = false;
                    break;
                }
            }
        }
    } catch (WakeupException e) {
        logger.info("Received shutdown signal!");
    } finally {
        consumer.close();

        // tell our main code we're done with the consumer
        latch.countDown();
    }

    Note that when using this approach, you do not need to specify a consumer group.
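
    If the goal is to replay a partition from its very beginning rather than from a known offset, a variant of the same idea (a sketch, assuming the consumer from the example above) is to call seekToBeginning() instead of seek():

    TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
    consumer.assign(Arrays.asList(partitionToReadFrom));

    // jump to the earliest offset still available in that partition
    consumer.seekToBeginning(Arrays.asList(partitionToReadFrom));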

    3. Bidirectional client compatibility

    Since Kafka 0.10.2, Kafka clients and Kafka brokers are bidirectionally compatible. This is achieved by versioning the APIs: clients of different versions send differently versioned API requests, and the broker can handle requests from several API versions.

    In other words:

    • an older client (e.g. 1.1) can talk to a newer broker (e.g. 2.0)
    • a newer client (e.g. 2.0) can talk to an older broker (e.g. 1.1)

    The resulting recommendation: always use the latest client library version.

  • Original article: https://www.cnblogs.com/zackstang/p/11335971.html