zoukankan      html  css  js  c++  java
  • Kafka之常用API代码示例

    其实Kafka作为一款消息队列,本身的API并不多,也不复杂。本文列举生产者和消费者常用的API。

    /**
     * Demonstrates commonly used Kafka producer and consumer client APIs.
     *
     * <p>The active {@link #main(String[])} runs the consumer walkthrough: subscribe to
     * {@link #topic}, wait for a partition assignment, seek every assigned partition to a
     * fixed offset, poll once, print the record values and commit asynchronously. The
     * producer walkthrough is kept below as commented-out code because a class can only
     * have one {@code main}.
     */
    public class KafkaTestProduer {

        public static final Logger logger = LoggerFactory.getLogger(KafkaTestProduer.class);

        public final static String kafkaServer = "10.37.136.172:9092";
        public final static String topic = "mxz.test.202010311544";

        /** Offset every assigned partition is repositioned to before the demo poll. */
        private static final long SEEK_OFFSET = 19L;

        /** Timeout (ms) of each poll issued while waiting for a partition assignment. */
        private static final long ASSIGNMENT_POLL_TIMEOUT_MS = 100L;

        /** Timeout (ms) of the single poll that fetches the records to print. */
        private static final long FETCH_POLL_TIMEOUT_MS = 1000L;

        /**
         * Builds the producer configuration: bootstrap server, String key/value
         * serializers, client id and a retry count of 3.
         *
         * @return a new, independent {@link Properties} instance
         */
        public static Properties initConfig() {

            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "mxz.client");
            props.put(ProducerConfig.RETRIES_CONFIG, 3);
            return props;
        }

        /**
         * Builds the consumer configuration: bootstrap server, String key/value
         * deserializers, client id, group id, {@code auto.offset.reset=earliest},
         * auto-commit disabled (offsets are committed explicitly in {@code main}) and
         * the {@code ConsumerinterceptorTTL} interceptor.
         *
         * @return a new, independent {@link Properties} instance
         */
        public static Properties initConsumerConfig() {

            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "mxz.client");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "202101051648");
            props.put("auto.offset.reset", "earliest");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerinterceptorTTL.class.getName());
            return props;
        }

        // Producer walkthrough (disabled): sends five records with a completion callback.
        //
        // public static void main(String[] args) {
        //
        //     Properties props = initConfig();
        //     KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //
        //     ProducerRecord<String, String> record = new ProducerRecord<>(topic, "batman");
        //     ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "superman");
        //     ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "aquaman");
        //     ProducerRecord<String, String> record4 = new ProducerRecord<>(topic, "flash");
        //     ProducerRecord<String, String> record5 = new ProducerRecord<>(topic, "wonderwoman");
        //     List<ProducerRecord<String, String>> list = new ArrayList<>();
        //     list.add(record);
        //     list.add(record2);
        //     list.add(record3);
        //     list.add(record4);
        //     list.add(record5);
        //     try {
        //         for (ProducerRecord<String, String> recordItem : list) {
        //             producer.send(recordItem, new CallbackImpl());
        //         }
        //     } finally {
        //         producer.close();
        //     }
        //
        // }

        /**
         * Send-completion callback: logs a failed send, otherwise prints
         * {@code topic-partition-offset} of the stored record on its own line.
         */
        public static class CallbackImpl implements Callback {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // Previously this branch was empty, so failed sends vanished silently.
                    logger.error("failed to send record", exception);
                } else {
                    // println (not print) so consecutive records do not run together.
                    System.out.println(metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset());
                }
            }

        }

        /**
         * Runs the consumer walkthrough described on the class.
         *
         * @param args unused
         * @throws InterruptedException if the final sleep is interrupted
         */
        public static void main(String[] args) throws InterruptedException {

            logger.info("kafka consumer main begins");
            Properties props = initConsumerConfig();
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // Close the consumer even if subscribe/poll/seek/commit throws.
            try {
                // The listener is intentionally a no-op; it only shows where rebalance hooks go.
                consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {

                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

                    }

                });

                // Variant (disabled): read the first fetched offset of partition 0.
                //
                // ConsumerRecords<String, String> records = consumer.poll(1000);
                // TopicPartition tp = new TopicPartition(topic, 0);
                // List<ConsumerRecord<String, String>> list = records.records(tp);
                // if (list.isEmpty()) {
                //     return;
                // }
                // long offset = list.get(0).offset();

                Set<TopicPartition> assignment = awaitAssignment(consumer);
                for (TopicPartition tpItem : assignment) {
                    consumer.seek(tpItem, SEEK_OFFSET);
                }

                // Variant (disabled): seek by timestamp instead of by a fixed offset.
                //
                // Set<TopicPartition> assignment = new HashSet<>();
                // while (assignment.size() == 0) {
                //     consumer.poll(100L);
                //     assignment = consumer.assignment();
                // }
                // Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
                // for (TopicPartition tp : assignment) {
                //     timestampToSearch.put(tp, System.currentTimeMillis() - 1000 * 1000);
                // }
                //
                // Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
                // for (TopicPartition tp : assignment) {
                //     OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
                //     if (offsetAndTimestamp != null) {
                //         consumer.seek(tp, offsetAndTimestamp.offset());
                //         // consumer.poll(1000);
                //     }
                // }

                // Variant (disabled): seek every partition to offset 10.
                //
                // for (TopicPartition tp : assignment) {
                //     consumer.seek(tp, 10);
                // }

                ConsumerRecords<String, String> records = consumer.poll(FETCH_POLL_TIMEOUT_MS);

                // Variant (disabled): report the last consumed offset per partition.
                //
                // for (TopicPartition tp : records.partitions()) {
                //     List<ConsumerRecord<String, String>> list = records.records(tp);
                //     long lastConsumedOffset = list.get(list.size() - 1).offset();
                //     System.out.println("lastConsumedOffset: " + lastConsumedOffset);
                // }

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }

                consumer.commitAsync(new OffsetCommitCallback() {

                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception == null) {
                            logger.info("offset {}", offsets);
                            logger.info("time {} ", System.currentTimeMillis());
                            System.out.println("TIME IS END");
                        } else {
                            // Report the failure at error level with its stack trace.
                            logger.error("offset commit failed for {}", offsets, exception);
                            logger.info("time {} ", System.currentTimeMillis());
                        }
                    }
                });

                // NOTE(review): commit callbacks appear to be run from within later consumer
                // calls (e.g. poll/close), so this pause probably does not itself trigger the
                // callback above — confirm whether the sleep is still needed.
                Thread.sleep(30000);
            } finally {
                consumer.close();
            }
        }

        /**
         * Polls until the consumer reports at least one assigned partition.
         *
         * <p>Records returned by these polls are discarded; the caller repositions every
         * partition with {@code seek} afterwards. The loop has no upper bound, so it keeps
         * polling for as long as no partition is assigned.
         *
         * @param consumer an already subscribed consumer
         * @return the non-empty set of assigned partitions
         */
        private static Set<TopicPartition> awaitAssignment(KafkaConsumer<String, String> consumer) {
            Set<TopicPartition> assignment = consumer.assignment();
            while (assignment.isEmpty()) {
                consumer.poll(ASSIGNMENT_POLL_TIMEOUT_MS);
                assignment = consumer.assignment();
            }
            return assignment;
        }
    }

     再补充一个生产者指定partition的写法:通过ProducerRecord的构造函数传入partition参数,其源码如下

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
            if (topic == null)
                throw new IllegalArgumentException("Topic cannot be null.");
            if (timestamp != null && timestamp < 0)
                throw new IllegalArgumentException(
                        String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
            if (partition != null && partition < 0)
                throw new IllegalArgumentException(
                        String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = new RecordHeaders(headers);
        }
  • 相关阅读:
    答《同样 25 岁,为什么有的人事业小成、家庭幸福,有的人却还在一无所有的起点上?》
    [面试记录-附部分面试题]2014第一波的找工作的记录
    项目总结(二)->一些常用的工具浅谈
    项目总结(一)->项目的七宗罪
    Android学习笔记(三)Application类简介
    Android学习笔记(二)Manifest文件节点详解
    Android学习笔记(一)Android应用程序的组成部分
    Mac下搭建Eclipse Android开发环境
    Android开发必知--自定义Toast提示
    正则表达式(一)
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14331663.html
Copyright © 2011-2022 走看看