zoukankan      html  css  js  c++  java
  • kafka 官方示例代码--消费者

    kafka 0.9.0添加了一套新的Java 消费者API,用以替换之前的high-level API (基于ZK) 和low-level API。新的Java消费者API目前为测试版。另外kafka 0.9暂时还支持0.8的Client。

    1、High Level Consumer(0.8)

    package com.test.groups;
     
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
     
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
     
    /**
     * Example of the Kafka 0.8 high-level consumer: connects through ZooKeeper as a
     * member of a consumer group and hands each message stream of one topic to its
     * own worker thread.
     */
    public class ConsumerGroupExample {
        private final ConsumerConnector consumer;
        private final String topic;
        // Created lazily by run(); stays null if run() is never called.
        private ExecutorService executor;

        /**
         * Creates the consumer connector for the given group.
         *
         * @param a_zookeeper ZooKeeper connect string, stored as {@code zookeeper.connect}
         * @param a_groupId   consumer group id, stored as {@code group.id}
         * @param a_topic     topic whose streams will be consumed by {@link #run(int)}
         */
        public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig(a_zookeeper, a_groupId));
            this.topic = a_topic;
        }

        /**
         * Shuts down the connector and the worker pool, waiting up to five seconds
         * for the workers to finish. Safe to call even if {@link #run(int)} was never
         * invoked.
         */
        public void shutdown() {
            if (consumer != null) {
                consumer.shutdown();
            }
            if (executor == null) {
                // run() was never called, so there are no worker threads to wait for.
                return;
            }
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted during shutdown, exiting uncleanly");
                // Preserve the interrupt so callers further up can observe it.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Requests {@code a_numThreads} streams for the topic and submits one
         * {@code ConsumerTest} worker per stream to a fixed-size thread pool.
         *
         * @param a_numThreads number of streams requested and size of the thread pool
         */
        public void run(int a_numThreads) {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, a_numThreads);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

            // Launch the pool that will host one worker per stream.
            executor = Executors.newFixedThreadPool(a_numThreads);

            // Each stream gets its own worker, numbered from 0 for log output.
            int threadNumber = 0;
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new ConsumerTest(stream, threadNumber));
                threadNumber++;
            }
        }

        /**
         * Builds the consumer configuration from the ZooKeeper address and group id
         * plus the fixed timeout/interval settings used by this example.
         */
        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", a_zookeeper);
            props.put("group.id", a_groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");

            return new ConsumerConfig(props);
        }

        /**
         * Entry point. Expects four arguments: ZooKeeper connect string, group id,
         * topic and thread count. Consumes for ten seconds, then shuts down.
         */
        public static void main(String[] args) {
            if (args.length < 4) {
                System.err.println("Usage: ConsumerGroupExample <zookeeper> <groupId> <topic> <threads>");
                return;
            }
            String zooKeeper = args[0];
            String groupId = args[1];
            String topic = args[2];
            int threads = Integer.parseInt(args[3]);

            ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
            example.run(threads);

            try {
                Thread.sleep(10000);
            } catch (InterruptedException ie) {
                // Interrupted while waiting: stop early but keep the interrupt flag set.
                Thread.currentThread().interrupt();
            }
            example.shutdown();
        }
    }
    View Code
    package com.test.groups;
     
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
     
    /**
     * Worker that drains one Kafka message stream and prints every message,
     * prefixed with the worker's thread number.
     */
    public class ConsumerTest implements Runnable {
        private final KafkaStream<byte[], byte[]> m_stream;
        private final int m_threadNumber;

        /**
         * @param a_stream       stream to read messages from
         * @param a_threadNumber number used only to label this worker's output
         */
        public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        /**
         * Prints each message until the stream's iterator reports no more elements,
         * then prints a shutdown notice.
         */
        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext()) {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String text = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Thread " + m_threadNumber + ": " + text);
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }
    View Code

    2、low-level SimpleConsumer(0.8)

    package com.test.simple;
     
    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.ErrorMapping;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.*;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;
     
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
     
    /**
     * Example of the Kafka 0.8 low-level {@code SimpleConsumer}: looks up the leader
     * of one topic partition, fetches messages from it starting at the earliest
     * offset, and looks for a new leader when a fetch fails.
     */
    public class SimpleExample {
        // Values passed as the 3rd/4th SimpleConsumer constructor arguments
        // (socket timeout in ms and buffer size in the Kafka 0.8 API).
        private static final int SOCKET_TIMEOUT_MS = 100000;
        private static final int BUFFER_SIZE = 64 * 1024;
        // Note: this fetch size might need to be increased if large batches are written to Kafka.
        private static final int FETCH_SIZE = 100000;
        // The read loop gives up once more than this many fetches in a row have failed.
        private static final int MAX_CONSECUTIVE_ERRORS = 5;

        /**
         * Entry point. Expects five arguments: maximum number of messages to read,
         * topic, partition, seed broker host and broker port.
         */
        public static void main(String args[]) {
            if (args.length < 5) {
                System.err.println("Usage: SimpleExample <maxReads> <topic> <partition> <seedBroker> <port>");
                return;
            }
            SimpleExample example = new SimpleExample();
            long maxReads = Long.parseLong(args[0]);
            String topic = args[1];
            int partition = Integer.parseInt(args[2]);
            List<String> seeds = new ArrayList<String>();
            seeds.add(args[3]);
            int port = Integer.parseInt(args[4]);
            try {
                example.run(maxReads, topic, partition, seeds, port);
            } catch (Exception e) {
                System.out.println("Oops:" + e);
                e.printStackTrace();
            }
        }

        // Replica hosts of the partition, refreshed by every successful findLeader() call.
        private final List<String> m_replicaBrokers = new ArrayList<String>();

        public SimpleExample() {
        }

        /**
         * Reads up to {@code a_maxReads} messages from one partition and prints them
         * as {@code offset: payload}.
         *
         * @param a_maxReads    maximum number of messages to print
         * @param a_topic       topic to read from
         * @param a_partition   partition to read from
         * @param a_seedBrokers broker hosts used to look up the partition leader
         * @param a_port        port used for every broker connection
         * @throws Exception if no new leader can be found after a fetch error, or if
         *                   a broker call fails
         */
        public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
            // find the meta data about the topic and partition we are interested in
            PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                System.out.println("Can't find metadata for Topic and Partition. Exiting");
                return;
            }
            if (metadata.leader() == null) {
                System.out.println("Can't find Leader for Topic and Partition. Exiting");
                return;
            }
            String leadBroker = metadata.leader().host();
            String clientName = "Client_" + a_topic + "_" + a_partition;

            SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, SOCKET_TIMEOUT_MS, BUFFER_SIZE, clientName);
            // try/finally guarantees the connection is closed even when a broker call throws.
            try {
                long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

                int numErrors = 0;
                while (a_maxReads > 0) {
                    if (consumer == null) {
                        // The previous connection was dropped after an error; reconnect to the current leader.
                        consumer = new SimpleConsumer(leadBroker, a_port, SOCKET_TIMEOUT_MS, BUFFER_SIZE, clientName);
                    }
                    FetchRequest req = new FetchRequestBuilder()
                            .clientId(clientName)
                            .addFetch(a_topic, a_partition, readOffset, FETCH_SIZE)
                            .build();
                    FetchResponse fetchResponse = consumer.fetch(req);

                    if (fetchResponse.hasError()) {
                        numErrors++;
                        // Something went wrong!
                        short code = fetchResponse.errorCode(a_topic, a_partition);
                        System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                        if (numErrors > MAX_CONSECUTIVE_ERRORS) {
                            break;
                        }
                        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                            // We asked for an invalid offset. For simple case ask for the last element to reset
                            readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                            continue;
                        }
                        consumer.close();
                        consumer = null;
                        leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                        continue;
                    }
                    numErrors = 0;

                    long numRead = 0;
                    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                        long currentOffset = messageAndOffset.offset();
                        if (currentOffset < readOffset) {
                            System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                            continue;
                        }
                        readOffset = messageAndOffset.nextOffset();
                        ByteBuffer payload = messageAndOffset.message().payload();

                        byte[] bytes = new byte[payload.limit()];
                        payload.get(bytes);
                        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                        numRead++;
                        a_maxReads--;
                        if (a_maxReads <= 0) {
                            // Honour the read limit even when the fetched batch holds more messages.
                            break;
                        }
                    }

                    if (numRead == 0) {
                        // Nothing new in this fetch: pause before polling again.
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException ie) {
                            // Stop reading, but keep the interrupt flag set for the caller.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }

        /**
         * Asks the broker for a single offset of the partition relative to
         * {@code whichTime}.
         *
         * @param consumer   connection used to send the offset request
         * @param topic      topic of the partition
         * @param partition  partition id
         * @param whichTime  time marker passed to the offset request (the callers in
         *                   this class pass {@code EarliestTime()} or {@code LatestTime()})
         * @param clientName client id sent with the request
         * @return the first offset returned by the broker, or 0 if the response
         *         reports an error or contains no offsets
         */
        public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                         long whichTime, String clientName) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
            requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);

            if (response.hasError()) {
                System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
                return 0;
            }
            long[] offsets = response.offsets(topic, partition);
            if (offsets == null || offsets.length == 0) {
                // No offset came back; fall back to 0 like the error path instead of throwing.
                return 0;
            }
            return offsets[0];
        }

        /**
         * Makes up to three attempts, one second apart, to find a leader for the
         * partition among the known replica brokers.
         *
         * @return host name of the leader
         * @throws Exception if no leader was accepted after three attempts
         */
        private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
            for (int i = 0; i < 3; i++) {
                PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
                boolean leaderKnown = metadata != null && metadata.leader() != null;
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                boolean sameLeaderOnFirstTry = leaderKnown && i == 0
                        && a_oldLeader.equalsIgnoreCase(metadata.leader().host());
                if (leaderKnown && !sameLeaderOnFirstTry) {
                    return metadata.leader().host();
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    // Keep the interrupt flag; the loop is bounded, so the remaining attempts run without waiting.
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("Unable to find new leader after Broker failure. Exiting");
            throw new Exception("Unable to find new leader after Broker failure. Exiting");
        }

        /**
         * Queries the given brokers one by one for the topic's metadata and returns
         * the entry for the requested partition from the first broker that has it.
         * On success the cached replica list is replaced with that partition's
         * replica hosts.
         *
         * @return metadata of the partition, or {@code null} if no broker returned it
         */
        private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
            PartitionMetadata returnMetaData = null;
            loop:
            for (String seed : a_seedBrokers) {
                SimpleConsumer consumer = null;
                try {
                    consumer = new SimpleConsumer(seed, a_port, SOCKET_TIMEOUT_MS, BUFFER_SIZE, "leaderLookup");
                    List<String> topics = Collections.singletonList(a_topic);
                    TopicMetadataRequest req = new TopicMetadataRequest(topics);
                    kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                    List<TopicMetadata> metaData = resp.topicsMetadata();
                    for (TopicMetadata item : metaData) {
                        for (PartitionMetadata part : item.partitionsMetadata()) {
                            if (part.partitionId() == a_partition) {
                                returnMetaData = part;
                                // The finally block below still runs and closes the lookup connection.
                                break loop;
                            }
                        }
                    }
                } catch (Exception e) {
                    // Best effort: report the failure and try the next seed broker.
                    System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                            + ", " + a_partition + "] Reason: " + e);
                } finally {
                    if (consumer != null) {
                        consumer.close();
                    }
                }
            }
            if (returnMetaData != null) {
                m_replicaBrokers.clear();
                for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                    m_replicaBrokers.add(replica.host());
                }
            }
            return returnMetaData;
        }
    }
    View Code

     3、New Consumer API

    依赖包

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    View Code

    3.1 自动提交offset(新版消费者将offset提交到Kafka,而不是zk)

    // Consumer with automatic offset commits: enable.auto.commit is "true" and
    // auto.commit.interval.ms is "1000".
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    // Poll forever and print every record that arrives.
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // %n terminates each record's line; without it all records run together on one line.
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
    View Code

    3.2 手动控制offset

    // Consumer with auto-commit switched off: records are collected in memory and
    // the offsets are committed only after a full batch has been handed to
    // insertIntoDb().
    Properties props = new Properties();
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("session.timeout.ms", "30000");
    props.put("auto.commit.interval.ms", "1000");
    props.put("enable.auto.commit", "false");
    props.put("group.id", "test");
    props.put("bootstrap.servers", "localhost:9092");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));

    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    for (;;) {
        for (ConsumerRecord<String, String> polled : consumer.poll(100)) {
            buffer.add(polled);
        }
        if (buffer.size() < minBatchSize) {
            // Batch not full yet: keep polling.
            continue;
        }
        insertIntoDb(buffer);
        // Commit only after the batch has been passed to insertIntoDb().
        consumer.commitSync();
        buffer.clear();
    }
    View Code
    try {
             while(running) {
                 ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                 for (TopicPartition partition : records.partitions()) {
                     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                     for (ConsumerRecord<String, String> record : partitionRecords) {
                         System.out.println(record.offset() + ": " + record.value());
                     }
                     long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                     consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                 }
             }
         } finally {
           consumer.close();
         }
    View Code

    3.3 多线程

    /**
     * Runs a consumer poll loop on its own thread and lets another thread stop it
     * through {@link #shutdown()}.
     */
    public class KafkaConsumerRunner implements Runnable {
         private final AtomicBoolean closed = new AtomicBoolean(false);
         private final KafkaConsumer<?, ?> consumer;

         /**
          * @param consumer consumer polled by {@link #run()}; it is closed when the
          *                 loop exits
          */
         public KafkaConsumerRunner(KafkaConsumer<?, ?> consumer) {
             this.consumer = consumer;
         }

         /**
          * Subscribes to {@code "topic"} and polls until {@link #shutdown()} sets the
          * closed flag. The consumer is closed on every exit path.
          */
         @Override
         public void run() {
             try {
                 consumer.subscribe(Arrays.asList("topic"));
                 while (!closed.get()) {
                     ConsumerRecords<?, ?> records = consumer.poll(10000);
                     // Handle new records
                 }
             } catch (WakeupException e) {
                 // Ignore exception if closing
                 if (!closed.get()) {
                     throw e;
                 }
             } finally {
                 consumer.close();
             }
         }

         /**
          * Shutdown hook which can be called from a separate thread: sets the closed
          * flag first, then wakes the consumer, so a WakeupException caught in
          * {@link #run()} is treated as a normal shutdown.
          */
         public void shutdown() {
             closed.set(true);
             consumer.wakeup();
         }
    }
     
    Then, in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.
         // Set the flag that the poll loop in run() checks before each poll.
         closed.set(true);
         // Wake the consumer; run() catches the resulting WakeupException and ignores it once closed is set.
         consumer.wakeup();
    View Code

      

  • 相关阅读:
    Git笔记
    排序学习LTR(1):排序算法的评价指标
    C++指针
    C++基础知识笔记
    Shell脚本--菜鸟教程笔记
    torch学习01-入门文档学习
    torch学习02-tensor学习
    torch学习0: 学习概览
    linux基础-用户创建及管理相关
    python-getattr() 函数 dir() 函数
  • 原文地址:https://www.cnblogs.com/piaolingzxh/p/5484770.html
Copyright © 2011-2022 走看看