建立 Kafka 消费类 ConsumerRunnable,实现 Runnable 接口:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.*; /** * @Auther: lyl * @Date: 2019/9/12 16:28 * @Description: */ @Slf4j public class ConsumerRunnable implements Runnable { // 每个线程维护私有的KafkaConsumer实例 private final KafkaConsumer<String, String> consumer; public ConsumerRunnable(String brokerList, String groupId, String topic) { Properties props = new Properties(); props.put("bootstrap.servers", brokerList); props.put("group.id", groupId); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); } @Override public void run() { try { while (true) { try { ConsumerRecords<String, String> records = consumer.poll(100); // 本例使用100ms作为获取超时时间 for (ConsumerRecord<String, String> record : records) { // 这里面写处理消息的逻辑 String value = record.value(); if (value.startsWith("obj_vehicle_pass")) { // System.out.println(value); value = value.substring(17, value.length()); JSONObject parse = JSONObject.parseObject(value); } } } catch (Exception e) { log.error("kafka数据消费异常============="); e.printStackTrace(); } } } catch (Exception e) { log.error("初始化kafka异常============="); e.printStackTrace(); } } }
再编写一个类,用来初始化上面这个类,并通过线程启动:
import java.util.ArrayList;
import java.util.List;

/**
 * Creates a fixed number of {@link ConsumerRunnable} instances for one topic
 * and consumer group, and starts each of them on its own thread.
 *
 * @author lyl
 * @since 2019/9/12
 */
public class ConsumerGroup {

    private final List<ConsumerRunnable> consumers;

    /** Set once the worker threads have been started; guarded by {@code this}. */
    private boolean started;

    /**
     * Builds {@code consumerNum} consumers that share the same group and topic.
     *
     * @param consumerNum number of consumers (and later threads) to create
     * @param groupId     Kafka consumer group id
     * @param topic       topic every consumer subscribes to
     * @param brokerList  Kafka bootstrap servers
     */
    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            consumers.add(new ConsumerRunnable(brokerList, groupId, topic));
        }
    }

    /**
     * Starts one thread per consumer. Calling this more than once has no
     * further effect: a second set of threads would run the same
     * {@code ConsumerRunnable} objects, and therefore the same KafkaConsumer
     * instances, concurrently, which the Kafka client does not support.
     */
    public synchronized void execute() {
        if (started) {
            return;
        }
        started = true;
        int index = 0;
        for (ConsumerRunnable task : consumers) {
            // Named threads make the consumers identifiable in logs and thread dumps.
            new Thread(task, "kafka-consumer-" + index++).start();
        }
    }
}
最后,在项目启动时先初始化 ConsumerGroup 这个类,再调用一下 execute() 方法,就能进行消费。