zoukankan      html  css  js  c++  java
  • kafka学习总结014 --- consumer多线程问题

    KafkaConsumer是非线程安全的,多线程共享一个KafkaConsumer实例,kafka会有如下异常:java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

    public class MyConsumer5 {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer5.class);
    
        public static void main(String[] args) throws InterruptedException {
            Consumer<String, String> consumer = KafkaTestUtil.createConsume2("group2");
            consumer.subscribe(Collections.singletonList("topic1"));
    
            new Thread(() -> {
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : consumerRecords) {
                        LOGGER.error("consumer51: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                    }
                }
            }).start();
    
            new Thread(() -> {
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(2));
                    for (ConsumerRecord<String, String> record : consumerRecords) {
                        LOGGER.error("consumer52: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                    }
                }
            }).start();
        }
    }

    运行结果:

    使用方法可见:https://blog.csdn.net/clypm/article/details/80618036

  • 相关阅读:
    iOS学习-UILabel
    react js
    代理模式
    利用gitbush从git上下载代码到本地
    VS2017企业版密钥
    office2016产品密钥及激活工具
    .netframe初识
    树的遍历——c#实现
    数据结构——总结
    单例模式
  • 原文地址:https://www.cnblogs.com/sniffs/p/13203040.html
Copyright © 2011-2022 走看看