zoukankan      html  css  js  c++  java
  • JAVA封装消息中间件调用二(kafka消费者篇)

      上一遍我简单介绍了kafka的生成者使用,调用方式比较简单,今天我给大家分享下封装kafka消费者,作为中间件,我们做的就是最大程度的解耦,使业务方接入我们依赖程度降到最低。

      第一步,我们先配置一个消费者核心类

      

    package com.meiren.message.kafka.consumer;
    
    import com.meiren.message.kafka.beans.ConsumerProperty;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Created by zhangwentao on 2017/5/18.
     */
    public class ConsumerHandler {
        private final KafkaConsumer<String, String> consumer;
        private ExecutorService executors;
    
        public ConsumerHandler(ConsumerProperty  consumerProperty, List<String> topics) {
            Properties props = new Properties();
            props.put("bootstrap.servers", consumerProperty.getBrokerList());
            props.put("group.id", consumerProperty.getGroupId());
            props.put("enable.auto.commit", consumerProperty.getEnableAutoCommit());
            props.put("auto.commit.interval.ms", consumerProperty.getAutoCommitInterval());
            props.put("session.timeout.ms", consumerProperty.getSessionTimeout());
            props.put("key.deserializer", consumerProperty.getKeySerializer());
            props.put("value.deserializer", consumerProperty.getValueSerializer());
            consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(topics);
        }
    
        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy());
            Thread t = new Thread(new Runnable(){//启动一个子线程来监听kafka消息
                public void run(){
           while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (final ConsumerRecord record : records) {
                System.out.println("监听到kafka消息。。。。。。");
                executors.submit(new ConsumerWorker(record));
            }
          }
                }});
            t.start();
    
        }
    
        public void shutdown() {
            if (consumer != null) {
                consumer.close();
            }
            if (executors != null) {
                executors.shutdown();
            }
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }

    这个核心类有3个部分组成:1.构造方法(生成一个消费者配置,订阅topic),2.开启多线程监听消费者  3.关闭线程是和消费者

     public ConsumerHandler(ConsumerProperty  consumerProperty, List<String> topics) 
    consumerProperty是消费者配置信息类,包含了消费者的配置属性和topic
    public class ConsumerProperty {
    
        private String brokerList;
    
        private String groupId;
    
        private  String enableAutoCommit="true";
    
        private String autoCommitInterval="1000";
    
        private String sessionTimeout="30000";
    
        private String keySerializer="org.apache.kafka.common.serialization.StringDeserializer";
    
        private String valueSerializer="org.apache.kafka.common.serialization.StringDeserializer";
        /**
         * topic以及消费的实现类
         */
        private List<MessageContainer> messageContainers;

      2.监听消费者信息

     public void execute(int workerNum) { } 这段代码的入参是线程数,开启一个线程池ThreadPoolExecutor,建立一个长连接,每200毫秒去kafka服务器拉取消息,每拉到一个消息,就分配给一个线程类ConsumerWorker去处理这个消息
    这里要特别注意是,监听kafka的过程需要另起一个线程去监听,不然主线程会一直在while(true)里面阻塞掉。

    3.关闭线程池和消费者(一般情况下会一直处于监听状态)
    第二步,我们设置服务启动的时候去监听
    public class PropertyFactory {
    
        public  static ProducerProperty producerProperty;
    
        public  static ConsumerProperty consumerProperty;
    
    
        public ProducerProperty getProducerProperty() {
            return producerProperty;
        }
    
        public void setProducerProperty(ProducerProperty producerProperty) {
            this.producerProperty = producerProperty;
        }
    
        public ConsumerProperty getConsumerProperty() {
            return consumerProperty;
        }
    
        public void setConsumerProperty(ConsumerProperty consumerProperty) {
            this.consumerProperty = consumerProperty;
        }
    
    
        ConsumerHandler consumer=null;
    
        @PostConstruct
        public  void startListerConsumer(){
            consumer= new ConsumerListener(consumerProperty).startListen();
        }
    
        @PreDestroy
        public void shutDown(){
        if(consumer!=null){
            consumer.shutdown();
        }
        }
    }

    这是一个属性工程的bean,当这个bean被创建完成后,会执行startListerConsumer()方法(@PostConstruct的含义就是在bean被创建之后执行)  ,startListerConsumer的作用开启监听

    ConsumerHandler  consumers = new ConsumerHandler( consumerProperty, topics);
            consumers.execute(workerNum);

    另外,我们看到这个beanFactory有2个属性ProducerProperty 和ConsumerProperty ,这个2个分别是消费者个和生产者的配置,是bean在初始化的时候注入进去的

    这里重点介绍一下说ConsumerProperty 的messageContainers属性,它是一个集合对象,包含需要订阅的topic和处理该Topic的实现了MessageListener接口的实现类

    public class MessageContainer {
        private String topic;
    
        private MessageListener messageHandle;
    
     
    }
    public interface MessageListener {
    
        public void onMessage(ConsumerMessageBO message);
    }

    上面说到监听到每个消息都会分配一个
    ConsumerWorker去处理消息,我们看看具ConsumerWorker的
    public class ConsumerWorker implements Runnable {
    
             private ConsumerRecord<String, String> consumerRecord;
    
                public ConsumerWorker(ConsumerRecord record) {
                   this.consumerRecord = record;
               }
    
        public void run() {
            ConsumerMessageBO  consumerMessageBO= JSONObject.parseObject(consumerRecord.value(),ConsumerMessageBO.class);
            consumerMessageBO.setOffset(consumerRecord.offset());
            consumerMessageBO.setPartition(consumerRecord.partition());
            for(MessageContainer messageContainer: PropertyFactory.consumerProperty.getMessageContainers()){
                if(consumerRecord.topic().equals(messageContainer.getTopic())){
                    messageContainer.getMessageHandle().onMessage(consumerMessageBO);
                }
            }
    
        }

    根据监听到topic,然后和ConsumerProperty 的messageContainers属性的topic进行比对,找到对应topic处理的实现类调用其onMessage方法

    我们JAVA的核心代码基本已经写完了

    第三步、业务方接入我们封装的部分

    新建一个spring-kafka.xml文件

    
        <bean id="consumerProperty" class="com.meiren.message.kafka.beans.ConsumerProperty">
            <property name="brokerList" value="${kafka.bootstrap.servers}"/>
            <property name="groupId" value="${kafka.group.id}"/>
            <property name="messageContainers" >
                <list>
                    <ref bean="smsMessageContainer"></ref>
                    <ref bean="emailMessageContainer"></ref>
                </list>
            </property>
        </bean>
        <bean id="producerProperty" class="com.meiren.message.kafka.beans.ProducerProperty">
            <property name="brokerList" value="${kafka.bootstrap.servers}"/>
        </bean>
    
        <bean id ="emailMessageHandler" class="com.meiren.message.kafka.handle.EmailMessageHandler"/>
        <bean id ="smsMessageHandler" class="com.meiren.message.kafka.handle.SmsMessageHandler"/>
    
        <bean id="smsMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer">
            <constructor-arg value="sms_async_send"/>
            <property name="messageHandle" ref="smsMessageHandler"></property>
        </bean>
    <bean id ="emailMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer"> <constructor-arg value="email_async_send"/> <property name="messageHandle" ref="emailMessageHandler"></property> </bean>   
      <!--配置工厂类 -->
     
    <bean class="com.meiren.message.kafka.beans.PropertyFactory">
        <property name="consumerProperty" ref="consumerProperty"/>
         <property name="producerProperty" ref="producerProperty"/>
      </bean>
    </beans>

    这个配置文件对应的就是PropertyFactory的属性,其实就是消费者个和生产者的配置。

    我们配置好这个文件后,我们需要一个消息实现类

    public class SmsMessageHandler  implements MessageListener{
        public  static final Logger log= LoggerFactory.getLogger(SmsMessageHandler.class);
        @Autowired
        private SmsSendLogDao  smsSendLogDao;
        public void onMessage(ConsumerMessageBO consumerMessageBO) {
    
    
                }
    
            }catch (Exception e){
                System.out.println("转换消息异常:"+e.getMessage());
            }
    
        }

    只要实现了MessageListener接口,并且在spring-kafka.xm配置好对应的topic就可以了

     <bean id="smsMessageContainer" class="com.meiren.message.kafka.beans.MessageContainer">
            <constructor-arg value="sms_async_send"/>
            <property name="messageHandle" ref="smsMessageHandler"></property>
        </bean>

    整个接入就完成了,由于这是第一版本,所以封装的程度还不算很好,但是也基本符合应用(一个配置文件,一个实现类),有不足之处将会在后面版本进行完善迭代。

    至此我们已经将kafka集成spring的功能简单实现了,下一篇我将介绍消息队列(kafka)的一些实际应用。

    
    
     
     

      

  • 相关阅读:
    基于visual Studio2013解决面试题之1307二分查找
    基于visual Studio2013解决面试题之1306奇偶位数交换
    基于visual Studio2013解决面试题之1305字符串所有子集
    cocos2d-x游戏开发系列教程-坦克大战游戏之虚拟手柄控制坦克移动
    cocos2d-x游戏开发系列教程-坦克大战游戏之虚拟手柄的显示
    cocos2d-x游戏开发系列教程-坦克大战游戏之坦克的显示
    基于visual Studio2013解决面试题之1204大数组查找
    6.6 判断字符串是不是字母类型的
    6.4 从字符串中删除不需要的字符
    6.3 计算字符在字符串中出现的次数
  • 原文地址:https://www.cnblogs.com/zwt1990/p/6934641.html
Copyright © 2011-2022 走看看