Apache Kafka (4) - Using Java to Access Kafka

    1. Producer

    1.1. Basic Producer

    First, set up the dependencies with Maven. The Kafka build on our server is 2.12-2.3.0 (Scala 2.12, Kafka 2.3.0), and the pom.xml is:

        <?xml version="1.0" encoding="UTF-8"?>
        <project xmlns="http://maven.apache.org/POM/4.0.0"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>

            <groupId>com.github.tang</groupId>
            <artifactId>kafka-beginner</artifactId>
            <version>1.0</version>

            <dependencies>
                <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
                <dependency>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                    <version>2.3.0</version>
                </dependency>

                <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
                <dependency>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-simple</artifactId>
                    <version>1.7.26</version>
                </dependency>

            </dependencies>

        </project>

    Then create a Producer:

        package com.github.tang.kafka.tutorial1;

        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.StringSerializer;

        import java.util.Properties;

        public class ProducerDemo {

            private static String bootstrapServers = "server_xxx:9092";

            public static void main(String[] args) {

                /**
                 * create Producer properties
                 *
                 * All available properties are listed in the official documentation:
                 * https://kafka.apache.org/documentation/#producerconfigs
                 */
                Properties properties = new Properties();
                properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

                // create the producer
                KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

                // create a producer record
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>("first_topic", "message from java");

                /**
                 * send data - asynchronous
                 *
                 * send() is asynchronous, so the record is not sent immediately.
                 * Without waiting, the program would exit right after send() returns,
                 * the record would never reach the Kafka topic,
                 * and the consumer would never receive it.
                 *
                 * Hence we need flush().
                 */
                producer.send(record);

                // flush() blocks until all buffered records have been sent
                producer.flush();
                producer.close();

            }
        }

    Run this program and the message shows up in the console consumer CLI.
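
    As a side note, send() returns a Future<RecordMetadata>, so for a quick test you can also block on that future instead of calling flush(). A minimal sketch (for demonstration only, since blocking on every send defeats batching; it needs imports for RecordMetadata and java.util.concurrent.ExecutionException, and main() must declare or handle the checked exceptions):

        // synchronous send (sketch): .get() blocks until the broker has acknowledged the record
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("Sent to partition " + metadata.partition() + " at offset " + metadata.offset());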

    1.2. Producer with a Callback

    A Callback is executed after each record is sent. For example:

    First, create a Logger instance:

        // create a logger
        final Logger logger = LoggerFactory.getLogger(ProducerDemoCallback.class);

    Then pass the Callback to send():

        /**
         * send data with a Callback
         */
        for (int i = 0; i < 10; i++) {
            // create a producer record
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("first_topic", "message from java" + Integer.toString(i));

            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // executed every time a record is successfully sent or an exception is thrown
                    if (e == null) {
                        // the record was sent successfully
                        logger.info("Received new metadata. \n" +
                                "Topic: " + recordMetadata.topic() + "\n" +
                                "Partition: " + recordMetadata.partition() + "\n" +
                                "Offset: " + recordMetadata.offset() + "\n" +
                                "Timestamp: " + recordMetadata.timestamp());
                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            });
        }

    Part of the output:

    [kafka-producer-network-thread | producer-1] INFO com.github.tang.kafka.tutorial1.ProducerDemoCallback - Received new metadata.

    Topic: first_topic

    Partition: 2

    Offset: 21

    Timestamp: 1565501879059

    [kafka-producer-network-thread | producer-1] INFO com.github.tang.kafka.tutorial1.ProducerDemoCallback - Received new metadata.

    Topic: first_topic

    Partition: 2

    Offset: 22

    Timestamp: 1565501879075

    1.3. Sending records with a key

    The examples above send records without a key, so messages are distributed across partitions in a round-robin fashion. Below is a keyed producer example; simply use the ProducerRecord constructor overload that takes a key:

        String key = "id_" + Integer.toString(i);

        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(topic, key, "message from java" + Integer.toString(i));
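
    To see which partition each key is routed to, here is a minimal sketch that combines the key with the Callback from section 1.2 (it assumes the producer and logger variables from the previous sections and a topic named first_topic):

        for (int i = 0; i < 10; i++) {
            final String key = "id_" + Integer.toString(i);
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("first_topic", key, "message from java" + Integer.toString(i));

            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        // with the default partitioner, a given key always maps to the same partition
                        logger.info("Key: " + key + " -> Partition: " + recordMetadata.partition());
                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            });
        }
        producer.flush();

    Running it twice should print the same key-to-partition mapping both times, as long as the topic's partition count does not change.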

    2. Consumer

    2.1. Basic Consumer

    Below is a basic consumer example:

        package com.github.tang.kafka.tutorial1;

        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;

        import java.time.Duration;
        import java.util.Arrays;
        import java.util.Properties;

        public class ConsumerDemo {
            private static String bootstrapServers = "server:9092";
            private static String groupId = "my-forth-app";
            private static String topic = "first_topic";

            public static void main(String[] args) {
                Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);

                /**
                 * create Consumer properties
                 *
                 * All available properties are listed in the official documentation:
                 * https://kafka.apache.org/documentation/#consumerconfigs
                 */
                Properties properties = new Properties();
                properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

                // create consumer
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

                // subscribe consumer to our topic(s)
                consumer.subscribe(Arrays.asList(topic));

                // poll for new data
                while (true) {
                    ConsumerRecords<String, String> records =
                            consumer.poll(Duration.ofMillis(100));

                    for (ConsumerRecord<String, String> record : records) {
                        logger.info("Key: " + record.key() + "\t" + "Value: " + record.value() + "\t" +
                                "Topic: " + record.topic() + "\t" + "Partition: " + record.partition());
                    }
                }
            }
        }

    Part of the output (screenshot omitted) shows that, with auto.offset.reset set to earliest, the consumer reads one partition all the way to the end before moving on to the next partition.

    2.2. Consumer rebalancing

    As mentioned before, consumers in the same consumer group automatically balance the partitions among themselves. Below we start one consumer and then start a second one.

    In the first consumer's log (screenshot omitted), you can see that after the second consumer joins, the partitions are reassigned: the first consumer goes from owning three partitions (0, 1, 2) to owning a single partition (2).

    The second consumer's log (screenshot omitted) shows that, after joining, it takes over reading from two partitions (0 and 1).
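
    If you want to observe these reassignments programmatically rather than by reading the client logs, you can pass a ConsumerRebalanceListener when subscribing. A minimal sketch, replacing the subscribe() call from section 2.1 (it additionally needs imports for org.apache.kafka.clients.consumer.ConsumerRebalanceListener, org.apache.kafka.common.TopicPartition and java.util.Collection):

        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // called before a rebalance takes partitions away from this consumer
                logger.info("Partitions revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // called after a rebalance with the partitions this consumer now owns
                logger.info("Partitions assigned: " + partitions);
            }
        });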

    2.3. Consumer in a separate thread

    The following example runs the consumer on a worker thread and uses a CountDownLatch together with a shutdown hook so the consumer can be closed cleanly (wakeup() makes poll() throw a WakeupException):

        package com.github.tang.kafka.tutorial1;

        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.errors.WakeupException;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;

        import java.time.Duration;
        import java.util.Arrays;
        import java.util.Properties;
        import java.util.concurrent.CountDownLatch;

        public class ConsumerDemoWithThreads {

            private static Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThreads.class);

            public static void main(String[] args) {
                String bootstrapServers = "server:9092";
                String groupId = "my-fifth-app";
                String topic = "first_topic";

                // latch for dealing with multiple threads
                CountDownLatch latch = new CountDownLatch(1);

                ConsumerRunnable consumerRunnable = new ConsumerRunnable(latch,
                        bootstrapServers,
                        groupId,
                        topic);

                Thread myConsumerThread = new Thread(consumerRunnable);
                myConsumerThread.start();

                // add a shutdown hook
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    logger.info("Caught shutdown hook");
                    consumerRunnable.shutdown();

                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    logger.info("Application has exited");

                }));

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    logger.error("Application got interrupted", e);
                } finally {
                    logger.info("Application is closing");
                }
            }

            private static class ConsumerRunnable implements Runnable {

                private CountDownLatch latch;
                private KafkaConsumer<String, String> consumer;
                private String bootstrapServers;
                private String topic;
                private String groupId;

                public ConsumerRunnable(CountDownLatch latch,
                                        String bootstrapServers,
                                        String groupId,
                                        String topic) {
                    this.latch = latch;
                    this.bootstrapServers = bootstrapServers;
                    this.topic = topic;
                    this.groupId = groupId;
                }

                @Override
                public void run() {

                    Properties properties = new Properties();
                    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

                    consumer = new KafkaConsumer<String, String>(properties);
                    consumer.subscribe(Arrays.asList(topic));

                    // poll for new data
                    try {
                        while (true) {
                            ConsumerRecords<String, String> records =
                                    consumer.poll(Duration.ofMillis(100));

                            for (ConsumerRecord<String, String> record : records) {
                                logger.info("Key: " + record.key() + "\t" + "Value: " + record.value());
                                logger.info("Partition: " + record.partition() + "\t" + "Offset: " + record.offset());
                            }
                        }
                    } catch (WakeupException e) {
                        logger.info("Received shutdown signal!");
                    } finally {
                        consumer.close();

                        // tell our main code we're done with the consumer
                        latch.countDown();
                    }
                }

                public void shutdown() {
                    // wakeup() is a special method that interrupts consumer.poll();
                    // poll() will then throw a WakeupException
                    consumer.wakeup();
                }
            }
        }

    2.4. Consumer with assign and seek

    A consumer can use assign() to attach itself to a specific partition of a topic and then seek() to start reading from a given offset. This approach is typically used to replay data or to fetch a specific record.

    Building on the previous example, modify part of the run() method as follows:

        // assign and seek are mostly used to replay data or fetch a specific message

        // assign (requires import org.apache.kafka.common.TopicPartition)
        TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
        long offsetToReadFrom = 15L;
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek
        consumer.seek(partitionToReadFrom, offsetToReadFrom);

        int numberOfMessagesToRead = 5;
        boolean keepOnReading = true;
        int numberOfMessagesReadSoFar = 0;

        // poll for new data
        try {
            while (keepOnReading) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    numberOfMessagesReadSoFar += 1;

                    logger.info("Key: " + record.key() + "\t" + "Value: " + record.value());
                    logger.info("Partition: " + record.partition() + "\t" + "Offset: " + record.offset());

                    if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
                        keepOnReading = false;
                        break;
                    }
                }
            }
        } catch (WakeupException e) {
            logger.info("Received shutdown signal!");
        } finally {
            consumer.close();

            // tell our main code we're done with the consumer
            latch.countDown();
        }

    Note that with this approach you do not need to specify a consumer group.
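
    Concretely, a minimal property set for this assign/seek mode might look like the sketch below: GROUP_ID_CONFIG is left out because group management (subscribe()) is not used, and AUTO_OFFSET_RESET_CONFIG is irrelevant because the starting offset is chosen explicitly with seek():

        // minimal properties for an assign/seek consumer (sketch)
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // no consumer.subscribe(...) here; partitions are attached with consumer.assign(...) instead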

    3. Bidirectional client compatibility

    Since Kafka 0.10.2, Kafka clients and Kafka brokers are bidirectionally compatible. This works because the wire protocol APIs are versioned: clients of different versions send requests with different API versions, and the broker can handle requests from multiple API versions.

    In other words:

    • An older client (e.g., version 1.1) can talk to a newer broker (e.g., version 2.0)
    • A newer client (e.g., version 2.0) can talk to an older broker (e.g., version 1.1)

    The recommendation: always use the latest client library version.
