zoukankan      html  css  js  c++  java
  • Kafka

    1.概述

      在 Kafka 中,官方对外提供了两种消费 API,一种是高等级消费 API,另一种是低等级的消费 API。在 《高级消费 API》一文中,介绍了其高级消费的 API 实现。今天给大家介绍另一种消费 API。

    2.内容

      在使用过 Kafka 的高级消费 API 后,我们知道它是一种高度抽象的消费 API,使用起来简单,方便,但是对于某些特殊的需求我们可能要用到第二种更加底层的 API。那么,我们首先需要知道低级消费 API 的作用。它能帮助我们去做那些事情:

    • 一个消息进行多次读取
    • 在处理过程中只消费 Partition 其中的某一部分消息
    • 添加事物管理机制以保证消息仅被处理一次

      当然,在使用的过程当中也是有些弊端的,其内容如下:

    • 必须在程序中跟踪 Offset 的值
    • 必须找出指定的 Topic Partition 中的 Lead Broker
    • 必须处理 Broker 的变动

      使用其 API 的思路步骤如下所示:

    • 从所有处于 Active 状态的 Broker 中找出哪个是指定 Topic Partition 中的 Lead Broker
    • 找出指定 Topic Partition 中的所有备份 Broker
    • 构造请求
    • 发送请求并查询数据
    • 处理 Leader Broker 的变动

    3.代码实现

    3.1 Java Project

      若是使用 Java Project 工程去实现该部分代码,需要添加相关以来 JAR 文件,其内容包含如下:

    • scala-xml_${version}-${version}.jar
    • scala-library-${version}.jar
    • metrics-core-${version}.jar
    • kafka-client-${version}.jar
    • kafka_${version}-${version}.jar

      针对 Java Project 工程,需要自己筛选 JAR 去添加。保证代码的顺利执行。

    3.2 Maven Project

      对 Maven 工程,在 pom.xml 文件中添加相应的依赖信息即可,简单方便。让 Maven 去管理相应的依赖 JAR 文件。内容如下所示:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
        </exclusion>
        </exclusions>
    </dependency>

      这样在 Maven 工程中相应的依赖 JAR 文件就添加完成了。

    3.3 代码实现

      在低级消费 API 中,实现代码如下所示:

    /**
     * @Date Mar 2, 2016
     *
     * @Author dengjie
     *
     * @Note Simple consumer api
     */
    public class SimpleKafkaConsumer {
    	private static Logger log = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
    	private List<String> m_replicaBrokers = new ArrayList<String>();
    
    	public SimpleKafkaConsumer() {
    		m_replicaBrokers = new ArrayList<String>();
    	}
    
    	public static void main(String[] args) {
    		SimpleKafkaConsumer example = new SimpleKafkaConsumer();
    		// Max read number
    		long maxReads = SystemConfig.getIntProperty("kafka.read.max");
    		// To subscribe to the topic
    		String topic = SystemConfig.getProperty("kafka.topic");
    		// Find partition
    		int partition = SystemConfig.getIntProperty("kafka.partition");
    		// Broker node's ip
    		List<String> seeds = new ArrayList<String>();
    		String[] hosts = SystemConfig.getPropertyArray("kafka.server.host", ",");
    		for (String host : hosts) {
    			seeds.add(host);
    		}
    		int port = SystemConfig.getIntProperty("kafka.server.port");
    		try {
    			example.run(maxReads, topic, partition, seeds, port);
    		} catch (Exception e) {
    			log.error("Oops:" + e);
    			e.printStackTrace();
    		}
    	}
    
    	public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port)
    			throws Exception {
    		// Get point topic partition's meta
    		PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    		if (metadata == null) {
    			log.info("[SimpleKafkaConsumer.run()] - Can't find metadata for Topic and Partition. Exiting");
    			return;
    		}
    		if (metadata.leader() == null) {
    			log.info("[SimpleKafkaConsumer.run()] - Can't find Leader for Topic and Partition. Exiting");
    			return;
    		}
    		String leadBroker = metadata.leader().host();
    		String clientName = "Client_" + a_topic + "_" + a_partition;
    
    		SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    		long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(),
    				clientName);
    		int numErrors = 0;
    		while (a_maxReads > 0) {
    			if (consumer == null) {
    				consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    			}
    			FetchRequest req = new FetchRequestBuilder().clientId(clientName)
    					.addFetch(a_topic, a_partition, readOffset, 100000).build();
    			FetchResponse fetchResponse = consumer.fetch(req);
    
    			if (fetchResponse.hasError()) {
    				numErrors++;
    				// Something went wrong!
    				short code = fetchResponse.errorCode(a_topic, a_partition);
    				log.info("[SimpleKafkaConsumer.run()] - Error fetching data from the Broker:" + leadBroker
    						+ " Reason: " + code);
    				if (numErrors > 5)
    					break;
    				if (code == ErrorMapping.OffsetOutOfRangeCode()) {
    					// We asked for an invalid offset. For simple case ask for
    					// the last element to reset
    					readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(),
    							clientName);
    					continue;
    				}
    				consumer.close();
    				consumer = null;
    				leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
    				continue;
    			}
    			numErrors = 0;
    
    			long numRead = 0;
    			for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    				long currentOffset = messageAndOffset.offset();
    				if (currentOffset < readOffset) {
    					log.info("[SimpleKafkaConsumer.run()] - Found an old offset: " + currentOffset + " Expecting: "
    							+ readOffset);
    					continue;
    				}
    
    				readOffset = messageAndOffset.nextOffset();
    				ByteBuffer payload = messageAndOffset.message().payload();
    
    				byte[] bytes = new byte[payload.limit()];
    				payload.get(bytes);
    				System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); // Message deal enter
    				numRead++;
    				a_maxReads--;
    			}
    
    			if (numRead == 0) {
    				try {
    					Thread.sleep(1000);
    				} catch (InterruptedException ie) {
    				}
    			}
    		}
    		if (consumer != null)
    			consumer.close();
    	}
    
    	public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime,
    			String clientName) {
    		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
    				kafka.api.OffsetRequest.CurrentVersion(), clientName);
    		OffsetResponse response = consumer.getOffsetsBefore(request);
    
    		if (response.hasError()) {
    			log.info("[SimpleKafkaConsumer.getLastOffset()] - Error fetching data Offset Data the Broker. Reason: "
    					+ response.errorCode(topic, partition));
    			return 0;
    		}
    		long[] offsets = response.offsets(topic, partition);
    		return offsets[0];
    	}
    
    	/**
    	 * @param a_oldLeader
    	 * @param a_topic
    	 * @param a_partition
    	 * @param a_port
    	 * @return String
    	 * @throws Exception
    	 *             find next leader broker
    	 */
    	private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    		for (int i = 0; i < 3; i++) {
    			boolean goToSleep = false;
    			PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
    			if (metadata == null) {
    				goToSleep = true;
    			} else if (metadata.leader() == null) {
    				goToSleep = true;
    			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
    				// first time through if the leader hasn't changed give
    				// ZooKeeper a second to recover
    				// second time, assume the broker did recover before failover,
    				// or it was a non-Broker issue
    				//
    				goToSleep = true;
    			} else {
    				return metadata.leader().host();
    			}
    			if (goToSleep) {
    				try {
    					Thread.sleep(1000);
    				} catch (InterruptedException ie) {
    				}
    			}
    		}
    		throw new Exception("Unable to find new leader after Broker failure. Exiting");
    	}
    
    	private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    		PartitionMetadata returnMetaData = null;
    		loop: for (String seed : a_seedBrokers) {
    			SimpleConsumer consumer = null;
    			try {
    				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
    				List<String> topics = Collections.singletonList(a_topic);
    				TopicMetadataRequest req = new TopicMetadataRequest(topics);
    				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
    
    				List<TopicMetadata> metaData = resp.topicsMetadata();
    				for (TopicMetadata item : metaData) {
    					for (PartitionMetadata part : item.partitionsMetadata()) {
    						if (part.partitionId() == a_partition) {
    							returnMetaData = part;
    							break loop;
    						}
    					}
    				}
    			} catch (Exception e) {
    				log.error("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", "
    						+ a_partition + "] Reason: " + e);
    			} finally {
    				if (consumer != null)
    					consumer.close();
    			}
    		}
    		if (returnMetaData != null) {
    			m_replicaBrokers.clear();
    			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
    				m_replicaBrokers.add(replica.host());
    			}
    		}
    		return returnMetaData;
    	}
    }
    

    4.总结

      在使用 Kafka 低级消费 API 时,要明确我们所使用的业务场景,一般建议还是使用高级消费 API,除非遇到特殊需要。另外,在使用过程中,注意 Leader Broker 的处理,和 Offset 的管理。

    5.结束语

      这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

  • 相关阅读:
    TVM性能评估分析(七)
    TVM性能评估分析(六)
    理论三:里式替换(LSP)跟多态有何区别?哪些代码违背了LSP?
    理论二:如何做到“对扩展开放、修改关闭”?扩展和修改各指什么?
    (转)浅谈 SOLID 原则的具体使用
    老一辈的革命先烈告诉我,凡事打好提前量
    理论一:对于单一职责原则,如何判定某个类的职责是否够“单一”?
    实战二(下):如何利用面向对象设计和编程开发接口鉴权功能?
    实战二(上):如何对接口鉴权这样一个功能开发做面向对象分析?
    12 | 实战一(下):如何利用基于充血模型的DDD开发一个虚拟钱包系统?
  • 原文地址:https://www.cnblogs.com/smartloli/p/5241067.html
Copyright © 2011-2022 走看看