zoukankan      html  css  js  c++  java
  • Apache Kafka Consumer 消费者集

    1.目标

    在我们的上一篇文章中,我们讨论了Kafka Producer今天,我们将讨论Kafka Consumer。首先,我们将看到什么是Kafka Consumer和Kafka Consumer的例子。之后,我们将学习Kafka Consumer Group。此外,我们将看到Kafka Consumer的消费者记录API和配置设置。
    创建Kafka Producer后,将消息发送到Apache Kafka集群。现在,我们正在创建一个Kafka Consumer来使用来自Kafka集群的消息。
    所以,让我们详细讨论Kafka Consumer。

    Apache Kafka Consumer

    Apache Kafka Consumer | 卡夫卡消费者集团

    2.什么是卡夫卡消费者?

    Kafka Topics读取数据的应用程序就是我们所说的Consumer。基本上,Kafka Consumer订阅了Kafka集群中的一个或多个主题,然后进一步提供来自Kafka主题的令牌或消息。  
    此外,使用Heartbeat,我们可以了解Consumer与Kafka Cluster的连接性但是,让我们定义Heartbeat。它设置在Consumer,让Zookeeper或Broker Coordinator知道Consumer是否仍然连接到Cluster。因此,如果心跳不存在,Kafka Consumer将不再连接到群集。在这种情况下,经纪协调员必须重新平衡负载。此外,Heartbeat是群集的开销。此外,通过考虑数据吞吐量和开销,我们可以配置心跳为消费者的时间间隔。

    Apache Kafka Consumer

    什么是Apache Kafka Consumer

    此外,我们可以对消费者进行分组,而Kafka中的消费者群体中的消费者可以共享他们订阅的Kafka主题的分区。要理解,如果主题中有N个分区,Kafka Consumer Group中的N个消费者和该组已订阅主题,则每个消费者将从主题的分区中读取数据。因此,我们可以说,这只是一个消费者可以成群结队的提醒。
    让我们用命令修改Apache Kafka Operations
    要具体来说,要连接到Kafka集群并使用数据流,Kafka的Consumer API会有所帮助。
    下面是显示Apache Kafka Consumer的图片:

    Apache Kafka Consumer的工作

    Apache Kafka Consumer的工作

    要订阅一个或多个主题并处理在应用程序中生成的记录流,我们使用此Kafka Consumer API。换句话说,我们使用KafkaConsumer API来使用来自Kafka集群的消息。而且,下面看KafkaConsumer类的构造函数。

    1. public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
    • CONFIGS

    返回消费者配置图。
    KafkaConsumer类有以下重要方法:1。public 
    java.util.Set <TopicPar-tition> assignment()
    获取当前由使用者分配的分区集。
    2. public string subscription()
    为了订阅给定的主题列表,获取动态分配的分区。
    探索Kafka性能调优 - Kafka优化的方法
    3. public void sub-scribe(java.util.List <java.lang.String> topics,ConsumerRe-balanceListener listener)
    此外,订阅给定动态分配的主题列表分区。
    4。 public void unsubscribe()
    现在,取消订阅给定分区列表中的主题。
    5. public void sub-scribe(java.util.List <java.lang.String> topics)
    为了订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则将其视为与unsubscribe()相同。
    6.  public void subscribe(java.util.regex.Pattern pattern,ConsumerRebalanceLis-tener listener)
    这里,参数模式引用正则表达式格式的订阅模式,并且listener参数从订阅模式获取通知。
    7. public void as-sign(java.util.List <TopicParti-tion> partitions)
    手动为客户分配分区列表。
    8.  民意调查()
    获取使用其中一个subscribe / assign API指定的主题或分区的数据。如果在轮询数据之前未订阅主题,则会返回错误。
    9. public void commitSync()
    为了提交所有订阅的主题和分区列表的最后一个poll()中返回的偏移量。对commitAsyn()应用相同的操作。
    10.  public void seek(TopicPartition partition,long offset)
    获取消费者将在下一个poll()方法中使用的当前偏移值。
    阅读Kafka的优点和缺点
    11. public void resume()
    为了恢复暂停的分区。
    12. public void wakeup()
    唤醒消费者。

    3. ConsumerRecord API

    基本上,要从Kafka集群接收记录,我们使用ConsumerRecord API。它包括一个主题名称,分区号,从中接收记录的偏移量也指向Kafka分区中的记录。此外,要创建具有特定主题名称,分区计数和<key,value>对的使用者记录,我们使用consumerRecord类。它的签名是:

    1. public ConsumerRecord 字符串主题,int分区,长偏移量,K键,V值
    2. public ConsumerRecord(string topic,int partition, long offset,K key, V value)
    • 话题

    从Kafka群集收到的消费者记录的主题名称。

    • 划分

    主题的分区。

    记录的密钥,如果没有密钥存在,则返回null。

    记录内容。
    学习Apache Kafka Streams | 流处理拓扑

    4. ConsumerRecords API

    基本上,它是ConsumerRecord的容器。要保留特定主题的每个分区的ConsumerRecord列表,我们使用此API。它的构造函数是:

    1. public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
      <Consumer-Record>K,V>>> records)
    • TopicPartition

    返回特定主题的分区映射。

    • 记录

    返回ConsumerRecord列表。
    这些是ConsumerRecords类的以下方法:
    1。public  int count()
    所有主题的记录数。
    2.  public Set partitions()
    具有此记录集中数据的分区集(如果未返回任何数据,则该集为空)。
    3.  public Iterator iterator()
    通常,迭代器使您可以遍历集合,获取或删除元素。
    4.  public list records()
    基本上,获取给定分区的记录列表。

    5. ConsumerRecord API与ConsumerRecords API

    一个。ConsumerRecord API
    ConsumerRecord API是从Kafka接收的键/值对。它包含主题名称和分区号,从中接收记录以及指向Kafka分区中记录的偏移量。
    湾 ConsumerRecords API
    然而,ConsumerRecords API是一个容器,它为特定主题的每个分区保存ConsumerRecord列表。基本上,Consumer.poll(long)操作返回的每个主题分区都有一个ConsumerRecord列表。
    Apache Kafka工作流程| Kafka Pub-Sub Messaging

    6.配置设置

    在这里,我们列出了Consumer客户端API的配置设置 - 
    1.  bootstrap.servers
    它引导了代理列表。
    2.  group.id
    将个人消费者分配给一个组。
    3.  enable.auto.commit
    基本上,如果值为true,则启用偏移的自动提交,否则不提交。
    4.  auto.commit.interval.ms
    基本上,它返回更新的消耗偏移量写入ZooKeeper的频率
    5.  session.timeout.ms
    它表示Kafka在放弃并继续使用消息之前等待ZooKeeper响应请求(读或写)的毫秒数。

    7. SimpleConsumer应用程序

    确保生产者应用程序步骤在此处保持不变。这里也开始你的ZooKeeper和Kafka经纪人。此外,使用名为SimpleCon-sumer.java java类创建SimpleConsumer应用程序然后键入以下代码
    阅读Apache Kafka职业范围与薪资趋势。

    import java.util.Properties;
    import java.util.Arrays;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    public class SimpleConsumer {
      public static void main(String[] args) throws Exception {
         if(args.length == 0){
            System.out.println("Enter topic name");
            return;
         }
         //Kafka consumer configuration settings
         String topicName = args[0].toString();
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("session.timeout.ms", "30000");
         props.put("key.deserializer",
            "org.apache.kafka.common.serializa-tion.StringDeserializer");
         props.put("value.deserializer",
            "org.apache.kafka.common.serializa-tion.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer
            <String, String>(props);
         //Kafka Consumer subscribes list of topics here.
         consumer.subscribe(Arrays.asList(topicName))
         //print the topic name
         System.out.println("Subscribed to topic " + topicName);
         int i = 0;
         while (true) {
            ConsumerRecords<String, String> records = con-sumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
            // print the offset,key and value for the consumer records.
            System.out.printf("offset = %d, key = %s, value = %s
    ",
               record.offset(), record.key(), record.value());
         }
      }
    }
     

     A。汇编

    通过使用以下命令,我们可以编译应用程序。


    1. javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

     B.执行

    而且,使用以下命令我们可以执行应用程序。

    1. java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

     C.输入

    此外,打开生产者CLI并向主题发送一些消息。我们可以将简单输入作为'Hello Consumer'。 

    d. 产量

    输出

    1. 订阅主题Hello-Kafka
    2. offset = 3 ,key = null,value = Hello Consumer
    3. Subscribed to topic Hello-Kafka
      offset = 3, key = null, value = Hello Consumer

    8.卡夫卡消费者集团

    基本上,Kafka中的Consumer组是来自Kafka主题的多线程或多机器消费。

    卡夫卡消费者 - 卡夫卡消费者集团

    卡夫卡消费者 - 卡夫卡消费者集团

    • 通过使用相同的group.id,消费者可以加入一个组。
    • 组的最大并行度是组中的消费者数量←分区数。
    • 此外,Kafka将主题的分区分配给组中的消费者。因此,每个分区仅由该组中的一个消费者使用。
    • 此外,Kafka保证消息只能由组中的单个消费者读取。
    • 消费者可以按照日志中存储的顺序查看消息。

    看看Storm Kafka与配置和代码
    集成 a。重新平衡消费者
    基本上,增加更多流程/线程将导致Kafka重新平衡。基本上,如果任何消费者或代理以某种方式无法向ZooKeeper发送心跳,则可以通过Kafka群集重新配置它。此外,在此重新平衡期间,Kafka将可用分区分配给可用线程,可能将分区移动到另一个进程。

    import java.util.Properties;
    import java.util.Arrays;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    public class ConsumerGroup {
      public static void main(String[] args) throws Exception {
         if(args.length < 2){
            System.out.println("Usage: consumer <topic> <groupname>");
            return;
         }
         String topic = args[0].toString();
         String group = args[1].toString();
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", group);
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("session.timeout.ms", "30000");
         props.put("key.deserializer",
            "org.apache.kafka.common.serializa-tion.StringDeserializer");
         props.put("value.deserializer",
            "org.apache.kafka.common.serializa-tion.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
         consumer.subscribe(Arrays.asList(topic));
         System.out.println("Subscribed to topic " + topic);
         int i = 0;
         while (true) {
            ConsumerRecords<String, String> records = con-sumer.poll(100);
               for (ConsumerRecord<String, String> record : records)
                  System.out.printf("offset = %d, key = %s, value = %s
    ",
                  record.offset(), record.key(), record.value());
         }
      }
    }
    ii. Compilation
    javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
    iii. Execution
    >>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":.
    ConsumerGroup <topic-name> my-group
    >>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":.
    ConsumerGroup <topic-name> my-group
     

    因此,我们可以看到我们创建了名称的样本组,my-group和两个消费者。
    湾 输入
    现在,在打开生产者CLI后,发送一些消息,如 -

    Test consumer group 01
    Test consumer group 02
    1. 测试消费者组01
    2. 测试消费者组02

    C。第一个过程的输出
    学习Apache Kafka用例| 卡夫卡应用程序

    Subscribed to topic Hello-kafka
    offset = 3, key = null, value = Test consumer group 01
    1. 订阅主题Hello-kafka
    2. offset = 3 ,key = null,value =测试消费者组01

    d。此外,第二个过程的输出

    Subscribed to topic Hello-kafka
    offset = 3, key = null, value = Test consumer group 02
    1. 订阅主题Hello-kafka
    2. offset = 3 ,key = null,value =测试消费者组02

    所以,这完全是关于Kafka的Apache Kafka消费者和消费者群体的例子。希望你喜欢我们的解释。

    9.结论:卡夫卡消费者

    因此,我们通过使用Java客户端演示详细了解了Kafka Consumer和ConsumerGroup。此外,通过这个,我们了解了如何使用Java客户端发送和接收消息。此外,我们讨论了Kafka Consumer记录API和Consumer Records API以及两者的比较。此外,我们还学习了Kafka Consumer客户端API的配置设置。但是,如果有任何疑问,请随时在评论部分询问。
    另请参阅 -  
    Kafka Broker 
    供参考

  • 相关阅读:
    vim:去掉响铃
    vim:过一个字符
    Msys2:windows下好用的unix模拟器
    vim:折叠操作
    vim:inoremap命令
    vim:关于映射和跳出括号
    vim打造简易C语言编辑器(在用2016.7.10)
    vim利用插件管理工具-管理配置文件
    拨打电话的实现
    类似于抽奖活动的小程序
  • 原文地址:https://www.cnblogs.com/a00ium/p/10850141.html
Copyright © 2011-2022 走看看