zoukankan      html  css  js  c++  java
  • kafka监听类

    package com.datad.dream.service;
    
    import com.alibaba.fastjson.JSON;
    import com.datad.dream.dao.KafkaInfConfigDao;
    import com.datad.dream.entity.KafkaInfConfig;
    import com.datad.dream.entity.KafkaSendInfo;
    import com.datad.dream.entity.Message;
    import com.datad.dream.sysInit.service.ApplicationContextService;
    import com.datad.dream.utils.Global;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    /**
     * Created by Administrator on 2017/10/25 0025.
     */
    @Service
    public class KafkaConsumerService extends Thread {
        private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
        private ConsumerConnector consumer;
        private KafkaInfConfig kafkaInfConfig;
        private KafkaInfConfigDao kafkaInfConfigDao;
        private KafkaSendInfoService kafkaSendInfoService;
        private KafkaNotificationService kafkaNotificationService;
    
        public KafkaConsumerService() {
            kafkaInfConfigDao = ApplicationContextService.getService().getBean("kafkaInfConfigDao");
            kafkaSendInfoService = ApplicationContextService.getService().getBean("kafkaSendInfoService");
            kafkaNotificationService = ApplicationContextService.getService().getBean("kafkaNotificationService");
            kafkaInfConfig = kafkaInfConfigDao.getKafkaConfig(Global.getConfig("appId"), Global.getConfig("applicationNo"));
            log.info("初始化kafka监听配置:{}", kafkaInfConfig);
        }
    
        @Override
        public void run() {
            Properties props = new Properties();
            props.put("zookeeper.connect", kafkaInfConfig.getZkAddress());
            props.put("group.id", kafkaInfConfig.getGroupId());
            props.put("zookeeper.session.timeout.ms", kafkaInfConfig.getTimeout());
            props.put("zookeeper.sync.time.ms", kafkaInfConfig.getSyncTime());
            props.put("auto.commit.interval.ms", kafkaInfConfig.getIntervalMs());
            props.put("auto.offset.reset", kafkaInfConfig.getReset());
            props.put("serializer.class", kafkaInfConfig.getSerializerClass());
            ConsumerConfig config = new ConsumerConfig(props);
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(kafkaInfConfig.getTopic(), new Integer(1));
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            try {
                Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
                KafkaStream<String, String> stream = consumerMap.get(kafkaInfConfig.getTopic()).get(0);
                ConsumerIterator<String, String> it = stream.iterator();
                while (it.hasNext()) {
                    String messages = it.next().message();
                    log.info("kafka监听到的消息:{}", messages);
                    KafkaSendInfo coupInfo = JSON.parseObject(messages, KafkaSendInfo.class);
                    log.info("监听到的消息体为:{}", coupInfo);
                    kafkaSendInfoService.updateReceive(coupInfo.getAddition());
                    Message message = new Message();
                    message.setAppId(coupInfo.getAppId());
                    message.setApplicationNo(coupInfo.getApplicationNo());
                    message.setHead(coupInfo.getHead());
                    message.setBody(coupInfo.getBody());
                    message.setFoot(coupInfo.getFoot());
                    message.setAddition(coupInfo.getAddition());
                    pushMessage(message);
                }
            } catch (Exception e) {
                log.error("监听出现异常:{}", e);
            }
            log.info("kafka监听完毕");
        }
    
        public boolean startKafak() {
            KafkaConsumerService rtt = new KafkaConsumerService();
            new Thread(rtt).start();
            return true;
        }
    
        public boolean stopKafka() {
            KafkaConsumerService rtt = new KafkaConsumerService();
            new Thread(rtt).stop();
            return true;
        }
    
        public void pushMessage(Message message){
            kafkaInfConfig = kafkaInfConfigDao.getKafkaConfig(message.getAppId(), message.getApplicationNo());
            if("0".equals(kafkaInfConfig.getIsDrainage())){
                kafkaNotificationService.pushMessage(message);
            }else{
                log.info("消息已经被销毁:{}",message);
            }
        }
    }
  • 相关阅读:
    adjacent_diffenerce
    数值算法速食食谱
    accumulate
    平面分割
    进制转换
    奖学金
    谁考了第k名
    奇数单增序列
    病人排序
    灯的开关状态
  • 原文地址:https://www.cnblogs.com/cnndevelop/p/8798933.html
Copyright © 2011-2022 走看看