KafkaConsumer 是非线程安全的。如果多个线程共享同一个 KafkaConsumer 实例,Kafka 会抛出如下异常:java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
/**
 * Demo that deliberately shares a single consumer instance between two polling threads.
 *
 * <p>KafkaConsumer is not safe for multi-threaded access, so (assuming
 * {@code KafkaTestUtil.createConsume2} returns a KafkaConsumer) running this class is expected to
 * end in {@code java.util.ConcurrentModificationException} instead of consuming normally. The
 * sharing is the point of the example, not something to be "fixed" here.
 */
public class MyConsumer5 {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer5.class);

    /**
     * Subscribes one consumer (group {@code group2}) to {@code topic1} and starts two threads that
     * both poll that same instance, with 1-second and 2-second poll timeouts respectively.
     *
     * @param args ignored
     * @throws InterruptedException declared by the signature; nothing in the body throws it
     */
    public static void main(String[] args) throws InterruptedException {
        Consumer<String, String> sharedConsumer = KafkaTestUtil.createConsume2("group2");
        sharedConsumer.subscribe(Collections.singletonList("topic1"));

        // First poller: 1-second poll timeout.
        Runnable firstPoller = () -> pollForever(
                sharedConsumer,
                Duration.ofSeconds(1),
                "consumer51: offset={}, partition={}, value={}");
        new Thread(firstPoller).start();

        // Second poller: 2-second poll timeout, same consumer instance on purpose.
        Runnable secondPoller = () -> pollForever(
                sharedConsumer,
                Duration.ofSeconds(2),
                "consumer52: offset={}, partition={}, value={}");
        new Thread(secondPoller).start();
    }

    /**
     * Polls the given consumer in an endless loop and logs every received record at ERROR level.
     *
     * @param source consumer to poll
     * @param timeout timeout passed to each {@code poll} call
     * @param logFormat SLF4J format string receiving offset, partition and value, in that order
     */
    private static void pollForever(
            Consumer<String, String> source, Duration timeout, String logFormat) {
        for (;;) {
            ConsumerRecords<String, String> batch = source.poll(timeout);
            batch.forEach(entry ->
                    LOGGER.error(logFormat, entry.offset(), entry.partition(), entry.value()));
        }
    }
}
运行结果: