  • Kafka Study Notes: Simple Java Operations

    Maven dependencies (kafka-clients provides the new Java producer/consumer classes, while the Scala core artifact kafka_2.11 supplies the older kafka.javaapi classes used further below):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.1</version>
    </dependency>


    The code is as follows. First, the producer:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class KafkaProducerTest {

        private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);

        private static Properties properties = null;

        static {
            properties = new Properties();
            properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
            // The four settings below come from the old Scala producer API and are
            // not recognized by the new KafkaProducer (its equivalent of
            // request.required.acks, for example, is acks)
            properties.put("producer.type", "sync");
            properties.put("request.required.acks", "1");
            properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
            properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    //      properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    //      properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        }

        public void produce() {
            KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[], byte[]>(properties);
            ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<byte[], byte[]>(
                    "test", "kkk".getBytes(), "vvv".getBytes());
            kafkaProducer.send(kafkaRecord, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    // metadata is null when the send failed, so only read the
                    // offset in the success branch
                    if (null != e) {
                        LOG.error(e.getMessage(), e);
                    } else {
                        LOG.info("the offset of the sent record is {}", metadata.offset());
                    }
                    LOG.info("complete!");
                }
            });
            kafkaProducer.close();
        }

        public static void main(String[] args) {
            KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();
            for (int i = 0; i < 10; i++) {
                kafkaProducerTest.produce();
            }
        }
    }
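
    The commented-out StringSerializer settings above hint at a String-keyed variant. Here is a minimal sketch of it, assuming the same brokers and topic; the class name StringProducerSketch is my own, and it blocks on the Future returned by send() instead of registering a Callback:

    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class StringProducerSketch {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            try {
                // send() is asynchronous; get() blocks until the broker acknowledges the record
                Future<RecordMetadata> future = producer.send(
                        new ProducerRecord<String, String>("test", "kkk", "vvv"));
                RecordMetadata metadata = future.get();
                System.out.println("sent to partition " + metadata.partition()
                        + " at offset " + metadata.offset());
            } finally {
                producer.close();
            }
        }
    }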

    Then the consumer:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class KafkaConsumerTest {

        private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);

        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
            properties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
    //      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            // This targets the 0.8.2.x new-consumer API, whose poll() returns a
            // Map keyed by topic (and, as it turns out below, is only a stub)
            KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);
            kafkaConsumer.subscribe("test");
    //      kafkaConsumer.subscribe("*");
            boolean isRunning = true;
            while (isRunning) {
                Map<String, ConsumerRecords<byte[], byte[]>> results = kafkaConsumer.poll(100);
                if (null != results) {
                    for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : results.entrySet()) {
                        LOG.info("topic {}", entry.getKey());
                        ConsumerRecords<byte[], byte[]> consumerRecords = entry.getValue();
                        List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records();
                        for (int i = 0, len = records.size(); i < len; i++) {
                            ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);
                            LOG.info("topic {} partition {}", consumerRecord.topic(), consumerRecord.partition());
                            try {
                                LOG.info("offset {} value {}", consumerRecord.offset(), new String(consumerRecord.value()));
                            } catch (Exception e) {
                                LOG.error(e.getMessage(), e);
                            }
                        }
                    }
                }
            }

            // never reached, since isRunning is never cleared
            kafkaConsumer.close();
        }
    }

    It turned out that in 0.8.2.1 KafkaConsumer's poll method is not implemented:

    @Override
    public Map<String, ConsumerRecords<K, V>> poll(long timeout) {
        // TODO Auto-generated method stub
        return null;
    }
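
    The new consumer only became usable in later client releases. As a minimal sketch, assuming the project can move to kafka-clients 0.9.0.1 or newer (where poll() is actually implemented), the equivalent loop looks roughly like this; the class name and the String deserializers are my own choices, while the brokers, topic, and group id are the ones used above:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewConsumerSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
            props.put("group.id", "test-consumer-group");
            props.put("enable.auto.commit", "true");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            // from 0.9 on, subscribe takes a list of topics rather than varargs
            consumer.subscribe(Collections.singletonList("test"));
            try {
                while (true) {
                    // poll() now returns ConsumerRecords, an Iterable over all fetched records
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("topic %s partition %d offset %d value %s%n",
                                record.topic(), record.partition(), record.offset(), record.value());
                    }
                }
            } finally {
                consumer.close();
            }
        }
    }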


    I therefore switched to an implementation based on kafka.javaapi.consumer.SimpleConsumer, which runs correctly:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.cluster.Broker;
    import kafka.common.ErrorMapping;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.OffsetRequest;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class KafkaSimpleConsumerTest {

        private List<String> brokerList = new ArrayList<String>();

        public KafkaSimpleConsumerTest() {
            brokerList = new ArrayList<String>();
        }

        public static void main(String args[]) {
            KafkaSimpleConsumerTest kafkaSimpleConsumer = new KafkaSimpleConsumerTest();
            // maximum number of messages to read
            long maxReadNum = Long.parseLong("3");
            // topic to read from
            String topic = "test";
            // partition to read from
            int partition = Integer.parseInt("0");
            // broker seed nodes
            List<String> seeds = new ArrayList<String>();
            seeds.add("centos.master");
            seeds.add("centos.slave1");
            seeds.add("centos.slave2");
            // broker port
            int port = Integer.parseInt("9092");
            try {
                kafkaSimpleConsumer.run(maxReadNum, topic, partition, seeds, port);
            } catch (Exception e) {
                System.out.println("Oops:" + e);
                e.printStackTrace();
            }
        }

        public void run(long maxReadNum, String topic, int partition, List<String> seedBrokers, int port) throws Exception {
            // fetch the metadata for the given topic and partition
            PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
            if (metadata == null) {
                System.out.println("can't find metadata for topic and partition. exit");
                return;
            }
            if (metadata.leader() == null) {
                System.out.println("can't find leader for topic and partition. exit");
                return;
            }
            String leadBroker = metadata.leader().host();
            String clientName = "client_" + topic + "_" + partition;

            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
            long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
            int numErrors = 0;
            while (maxReadNum > 0) {
                if (consumer == null) {
                    consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
                }
                FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build();
                FetchResponse fetchResponse = consumer.fetch(req);

                if (fetchResponse.hasError()) {
                    numErrors++;
                    short code = fetchResponse.errorCode(topic, partition);
                    System.out.println("error fetching data from the broker:" + leadBroker + " reason: " + code);
                    if (numErrors > 5)
                        break;
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                        continue;
                    }
                    consumer.close();
                    consumer = null;
                    leadBroker = findNewLeader(leadBroker, topic, partition, port);
                    continue;
                }
                numErrors = 0;

                long numRead = 0;
                for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffset) {
                        System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);
                        continue;
                    }

                    readOffset = messageAndOffset.nextOffset();
                    ByteBuffer payload = messageAndOffset.message().payload();

                    byte[] bytes = new byte[payload.limit()];
                    payload.get(bytes);
                    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                    numRead++;
                    maxReadNum--;
                }

                if (numRead == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                    }
                }
            }
            if (consumer != null)
                consumer.close();
        }

        /**
         * Find the leader broker for the given topic and partition among the live brokers.
         * @param seedBrokers
         * @param port
         * @param topic
         * @param partition
         * @return
         */
        private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
            PartitionMetadata partitionMetadata = null;
            loop: for (String seedBroker : seedBrokers) {
                SimpleConsumer consumer = null;
                try {
                    consumer = new SimpleConsumer(seedBroker, port, 100000, 64 * 1024, "leaderLookup");
                    List<String> topics = Collections.singletonList(topic);
                    TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);
                    TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);

                    List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
                    for (TopicMetadata topicMetadata : topicMetadatas) {
                        for (PartitionMetadata pMetadata : topicMetadata.partitionsMetadata()) {
                            if (pMetadata.partitionId() == partition) {
                                partitionMetadata = pMetadata;
                                break loop;
                            }
                        }
                    }
                } catch (Exception e) {
                    System.out.println("error communicating with broker [" + seedBroker + "] to find leader for [" + topic + ", " + partition + "] reason: " + e);
                } finally {
                    if (consumer != null)
                        consumer.close();
                }
            }
            if (partitionMetadata != null) {
                brokerList.clear();
                for (Broker replica : partitionMetadata.replicas()) {
                    brokerList.add(replica.host());
                }
            }
            return partitionMetadata;
        }

        public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
            requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
            OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
            OffsetResponse response = consumer.getOffsetsBefore(request);
            if (response.hasError()) {
                System.out.println("error fetching offset data from the broker. reason: " + response.errorCode(topic, partition));
                return 0;
            }
            long[] offsets = response.offsets(topic, partition);
            return offsets[0];
        }

        private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
            for (int i = 0; i < 3; i++) {
                boolean goToSleep = false;
                PartitionMetadata metadata = findLeader(brokerList, port, topic, partition);
                if (metadata == null) {
                    goToSleep = true;
                } else if (metadata.leader() == null) {
                    goToSleep = true;
                } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                    goToSleep = true;
                } else {
                    return metadata.leader().host();
                }
                if (goToSleep) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                    }
                }
            }
            System.out.println("unable to find new leader after broker failure. exit");
            throw new Exception("unable to find new leader after broker failure. exit");
        }
    }
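
    A note on how this version works: findLeader sends a TopicMetadataRequest to each seed broker in turn until it finds metadata for the requested partition, caching the replica hosts in brokerList; getLastOffset asks the leader for a starting offset (earliest here, latest after an OffsetOutOfRange error); the fetch loop then reads from the leader, and on any other error it closes the consumer and calls findNewLeader to re-resolve the leader from the cached replicas. Leader discovery, offset tracking, and failover are exactly the bookkeeping that the high-level consumer (and the new consumer from 0.9 on) handles internally; with SimpleConsumer it all has to be done by hand.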

    Original post: https://www.cnblogs.com/edison2012/p/5759223.html