消费者Demo
1 import org.apache.kafka.clients.consumer.ConsumerRecord; 2 import org.apache.kafka.clients.consumer.ConsumerRecords; 3 import org.apache.kafka.clients.consumer.KafkaConsumer; 4 import org.apache.kafka.common.serialization.StringDeserializer; 5 6 import java.util.Arrays; 7 import java.util.Properties; 8 9 public class ConsumerDemo { 10 private final KafkaConsumer<String, String> consumer; 11 private ConsumerRecords<String, String> msgList; 12 private final String topic; 13 private static final String GROUPID = "groupA"; 14 15 public ConsumerDemo(String topicName) { 16 Properties props = new Properties(); 17 props.put("bootstrap.servers", "localhost:9092"); 18 props.put("group.id", GROUPID); 19 props.put("enable.auto.commit", "true"); 20 props.put("auto.commit.interval.ms", "1000"); 21 props.put("session.timeout.ms", "30000"); 22 props.put("auto.offset.reset", "earliest"); 23 props.put("key.deserializer", StringDeserializer.class.getName()); 24 props.put("value.deserializer", StringDeserializer.class.getName()); 25 this.consumer = new KafkaConsumer<String, String>(props); 26 this.topic = topicName; 27 this.consumer.subscribe(Arrays.asList(topic)); 28 } 29 30 31 public void receiveMsg() { 32 int messageNo = 1; 33 System.out.println("---------开始消费---------"); 34 try { 35 for (;;) { 36 msgList = consumer.poll(1000); 37 if(null!=msgList&&msgList.count()>0){ 38 for (ConsumerRecord<String, String> record : msgList) { 39 System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset()); 40 } 41 }else{ 42 Thread.sleep(1000); 43 } 44 } 45 } catch (InterruptedException e) { 46 e.printStackTrace(); 47 } finally { 48 consumer.close(); 49 } 50 } 51 public static void main(String args[]) { 52 ConsumerDemo consumerDemo = new ConsumerDemo("KAFKA_TEST"); 53 consumerDemo.receiveMsg(); 54 } 55 }
Demo 第36行调用 consumer.poll(1000) 开始拉取 Kafka 服务器上的消息，下面进入源码 KafkaConsumer.java 的 poll 方法：
/**
 * Fetches records for the current subscription or assignment, waiting up to
 * {@code timeout} milliseconds for data.
 *
 * <p>Calls {@code pollOnce} repeatedly with the remaining time budget until it yields
 * records or the budget is used up.
 *
 * @param timeout maximum time to wait, in milliseconds; must not be negative
 * @return the fetched records (passed through {@code interceptors.onConsume} when
 *         interceptors are set), or {@code ConsumerRecords.empty()} when no records
 *         were obtained before the timeout elapsed
 * @throws IllegalArgumentException if {@code timeout} is negative
 * @throws IllegalStateException if the consumer has neither a subscription nor a user assignment
 */
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    // NOTE(review): presumably guards against concurrent use and use after close
    // (implementation not shown here); paired with release() in the finally block.
    acquireAndEnsureOpen();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            // One fetch attempt bounded by the remaining time budget
            // (the walkthrough refers to this call as listing line 15).
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                    client.pollNoWakeup();

                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            // Nothing fetched this round: shrink the budget by the time spent so far and retry.
            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
poll 方法第15行调用 pollOnce(remaining) 拉取消息，下面进入 pollOnce() 方法：
/**
 * Performs a single poll iteration: returns records that are already buffered, or sends
 * new fetches, blocks on the network client for a bounded time, and then returns whatever
 * has been fetched.
 *
 * @param timeout upper bound, in milliseconds, for the blocking network poll
 * @return fetched records grouped by partition; an empty map when
 *         {@code coordinator.needRejoin()} is true after the network poll;
 *         otherwise whatever {@code fetcher.fetchedRecords()} returns
 */
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    // NOTE(review): presumably surfaces a pending wakeup request before any work is done — confirm.
    client.maybeTriggerWakeup();
    // NOTE(review): presumably drives group-coordination work (implementation not shown here).
    coordinator.poll(time.milliseconds(), timeout);

    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    // if data is available already, return it immediately
    // (the walkthrough refers to this call as listing line 11)
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;

    // send any new fetches (won't resend pending fetches)
    // (the walkthrough refers to this call as listing line 16)
    fetcher.sendFetches();

    long now = time.milliseconds();
    // Block no longer than the caller's timeout or the coordinator's next poll deadline,
    // whichever is sooner.
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.needRejoin())
        return Collections.emptyMap();

    return fetcher.fetchedRecords();
}
pollOnce 方法第11行调用 fetcher.fetchedRecords() 先取本地已缓存的消息；如果本地缓存为空，则在第16行调用 fetcher.sendFetches() 重新向服务器发送拉取请求。