zoukankan      html  css  js  c++  java
  • Kafka详解五:Kafka Consumer的底层API- SimpleConsumer

    问题导读

    1.Kafka如何实现和Consumer之间的交互?
    2.使用SimpleConsumer有哪些弊端呢?

    1.Kafka提供了两套API给Consumer

    • The high-level Consumer API
    • The SimpleConsumer API


          第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情

    • 一个消息读取多次
    • 在一个处理过程中只消费Partition其中的一部分消息
    • 添加事务管理机制以保证消息被处理且仅被处理一次

    2.使用SimpleConsumer有哪些弊端呢?

    • 必须在程序中跟踪offset值
    • 必须找出指定Topic Partition中的lead broker
    • 必须处理broker的变动

    3.使用SimpleConsumer的步骤

    • 从所有活跃的broker中找出哪个是指定Topic Partition中的leader broker
    • 找出指定Topic Partition中的所有备份broker
    • 构造请求
    • 发送请求查询数据
    • 处理leader broker变更

    4.代码实例

    1. package bonree.consumer;
    2. import java.nio.ByteBuffer;
    3. import java.util.ArrayList;
    4. import java.util.Collections;
    5. import java.util.HashMap;
    6. import java.util.List;
    7. import java.util.Map;
    8. import kafka.api.FetchRequest;
    9. import kafka.api.FetchRequestBuilder;
    10. import kafka.api.PartitionOffsetRequestInfo;
    11. import kafka.common.ErrorMapping;
    12. import kafka.common.TopicAndPartition;
    13. import kafka.javaapi.FetchResponse;
    14. import kafka.javaapi.OffsetResponse;
    15. import kafka.javaapi.PartitionMetadata;
    16. import kafka.javaapi.TopicMetadata;
    17. import kafka.javaapi.TopicMetadataRequest;
    18. import kafka.javaapi.consumer.SimpleConsumer;
    19. import kafka.message.MessageAndOffset;
    20. public class SimpleExample {
    21.         private List<String> m_replicaBrokers = new ArrayList<String>();
    22.         public SimpleExample() {
    23.                 m_replicaBrokers = new ArrayList<String>();
    24.         }
    25.         public static void main(String args[]) {
    26.                 SimpleExample example = new SimpleExample();
    27.                 // 最大读取消息数量
    28.                 long maxReads = Long.parseLong("3");
    29.                 // 要订阅的topic
    30.                 String topic = "mytopic";
    31.                 // 要查找的分区
    32.                 int partition = Integer.parseInt("0");
    33.                 // broker节点的ip
    34.                 List<String> seeds = new ArrayList<String>();
    35.                 seeds.add("192.168.4.30");
    36.                 seeds.add("192.168.4.31");
    37.                 seeds.add("192.168.4.32");
    38.                 // 端口
    39.                 int port = Integer.parseInt("9092");
    40.                 try {
    41.                         example.run(maxReads, topic, partition, seeds, port);
    42.                 } catch (Exception e) {
    43.                         System.out.println("Oops:" + e);
    44.                         e.printStackTrace();
    45.                 }
    46.         }
    47.         public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
    48.                 // 获取指定Topic partition的元数据
    49.                 PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    50.                 if (metadata == null) {
    51.                         System.out.println("Can't find metadata for Topic and Partition. Exiting");
    52.                         return;
    53.                 }
    54.                 if (metadata.leader() == null) {
    55.                         System.out.println("Can't find Leader for Topic and Partition. Exiting");
    56.                         return;
    57.                 }
    58.                 String leadBroker = metadata.leader().host();
    59.                 String clientName = "Client_" + a_topic + "_" + a_partition;
    60.                 SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    61.                 long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    62.                 int numErrors = 0;
    63.                 while (a_maxReads > 0) {
    64.                         if (consumer == null) {
    65.                                 consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    66.                         }
    67.                         FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
    68.                         FetchResponse fetchResponse = consumer.fetch(req);
    69.                         if (fetchResponse.hasError()) {
    70.                                 numErrors++;
    71.                                 // Something went wrong!
    72.                                 short code = fetchResponse.errorCode(a_topic, a_partition);
    73.                                 System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
    74.                                 if (numErrors > 5)
    75.                                         break;
    76.                                 if (code == ErrorMapping.OffsetOutOfRangeCode()) {
    77.                                         // We asked for an invalid offset. For simple case ask for
    78.                                         // the last element to reset
    79.                                         readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
    80.                                         continue;
    81.                                 }
    82.                                 consumer.close();
    83.                                 consumer = null;
    84.                                 leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
    85.                                 continue;
    86.                         }
    87.                         numErrors = 0;
    88.                         long numRead = 0;
    89.                         for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    90.                                 long currentOffset = messageAndOffset.offset();
    91.                                 if (currentOffset < readOffset) {
    92.                                         System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
    93.                                         continue;
    94.                                 }
    95.                                 readOffset = messageAndOffset.nextOffset();
    96.                                 ByteBuffer payload = messageAndOffset.message().payload();
    97.                                 byte[] bytes = new byte[payload.limit()];
    98.                                 payload.get(bytes);
    99.                                 System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    100.                                 numRead++;
    101.                                 a_maxReads--;
    102.                         }
    103.                         if (numRead == 0) {
    104.                                 try {
    105.                                         Thread.sleep(1000);
    106.                                 } catch (InterruptedException ie) {
    107.                                 }
    108.                         }
    109.                 }
    110.                 if (consumer != null)
    111.                         consumer.close();
    112.         }
    113.         public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
    114.                 TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    115.                 Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    116.                 requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    117.                 kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    118.                 OffsetResponse response = consumer.getOffsetsBefore(request);
    119.                 if (response.hasError()) {
    120.                         System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
    121.                         return 0;
    122.                 }
    123.                 long[] offsets = response.offsets(topic, partition);
    124.                 return offsets[0];
    125.         }
    126.         /**
    127.          * @param a_oldLeader
    128.          * @param a_topic
    129.          * @param a_partition
    130.          * @param a_port
    131.          * @return String
    132.          * @throws Exception
    133.          *             找一个leader broker
    134.          */
    135.         private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    136.                 for (int i = 0; i < 3; i++) {
    137.                         boolean goToSleep = false;
    138.                         PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
    139.                         if (metadata == null) {
    140.                                 goToSleep = true;
    141.                         } else if (metadata.leader() == null) {
    142.                                 goToSleep = true;
    143.                         } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
    144.                                 // first time through if the leader hasn't changed give
    145.                                 // ZooKeeper a second to recover
    146.                                 // second time, assume the broker did recover before failover,
    147.                                 // or it was a non-Broker issue
    148.                                 //
    149.                                 goToSleep = true;
    150.                         } else {
    151.                                 return metadata.leader().host();
    152.                         }
    153.                         if (goToSleep) {
    154.                                 try {
    155.                                         Thread.sleep(1000);
    156.                                 } catch (InterruptedException ie) {
    157.                                 }
    158.                         }
    159.                 }
    160.                 System.out.println("Unable to find new leader after Broker failure. Exiting");
    161.                 throw new Exception("Unable to find new leader after Broker failure. Exiting");
    162.         }
    163.         private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    164.                 PartitionMetadata returnMetaData = null;
    165.                 loop: for (String seed : a_seedBrokers) {
    166.                         SimpleConsumer consumer = null;
    167.                         try {
    168.                                 consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
    169.                                 List<String> topics = Collections.singletonList(a_topic);
    170.                                 TopicMetadataRequest req = new TopicMetadataRequest(topics);
    171.                                 kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
    172.                                 List<TopicMetadata> metaData = resp.topicsMetadata();
    173.                                 for (TopicMetadata item : metaData) {
    174.                                         for (PartitionMetadata part : item.partitionsMetadata()) {
    175.                                                 if (part.partitionId() == a_partition) {
    176.                                                         returnMetaData = part;
    177.                                                         break loop;
    178.                                                 }
    179.                                         }
    180.                                 }
    181.                         } catch (Exception e) {
    182.                                 System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
    183.                         } finally {
    184.                                 if (consumer != null)
    185.                                         consumer.close();
    186.                         }
    187.                 }
    188.                 if (returnMetaData != null) {
    189.                         m_replicaBrokers.clear();
    190.                         for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
    191.                                 m_replicaBrokers.add(replica.host());
    192.                         }
    193.                 }
    194.                 return returnMetaData;
    195.         }
    196. }

           转自:http://www.aboutyun.com/thread-11117-1-1.html

  • 相关阅读:
    扩展Image组件,属性不显示
    unity ugui如何用scrollview展示多个不同的3d物体
    Unity 给模型添加子物体,跟随父物体移动和旋转时不同步问题
    Unity3D性能优化
    Vs2017安装不上个问题,愁了好久
    Time.timeScale 暂停游戏
    unity.3d 打开monodevelop无法调用命名空间问题
    Unity.3D中,两个界面各自脚本中的变量如何调用
    * 笔记标题及标签整体说明+md总结
    & 文件透传整理
  • 原文地址:https://www.cnblogs.com/sh425/p/7158844.html
Copyright © 2011-2022 走看看