zoukankan      html  css  js  c++  java
  • kafka学习总结013 --- kafka消费者API

    创建消费者

    public static Consumer<String, String> createConsume2(String groupName) {
        Properties properties = new Properties();
        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTARP_SERVER_URL);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new KafkaConsumer<>(properties);
    }

    数据消费流程

    public class MyConsumer1 {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer1.class);
    
        public static void main(String[] args) {
            Consumer<String, String> consumer = KafkaTestUtil.createConsume2("group2");
            consumer.subscribe(Collections.singletonList("topic1"));
    
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    LOGGER.error("consumer1: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                }
            }
        }
    }

    运行结果

  • 相关阅读:
    2019 SDN阅读作业
    第02组 Alpha冲刺(2/6)
    2019 SDN上机第3次作业
    第02组 Alpha冲刺(1/6)
    2019 SDN上机第2次作业
    bzoj 3207 可持久化线段树
    bzoj 3524 可持久化线段树
    HDU 4757 可持久化trie树
    bzoj 3197 DP
    poj 2104 可持久化线段树
  • 原文地址:https://www.cnblogs.com/sniffs/p/13202923.html
Copyright © 2011-2022 走看看