多线程消费 Kafka 数据
/**
 * Runs the Kafka consumption routine repeatedly on a scheduled thread pool.
 */
public class KafkaToES {

    /**
     * Consumer (message handler), constructed with the argument {@code "5"}.
     * NOTE(review): the meaning of "5" is not visible here — confirm against KafkaHandler.
     */
    private static CustomMessageHandler consumer = new KafkaHandler("5");

    /**
     * Thread pool; assigned by {@link #start()}.
     */
    private ScheduledExecutorService executor;

    /**
     * Creates the thread pool and schedules the Kafka consumption task.
     *
     * <p>The task runs immediately and is then re-run 5 seconds after each run completes.
     * Runtime exceptions thrown by a run are caught and logged instead of propagating:
     * a scheduled task that throws has all of its subsequent executions suppressed by
     * the executor, which would silently stop consumption after the first failure.
     *
     * <p>Every call creates a new pool and overwrites {@code executor}; a pool created by
     * an earlier call is not shut down by this method.
     */
    public void start() {
        executor = Executors.newScheduledThreadPool(poolSize);
        // Kafka consumption task
        executor.scheduleWithFixedDelay(() -> {
            try {
                consumptionKafka();
            } catch (RuntimeException e) {
                // Keep the schedule alive: log the failure and let the next run happen.
                java.util.logging.Logger.getLogger(KafkaToES.class.getName()).log(
                        java.util.logging.Level.SEVERE,
                        "Kafka consumption run failed; retrying after the fixed delay",
                        e);
            }
        }, 0, 5, TimeUnit.SECONDS);
    }
}
(以上代码不完整:未给出 poolSize 和 consumptionKafka() 的定义。)