package com.datad.dream.service;

import com.alibaba.fastjson.JSON;
import com.datad.dream.dao.KafkaInfConfigDao;
import com.datad.dream.entity.KafkaInfConfig;
import com.datad.dream.entity.KafkaSendInfo;
import com.datad.dream.entity.Message;
import com.datad.dream.sysInit.service.ApplicationContextService;
import com.datad.dream.utils.Global;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Listens on the Kafka topic named by the application's {@link KafkaInfConfig},
 * records receipt of each message through {@code KafkaSendInfoService} and hands
 * the message to {@code KafkaNotificationService} unless the configuration looked
 * up for the message has a drainage flag other than {@code "0"}.
 *
 * <p>Workers launched through {@link #startKafak()} are tracked so that
 * {@link #stopKafka()} can shut their consumer connectors down.
 *
 * Created by Administrator on 2017/10/25 0025.
 */
@Service
public class KafkaConsumerService extends Thread {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

    /** Workers started by {@link #startKafak()} that are still running; guarded by its own monitor. */
    private static final List<KafkaConsumerService> ACTIVE_WORKERS = new ArrayList<KafkaConsumerService>();

    /** Connector owned by {@link #run()}; volatile because the stop request reads it from another thread. */
    private volatile ConsumerConnector consumer;
    /** Set by a stop request so that a worker that has not opened its stream yet does not start consuming. */
    private volatile boolean stopRequested;

    private final KafkaInfConfig kafkaInfConfig;
    private final KafkaInfConfigDao kafkaInfConfigDao;
    private final KafkaSendInfoService kafkaSendInfoService;
    private final KafkaNotificationService kafkaNotificationService;

    /**
     * Resolves the collaborating beans by name from {@link ApplicationContextService} and loads
     * the listener configuration for the {@code appId} / {@code applicationNo} held in {@link Global}.
     */
    public KafkaConsumerService() {
        kafkaInfConfigDao = ApplicationContextService.getService().getBean("kafkaInfConfigDao");
        kafkaSendInfoService = ApplicationContextService.getService().getBean("kafkaSendInfoService");
        kafkaNotificationService = ApplicationContextService.getService().getBean("kafkaNotificationService");
        kafkaInfConfig = kafkaInfConfigDao.getKafkaConfig(Global.getConfig("appId"), Global.getConfig("applicationNo"));
        log.info("初始化kafka监听配置:{}", kafkaInfConfig);
    }

    /**
     * Connects to Kafka with the loaded configuration and processes messages until the stream
     * ends, a stop is requested, or the stream itself fails. The connector is always shut down
     * on the way out.
     */
    @Override
    public void run() {
        if (kafkaInfConfig == null) {
            // Without a configuration there is no ZooKeeper address or topic to connect to.
            log.error("kafka监听配置为空,无法启动监听");
            unregister();
            return;
        }
        try {
            // Built inside the try so that a null configuration value is logged instead of
            // killing the thread with an uncaught exception.
            ConsumerConnector connector =
                    kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(buildConsumerProperties()));
            consumer = connector;
            // A stop may have been requested before the connector existed; honour it here.
            if (!stopRequested) {
                consume(connector);
            }
        } catch (Exception e) {
            log.error("监听出现异常", e);
        } finally {
            releaseConsumer();
            unregister();
            log.info("kafka监听完毕");
        }
    }

    /**
     * Starts a new consumer worker on its own thread and remembers it so it can be stopped later.
     *
     * @return always {@code true}
     */
    public boolean startKafak() {
        KafkaConsumerService worker = new KafkaConsumerService();
        synchronized (ACTIVE_WORKERS) {
            ACTIVE_WORKERS.add(worker);
        }
        new Thread(worker).start();
        return true;
    }

    /**
     * Asks every worker started through {@link #startKafak()} to stop by shutting down its
     * consumer connector, which ends the blocking message iterator.
     *
     * @return always {@code true}
     */
    public boolean stopKafka() {
        List<KafkaConsumerService> workers;
        synchronized (ACTIVE_WORKERS) {
            workers = new ArrayList<KafkaConsumerService>(ACTIVE_WORKERS);
            ACTIVE_WORKERS.clear();
        }
        // Shut down outside the lock: connector shutdown may block on network I/O.
        for (KafkaConsumerService worker : workers) {
            worker.requestStop();
        }
        return true;
    }

    /**
     * Forwards the message to the notification service when the configuration found for the
     * message's application has drainage flag {@code "0"}; otherwise only logs that the message
     * was discarded. A message whose configuration cannot be found is logged and not forwarded.
     *
     * @param message message to deliver; its {@code appId} and {@code applicationNo} select the configuration
     */
    public void pushMessage(Message message) {
        // Kept in a local: the per-message configuration must not replace the listener's own.
        KafkaInfConfig targetConfig =
                kafkaInfConfigDao.getKafkaConfig(message.getAppId(), message.getApplicationNo());
        if (targetConfig == null) {
            log.warn("未找到对应的kafka配置,消息未推送:{}", message);
            return;
        }
        if ("0".equals(targetConfig.getIsDrainage())) {
            kafkaNotificationService.pushMessage(message);
        } else {
            log.info("消息已经被销毁:{}", message);
        }
    }

    /** Translates the loaded configuration into the property set expected by {@link ConsumerConfig}. */
    private Properties buildConsumerProperties() {
        Properties props = new Properties();
        props.put("zookeeper.connect", kafkaInfConfig.getZkAddress());
        props.put("group.id", kafkaInfConfig.getGroupId());
        props.put("zookeeper.session.timeout.ms", kafkaInfConfig.getTimeout());
        props.put("zookeeper.sync.time.ms", kafkaInfConfig.getSyncTime());
        props.put("auto.commit.interval.ms", kafkaInfConfig.getIntervalMs());
        props.put("auto.offset.reset", kafkaInfConfig.getReset());
        props.put("serializer.class", kafkaInfConfig.getSerializerClass());
        return props;
    }

    /** Opens a single stream on the configured topic and handles every message it yields. */
    private void consume(ConsumerConnector connector) {
        String topic = kafkaInfConfig.getTopic();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            handleRecord(it.next().message());
        }
    }

    /**
     * Parses one raw payload, records its receipt and pushes it on. Failures are logged and the
     * payload is skipped so that one bad message does not end the listener.
     */
    private void handleRecord(String payload) {
        log.info("kafka监听到的消息:{}", payload);
        try {
            KafkaSendInfo coupInfo = JSON.parseObject(payload, KafkaSendInfo.class);
            if (coupInfo == null) {
                log.warn("kafka消息解析结果为空,已跳过:{}", payload);
                return;
            }
            log.info("监听到的消息体为:{}", coupInfo);
            kafkaSendInfoService.updateReceive(coupInfo.getAddition());

            Message message = new Message();
            message.setAppId(coupInfo.getAppId());
            message.setApplicationNo(coupInfo.getApplicationNo());
            message.setHead(coupInfo.getHead());
            message.setBody(coupInfo.getBody());
            message.setFoot(coupInfo.getFoot());
            message.setAddition(coupInfo.getAddition());
            pushMessage(message);
        } catch (Exception e) {
            log.error("kafka消息处理失败,已跳过:{}", payload, e);
        }
    }

    /** Flags this worker as stopping and shuts its connector down if one is already open. */
    private void requestStop() {
        stopRequested = true;
        ConsumerConnector connector = consumer;
        if (connector != null) {
            shutdownQuietly(connector);
        }
    }

    /** Shuts down and forgets the connector owned by this worker, if any. */
    private void releaseConsumer() {
        ConsumerConnector connector = consumer;
        consumer = null;
        if (connector != null) {
            shutdownQuietly(connector);
        }
    }

    /** Removes this worker from the set of workers that {@link #stopKafka()} would signal. */
    private void unregister() {
        synchronized (ACTIVE_WORKERS) {
            ACTIVE_WORKERS.remove(this);
        }
    }

    /** Shuts the connector down, logging rather than propagating any failure. */
    private static void shutdownQuietly(ConsumerConnector connector) {
        try {
            connector.shutdown();
        } catch (RuntimeException e) {
            log.warn("关闭kafka消费者失败", e);
        }
    }
}