  • Listening for Kafka messages

    1. In a main method (Kafka client 1.0+)

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Kafka message consumer.
     * (Detailed description of the functionality.)
     *
     * @author 17090889
     * @see [related classes/methods] (optional)
     * @since [product/module version] (optional)
     */
    public class ConsumerSample {
        public static void main(String[] args) {
            String topic = "test-topic";
            Properties props = new Properties();
            // Kafka cluster; separate multiple broker addresses with commas
            props.put("bootstrap.servers", "localhost:9092");
            // Consumer group ID
            props.put("group.id", "test_group1");
            // Whether the consumer commits offsets automatically
            props.put("enable.auto.commit", "true");
            // Interval between automatic offset commits, in milliseconds
            props.put("auto.commit.interval.ms", "1000");
            // Deserializers for message keys and values
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            // Subscribe to the topic
            consumer.subscribe(Arrays.asList(topic));
            // poll() fetches a batch of records, blocking for up to 100 ms when none are immediately available
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + "-" + record.offset());
                    System.out.println(record.key());
                    System.out.println(record.value());
                }
            }
        }
    }
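
    The example above needs only the new-consumer client on the classpath. A minimal Maven dependency might look like the following; the exact version is an assumption, any 1.0+ kafka-clients release exposes this API:

    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>1.0.0</version>
    </dependency>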

    2. Kafka 1.0+ under Spring (without depending on Spring-Kafka)
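
    A minimal sketch of one way to do this: wrap the same new-consumer API from section 1 in a Spring lifecycle bean (InitializingBean/DisposableBean), mirroring the 0.8 factory shown in section 3. The class name, hard-coded field values, and single-thread polling below are illustrative assumptions, not part of the original code.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class PlainKafkaConsumerBean implements InitializingBean, DisposableBean {
    
        private final AtomicBoolean running = new AtomicBoolean(true);
        private ExecutorService executor;
        private KafkaConsumer<String, String> consumer;
    
        // These would normally be injected via XML properties or @Value placeholders
        private String bootstrapServers = "localhost:9092";
        private String groupId = "test_group1";
        private String topic = "test-topic";
    
        @Override
        public void afterPropertiesSet() {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Collections.singletonList(topic));
    
            // Poll on a single background thread so container startup is not blocked;
            // the KafkaConsumer is only ever touched from this one thread (it is not thread-safe)
            executor = Executors.newSingleThreadExecutor();
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    while (running.get()) {
                        ConsumerRecords<String, String> records = consumer.poll(100);
                        for (ConsumerRecord<String, String> record : records) {
                            onMessage(record);
                        }
                    }
                    consumer.close();
                }
            });
        }
    
        /**
         * Message handling; replace with real business logic, or make the class abstract
         * and override this in subclasses as the 0.8 factory below does.
         */
        protected void onMessage(ConsumerRecord<String, String> record) {
            System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset()
                    + ": " + record.value());
        }
    
        @Override
        public void destroy() {
            running.set(false);
            executor.shutdown();
        }
    }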

    3. Kafka 0.8 under Spring

      1) Abstract Kafka consumer factory class

    /**
     * Abstract Kafka consumer factory class.
     * (Detailed description of the functionality.)
     *
     * @author
     * @see [related classes/methods] (optional)
     * @since [product/module version] (optional)
     */
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Value;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public abstract class BaseKafkaConsumerFactory implements InitializingBean, DisposableBean {
    
        private static final Logger logger = LoggerFactory.getLogger(BaseKafkaConsumerFactory.class);
    
        /**
         * Map from topic name to the number of consumer threads for that topic
         */
        private Map<String, Integer> topicThreadMap;
        /**
         * Configuration required by the consumer instance
         */
        private Properties properties;
    
        /**
         * Thread pool that processes the consumed messages
         */
        private ThreadPoolExecutor taskExecutor;
    
        private ConsumerConnector consumerConnector;
    
        /**
         * zkConnect
         */
        private String zkConnect;
    
        @Value("${kafka.groupId}")
        private String groupId;
    
        /**
         * sessionTimeOut
         */
        @Value("${kafka.sessionTimeOut}")
        private String sessionTimeOut;
    
        /**
         * syncTime
         */
        @Value("${kafka.syncTime}")
        private String syncTime;
    
        /**
         * commitInterval
         */
        @Value("${kafka.commitInterval}")
        private String commitInterval;
    
        /**
         * offsetReset
         */
        @Value("${kafka.offsetReset}")
        private String offsetReset;
    
    
        @Override
        public void afterPropertiesSet() {
            logger.info("afterPropertiesSet-start");
            // Initialize properties if none were injected
            if (properties == null) {
                properties = new Properties();
                properties.put("zookeeper.connect", zkConnect);
                logger.info("zkConnect={}", zkConnect);
                // group.id identifies the consumer group
                properties.put("group.id", groupId);
                logger.info("groupId={}", groupId);
                // ZooKeeper session timeout
                properties.put("zookeeper.session.timeout.ms", sessionTimeOut);
                properties.put("zookeeper.sync.time.ms", syncTime);
                properties.put("auto.commit.interval.ms", commitInterval);
                properties.put("auto.offset.reset", offsetReset);
                // Serializer class
                properties.put("serializer.class", "kafka.serializer.StringEncoder");
    
                properties.put("rebalance.max.retries", "10");
                // Backoff time between consecutive retries when a rebalance occurs
                properties.put("rebalance.backoff.ms", "3100");
            }
    
            ConsumerConfig consumerConfig = new ConsumerConfig(properties);
            consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    
            Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(topicThreadMap);
            // One worker thread could be allocated per stream; the commented-out code below counts the streams
    //        int messageProcessThreadNum = 0;
    //        for (List<KafkaStream<byte[], byte[]>> streamList : topicMessageStreams.values()) {
    //            messageProcessThreadNum = messageProcessThreadNum + streamList.size();
    //        }
            // Create the thread pool that actually processes the messages
            taskExecutor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10000));
            for (List<KafkaStream<byte[], byte[]>> streams : topicMessageStreams.values()) {
                for (final KafkaStream<byte[], byte[]> stream : streams) {
                    taskExecutor.submit(new Runnable() {
                        @Override
                        public void run() {
                            ConsumerIterator<byte[], byte[]> it = stream.iterator();
                            while (it.hasNext()) {
                                MessageAndMetadata<byte[], byte[]> data = it.next();
                                try {
                                    String kafkaMsg = new String(data.message(), "UTF-8");
                                    logger.info("Message from topic {}: {}", topicThreadMap.keySet(), kafkaMsg);
                                    // Handle the message
                                    onMessage(data);
                                } catch (RuntimeException e) {
                                    logger.error("处理消息异常.", e);
                                } catch (UnsupportedEncodingException e) {
                                    logger.error("Failed to decode the message as UTF-8.", e);
                                }
    
                            }
                        }
    
                    });
                }
            }
    
        }
    
        /**
         * Handle a single consumed message.
         * @param data the consumed message and its metadata
         */
        protected abstract void onMessage(MessageAndMetadata<byte[], byte[]> data);
    
        @Override
        public void destroy() throws Exception {
            try {
                if (consumerConnector != null) {
                    consumerConnector.shutdown();
                }
            } catch (Exception e) {
                logger.warn("shutdown consumer failed", e);
            }
            try {
                if (taskExecutor != null) {
                    taskExecutor.shutdown();
                }
            } catch (Exception e) {
                logger.warn("shutdown messageProcessExecutor failed", e);
            }
            logger.info("shutdown consumer successfully");
        }
    
        public Properties getProperties() {
            return properties;
        }
    
        public void setProperties(Properties properties) {
            this.properties = properties;
        }
    
        public Map<String, Integer> getTopicThreadMap() {
            return topicThreadMap;
        }
    
        public void setTopicThreadMap(Map<String, Integer> topicThreadMap) {
            this.topicThreadMap = topicThreadMap;
        }
    
        public String getZkConnect() {
            return zkConnect;
        }
    
        public void setZkConnect(String zkConnect) {
            this.zkConnect = zkConnect;
        }
    }

      2) Concrete Kafka consumer implementation class

    import com.xxx.sfmms.common.util.JsonConvertUtil;
    import com.xxx.sfmms.common.util.RedisUtil;
    import com.xxx.sfmms.common.util.StringUtil;
    import com.xxx.sfmms.service.intf.RecommendService;
    import kafka.message.MessageAndMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.MDC;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Kafka consumer for real-name authentication events.
     * (Detailed description of the functionality.)
     *
     * @author 17090889
     * @see [related classes/methods] (optional)
     * @since [product/module version] (optional)
     */
    public class RealNameKafkaConsumer extends BaseKafkaConsumerFactory {
    
        private final Logger LOGGER = LoggerFactory.getLogger(RealNameKafkaConsumer.class);
    
        private static final String STR_INVOKENO = "invokeNo";
    
        @Autowired
        private RecommendService recommendService;
    
    
        /**
         * Message handling.
         * @param data the consumed message and its metadata
         */
        @Override
        protected void onMessage(MessageAndMetadata<byte[], byte[]> data) {
            MDC.put(STR_INVOKENO, StringUtil.getUuid());
            String msg = "";
            try {
                msg = new String(data.message(), "UTF-8");
                LOGGER.info("RealNameKafkaConsumer-data={},topic={}", msg, data.topic());
            } catch (UnsupportedEncodingException e) {
                LOGGER.error("Failed to decode the message bytes as UTF-8.", e);
            }
            // Post-event Kafka data from real-name authentication
            Map<String, String> map = JsonConvertUtil.json2pojo(msg, Map.class);
            LOGGER.info("RealNameKafkaConsumer-map={}", map);
            String userNo = map.get("eppAccountNO");
            LOGGER.info("RealNameKafkaConsumer-userNo={}", userNo);
            String flag = RedisUtil.getString("PULLNEW:RACCOUNTNO_" + userNo, "MEIS");
            // Skip users who were not invited via channel 6
            if (!"1".equals(flag)) {
                LOGGER.info("Not a new user acquired via channel 6");
                return;
            }
            // authenStatus: 20 - basic authentication, 30 - advanced real-name authentication,
            // 40 - downgraded via real-name appeal, 50 - downgraded when advanced authentication expires,
            // 60 - real-name revoked (manual downgrade), 70 - downgraded via identity-recovery appeal
            String authenStatus = map.get("authenStatus");
            // Real name
            String realName = map.get("realName");
            // ID card number
            String idNo = map.get("idNO");
            // App token
            String appToken = map.get("appToken");
            // Parameters for the validation task
            Map<String, String> paramMap = new HashMap<String, String>(8);
            paramMap.put("userNo", userNo);
            paramMap.put("authenStatus", authenStatus);
            paramMap.put("realName", realName);
            paramMap.put("idNo", idNo);
            paramMap.put("appToken", appToken);
            Map<String,String> resultMap=recommendService.checkRulesAndRiskSendMoney(paramMap);
            LOGGER.info("resultMap={}", resultMap);
            MDC.remove(STR_INVOKENO);
        }
    }

      3) Bean injection configuration for the implementation classes

    <bean id="realNameKafkaConsumer" class="com.xxx.sfmms.service.RealNameKafkaConsumer">
       <property name="topicThreadMap">
          <map>
             <entry key="${realTopic}" value="5"/>
          </map>
       </property>
       <property name="zkConnect">
          <value>${realZkConnect}</value>
       </property>
    </bean>
    
    
    <bean id="preCreditKafkaConsumer" class="com.xxx.sfmms.service.PreCreditKafkaConsumer">
       <property name="topicThreadMap">
          <map>
             <entry key="${rxdTopic}" value="5"/>
          </map>
       </property>
       <property name="zkConnect">
          <value>${rxdZkConnect}</value>
       </property>
    </bean>

      4) Kafka consumer parameter configuration

    # Kafka listener configuration
    # ZooKeeper connect string for the real-name topic
    realZkConnect=xxx
    # real-name topic
    realTopic=xxx
    # ZooKeeper connect string for the 任性贷 (rxd) topic
    rxdZkConnect=xxx
    # 任性贷 (rxd) topic
    rxdTopic=xxx
    kafka.sessionTimeOut=6000
    kafka.syncTime=2000
    kafka.commitInterval=30000
    kafka.offsetReset=smallest
    kafka.groupId=xxx
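
    The kafka.* keys above are injected into the factory class through its @Value placeholders, and the ${realTopic}/${realZkConnect}/${rxdTopic}/${rxdZkConnect} placeholders are resolved in the bean XML. Both assume a property placeholder configurer (plus annotation processing for @Value/@Autowired) is registered in the Spring context; a typical declaration might look like the following, where the properties file name is an assumption:

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
       <property name="location" value="classpath:kafka-consumer.properties"/>
    </bean>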

      5) Dependency configuration

    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.9.2</artifactId>
       <version>0.8.1.1</version>
       <exclusions>
          <exclusion>
             <artifactId>jmxtools</artifactId>
             <groupId>com.sun.jdmk</groupId>
          </exclusion>
          <exclusion>
             <artifactId>jmxri</artifactId>
             <groupId>com.sun.jmx</groupId>
          </exclusion>
       </exclusions>
    </dependency>

