  • Kafka study notes 014 --- consumer multithreading issues

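    KafkaConsumer is not thread-safe. If multiple threads share a single KafkaConsumer instance and their calls overlap, the client throws the following exception: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access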

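    import java.time.Duration;
    import java.util.Collections;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    // Anti-example: two threads poll the SAME consumer instance.
    public class MyConsumer5 {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer5.class);
    
        public static void main(String[] args) throws InterruptedException {
            // KafkaTestUtil is the author's own helper class (not shown here).
            Consumer<String, String> consumer = KafkaTestUtil.createConsume2("group2");
            consumer.subscribe(Collections.singletonList("topic1"));
    
            // First thread polling the shared consumer.
            new Thread(() -> {
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : consumerRecords) {
                        LOGGER.error("consumer51: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                    }
                }
            }).start();
    
            // Second thread polling the same instance; overlapping calls trigger the exception.
            new Thread(() -> {
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(2));
                    for (ConsumerRecord<String, String> record : consumerRecords) {
                        LOGGER.error("consumer52: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                    }
                }
            }).start();
        }
    }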

    Result (original screenshot not preserved):

    For usage, see: https://blog.csdn.net/clypm/article/details/80618036
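To make the failure mode more concrete, here is a minimal sketch of a fail-fast single-thread guard, written with the JDK only. It is loosely modeled on the kind of ownership check that produces the exception above. `GuardedClient`, `SingleThreadGuardDemo`, and every method name are hypothetical stand-ins chosen for illustration; this is not the real KafkaConsumer code and it needs no Kafka dependency.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class SingleThreadGuardDemo {

    // Hypothetical stand-in for a consumer; this is NOT the real KafkaConsumer.
    static class GuardedClient {
        private final AtomicReference<Thread> owner = new AtomicReference<>(null);
        private final AtomicInteger depth = new AtomicInteger(0);

        // Simulated poll(): runs the given work while holding the guard.
        void poll(Runnable work) {
            acquire();
            try {
                work.run();
            } finally {
                release();
            }
        }

        private void acquire() {
            Thread me = Thread.currentThread();
            // Fail fast (instead of blocking) if another thread is already inside.
            if (owner.get() != me && !owner.compareAndSet(null, me)) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
            depth.incrementAndGet();
        }

        private void release() {
            // Clear ownership once the outermost call returns.
            if (depth.decrementAndGet() == 0) {
                owner.set(null);
            }
        }
    }

    // Returns true if a second thread is rejected while the first is inside poll().
    static boolean overlappingCallIsRejected() throws InterruptedException {
        GuardedClient client = new GuardedClient();
        CountDownLatch firstInside = new CountDownLatch(1);
        CountDownLatch letFirstFinish = new CountDownLatch(1);

        Thread first = new Thread(() -> client.poll(() -> {
            firstInside.countDown();
            try {
                letFirstFinish.await(); // stay inside poll() until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        first.start();
        firstInside.await(); // the first thread now owns the client

        boolean rejected = false;
        try {
            client.poll(() -> { });
        } catch (ConcurrentModificationException e) {
            rejected = true;
        } finally {
            letFirstFinish.countDown();
            first.join();
        }
        return rejected;
    }

    // Returns true if calls that never overlap succeed, even from different threads.
    static boolean sequentialCallsSucceed() throws InterruptedException {
        GuardedClient client = new GuardedClient();
        Thread t = new Thread(() -> client.poll(() -> { }));
        t.start();
        t.join();
        client.poll(() -> { }); // ownership was cleared, so this thread may enter
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("overlapping call rejected: " + overlappingCallIsRejected());
        System.out.println("sequential calls succeed: " + sequentialCallsSucceed());
    }
}
```

The sketch suggests why the shared-consumer example fails: the guard does not serialize callers, it rejects the second one outright. The usual remedies are to give each thread its own KafkaConsumer instance, or to confine all consumer calls to one thread and hand the records to worker threads. According to the KafkaConsumer Javadoc, wakeup() is the one method that may safely be called from another thread.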

  • Original article: https://www.cnblogs.com/sniffs/p/13203040.html