zoukankan      html  css  js  c++  java
  • 监听kafka消息

    1、main方法中(1.0以上)

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Kafka消息消费者
     * 〈功能详细描述〉
     *
     * @author 17090889
     * @see [相关类/方法](可选)
     * @since [产品/模块版本] (可选)
     */
    public class ConsumerSample {
        public static void main(String[] args) {
            String topic = "test-topic";
            Properties props = new Properties();
            // Kafka集群,多台服务器地址之间用逗号隔开
            props.put("bootstrap.servers", "localhost:9092");
            // 消费组ID
            props.put("group.id", "test_group1");
            // Consumer的offset是否自动提交
            props.put("enable.auto.commit", "true");
            // 自动提交offset到zk的时间间隔,时间单位是毫秒
            props.put("auto.commit.interval.ms", "1000");
            // 消息的反序列化类型
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            // 订阅的话题
            consumer.subscribe(Arrays.asList(topic));
            // Consumer调用poll方法来轮询Kafka集群的消息,一直等到Kafka集群中没有消息或者达到超时时间100ms为止
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord record : records) {
                    System.out.println(record.partition() + record.offset());
                    System.out.println(record.key());
                    System.out.println(record.value());
                }
            }
        }
    }

    2、Spring下kafka1.0以上版本(不依赖Spring-Kafka)

    3、Spring下kafka 0.8版本

      1)kafka消费者抽象工厂类

    /**
     * kafka消费者抽象工厂类
     * 〈功能详细描述〉
     *
     * @author
     * @see [相关类/方法](可选)
     * @since [产品/模块版本] (可选)
     */
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Value;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public abstract class BaseKafkaConsumerFactory implements InitializingBean, DisposableBean {
    
        private static final Logger logger = LoggerFactory.getLogger(BaseKafkaConsumerFactory.class);
    
        /**
         * 消费的Topic与消费线程数组成的Map
         */
        private Map<String, Integer> topicThreadMap;
        /**
         * Consumer实例所需的配置
         */
        private Properties properties;
    
        /**
         * 线程池
         */
        private ThreadPoolExecutor taskExecutor;
    
        private ConsumerConnector consumerConnector;
    
        /**
         * zkConnect
         */
        private String zkConnect;
    
        @Value("${kafka.groupId}")
        private String groupId;
    
        /**
         * sessionTimeOut
         */
        @Value("${kafka.sessionTimeOut}")
        private String sessionTimeOut;
    
        /**
         * syncTime
         */
        @Value("${kafka.syncTime}")
        private String syncTime;
    
        /**
         * commitInterval
         */
        @Value("${kafka.commitInterval}")
        private String commitInterval;
    
        /**
         * offsetReset
         */
        @Value("${kafka.offsetReset}")
        private String offsetReset;
    
    
        @Override
        public void afterPropertiesSet() {
            logger.info("afterPropertiesSet-start");
            // 初始化properties
            if(properties==null){
                properties = new Properties();
                properties.put("zookeeper.connect", zkConnect);
                logger.info("zkConnect={}", zkConnect);
                // group 代表一个消费组
                properties.put("group.id", groupId);
                logger.info("groupId={}", groupId);
                // zk连接超时
                properties.put("zookeeper.session.timeout.ms", sessionTimeOut);
                properties.put("zookeeper.sync.time.ms", syncTime);
                properties.put("auto.commit.interval.ms", commitInterval);
                properties.put("auto.offset.reset", offsetReset);
                // 序列化类
                properties.put("serializer.class", "kafka.serializer.StringEncoder");
    
                properties.put("rebalance.max.retries", "10");
                // 当rebalance发生时,两个相邻retry操作之间需要间隔的时间。
                properties.put("rebalance.backoff.ms", "3100");
            }
    
            ConsumerConfig consumerConfig = new ConsumerConfig(properties);
            consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    
            Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(topicThreadMap);
            // 实际有多少个stream,就设置多少个线程处理
    //        int messageProcessThreadNum = 0;
    //        for (List<KafkaStream<byte[], byte[]>> streamList : topicMessageStreams.values()) {
    //            messageProcessThreadNum = messageProcessThreadNum + streamList.size();
    //        }
            // 创建实际处理消息的线程池
            taskExecutor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10000));
            for (List<KafkaStream<byte[], byte[]>> streams : topicMessageStreams.values()) {
                for (final KafkaStream<byte[], byte[]> stream : streams) {
                    taskExecutor.submit(new Runnable() {
                        @Override
                        public void run() {
                            ConsumerIterator<byte[], byte[]> it = stream.iterator();
                            while (it.hasNext()) {
                                MessageAndMetadata<byte[], byte[]> data = it.next();
                                try {
                                    String kafkaMsg = new String(data.message(),"UTF-8");
                                    logger.info("来自topic:{}的消息:{}", topicThreadMap.keySet(), kafkaMsg);
                                    // 消息处理
                                    onMessage(data);
                                } catch (RuntimeException e) {
                                    logger.error("处理消息异常.", e);
                                } catch (UnsupportedEncodingException e) {
                                    e.printStackTrace();
                                }
    
                            }
                        }
    
                    });
                }
            }
    
        }
    
        /**
         * 消息处理类
         * @param data
         */
        protected abstract void onMessage(MessageAndMetadata<byte[], byte[]> data);
    
        @Override
        public void destroy() throws Exception {
            try {
                if (consumerConnector != null) {
                    consumerConnector.shutdown();
                }
            } catch (Exception e) {
                logger.warn("shutdown consumer failed", e);
            }
            try {
                if (taskExecutor != null) {
                    taskExecutor.shutdown();
                }
            } catch (Exception e) {
                logger.warn("shutdown messageProcessExecutor failed", e);
            }
            logger.info("shutdown consumer successfully");
        }
    
        public Properties getProperties() {
            return properties;
        }
    
        public void setProperties(Properties properties) {
            this.properties = properties;
        }
    
        public Map<String, Integer> getTopicThreadMap() {
            return topicThreadMap;
        }
    
        public void setTopicThreadMap(Map<String, Integer> topicThreadMap) {
            this.topicThreadMap = topicThreadMap;
        }
    
        public String getZkConnect() {
            return zkConnect;
        }
    
        public void setZkConnect(String zkConnect) {
            this.zkConnect = zkConnect;
        }
    }

      2)具体的kafka消费者实现类

    import com.xxx.sfmms.common.util.JsonConvertUtil;
    import com.xxx.sfmms.common.util.RedisUtil;
    import com.xxx.sfmms.common.util.StringUtil;
    import com.xxx.sfmms.service.intf.RecommendService;
    import kafka.message.MessageAndMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.MDC;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 实名kafka消费者
     * 〈功能详细描述〉
     *
     * @author 17090889
     * @see [相关类/方法](可选)
     * @since [产品/模块版本] (可选)
     */
    public class RealNameKafkaConsumer extends BaseKafkaConsumerFactory {
    
        private final Logger LOGGER = LoggerFactory.getLogger(RealNameKafkaConsumer.class);
    
        private static final String STR_INVOKENO = "invokeNo";
    
        @Autowired
        private RecommendService recommendService;
    
    
        /**
         * 消息处理
         * @param data
         */
        @Override
        protected void onMessage(MessageAndMetadata<byte[], byte[]> data) {
            MDC.put(STR_INVOKENO, StringUtil.getUuid());
            String msg="";
            try {
                msg=new String(data.message(),"UTF-8");
                LOGGER.info("RealNameKafkaConsumer-data={},topic={}",msg,data.topic());
            } catch (UnsupportedEncodingException e) {
                LOGGER.info("字节数组转字符串异常");
                e.printStackTrace();
            }
            // 实名的事后kafka数据
            Map<String, String> map = JsonConvertUtil.json2pojo(msg, Map.class);
            LOGGER.info("RealNameKafkaConsumer-map={}", map);
            String userNo = map.get("eppAccountNO");
            LOGGER.info("RealNameKafkaConsumer-userNo={}", userNo);
            String flag = RedisUtil.getString("PULLNEW:RACCOUNTNO_" + userNo, "MEIS");
            // 不是渠道6被邀请用户
            if(!"1".equals(flag)){
                LOGGER.info("不是渠道6拉新用户");
                return;
            }
            // 20-初级认证 30-高级实名认证   40- 实名申诉降级、50-高级到期降级 60-实名撤销(人工手动降级) 70-申诉找回身份降级
            String authenStatus=map.get("authenStatus");
            // 真实姓名
            String realName=map.get("realName");
            // 身份证号码
            String idNo = map.get("idNO");
            // apptoken
            String appToken=map.get("appToken");
            // 校验任务
            Map<String, String> paramMap = new HashMap<String, String>(4);
            paramMap.put("userNo", userNo);
            paramMap.put("authenStatus",authenStatus);
            paramMap.put("realName",realName);
            paramMap.put("idNo", idNo);
            paramMap.put("appToken",appToken);
            Map<String,String> resultMap=recommendService.checkRulesAndRiskSendMoney(paramMap);
            LOGGER.info("resultMap={}", resultMap);
            MDC.remove(STR_INVOKENO);
        }
    }

      3)实现类的bean注入配置

    <bean id="realNameKafkaConsumer" class="com.xxx.sfmms.service.RealNameKafkaConsumer">
       <property name="topicThreadMap">
          <map>
             <entry key="${realTopic}" value="5"/>
          </map>
       </property>
       <property name="zkConnect">
          <value>${realZkConnect}</value>
       </property>
    </bean>
    
    
    <bean id="preCreditKafkaConsumer" class="com.xxx.sfmms.service.PreCreditKafkaConsumer">
       <property name="topicThreadMap">
          <map>
             <entry key="${rxdTopic}" value="5"/>
          </map>
       </property>
       <property name="zkConnect">
          <value>${rxdZkConnect}</value>
       </property>
    </bean>

      4)kafka consumer参数配置

    #kafka监听配置
    #实zk
    realZkConnect=xxx
    #topic
    realTopic=xxx
    #任zk
    rxdZkConnect=xxx
    #任性贷topic
    rxdTopic=xxx
    kafka.sessionTimeOut=6000
    kafka.syncTime=2000
    kafka.commitInterval=30000
    kafka.offsetReset=smallest
    kafka.groupId=xxx

      5)依赖包配置

    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.9.2</artifactId>
       <version>0.8.1.1</version>
       <exclusions>
          <exclusion>
             <artifactId>jmxtools</artifactId>
             <groupId>com.sun.jdmk</groupId>
          </exclusion>
          <exclusion>
             <artifactId>jmxri</artifactId>
             <groupId>com.sun.jmx</groupId>
          </exclusion>
       </exclusions>
    </dependency>

    END

  • 相关阅读:
    centos7 安装 supervisor
    远程桌面管理工具
    Delphi cxpagecontrol融合窗体
    Delphi调用网页美化SQL
    合并Dev BPL教程
    delphi 各新版本特性收集
    Delphi控件备份工具
    DBX Error: Driver could not be properly initialized..解决办法
    hugo 中文目录名 在 centos 的问题
    flutter Dialog里ListView的问题
  • 原文地址:https://www.cnblogs.com/yangyongjie/p/12520348.html
Copyright © 2011-2022 走看看