zoukankan      html  css  js  c++  java
  • springboot整合RocketMq(非事务)

    1、配置文件

    1、yml配置文件

    rocketmq: # MQ configuration
      producer:
        iseffect: true # the producer configuration class is only activated when this is "true"
        type: default # (transaction,default) transaction:TransactionMQProducer; default:DefaultMQProducer
        groupName: testzjlGroup # producer group passed to the DefaultMQProducer constructor
        topicName: test_topic # topic used by the send(tag, obj) overload that takes no explicit topic
        namesrvAddr: 192.168.244.128:9876 # name server address handed to setNamesrvAddr
      consumer:
        iseffect: true
        type: default # (transaction,default) NOTE(review): the original note here listed the producer classes (copied from the producer section); confirm which consumer types are meant
        groupName: testzjlGroup # consumer group passed to the consumer constructor
        topicName: test_topic
        namesrvAddr: 192.168.244.128:9876 # name server address handed to setNamesrvAddr

    2、对应的java类

    package com.gofun.customer.mq;
    
    import org.springframework.boot.context.properties.ConfigurationProperties;
    
    /**
     * Holder for the settings found under the {@code rocketmq.producer} prefix.
     *
     * <p>Plain mutable bean: every property has a getter and a setter and no
     * validation or defaulting is performed here.
     */
    @ConfigurationProperties(prefix = "rocketmq.producer")
    public class RocketMqProducerProperties {

        /** Switch read from {@code rocketmq.producer.iseffect}. */
        private Boolean iseffect;

        /** Producer flavour read from {@code rocketmq.producer.type}. */
        private String type;

        /** Producer group name. */
        private String groupName;

        /** Topic name configured for the producer. */
        private String topicName;

        /** Name server address. */
        private String namesrvAddr;

        /** @return the configured {@code iseffect} flag, or {@code null} if it was never set */
        public Boolean getIseffect() {
            return this.iseffect;
        }

        /** @param iseffect new value of the {@code iseffect} flag */
        public void setIseffect(Boolean iseffect) {
            this.iseffect = iseffect;
        }

        /** @return the configured producer type */
        public String getType() {
            return this.type;
        }

        /** @param type new producer type */
        public void setType(String type) {
            this.type = type;
        }

        /** @return the configured producer group name */
        public String getGroupName() {
            return this.groupName;
        }

        /** @param groupName new producer group name */
        public void setGroupName(String groupName) {
            this.groupName = groupName;
        }

        /** @return the configured topic name */
        public String getTopicName() {
            return this.topicName;
        }

        /** @param topicName new topic name */
        public void setTopicName(String topicName) {
            this.topicName = topicName;
        }

        /** @return the configured name server address */
        public String getNamesrvAddr() {
            return this.namesrvAddr;
        }

        /** @param namesrvAddr new name server address */
        public void setNamesrvAddr(String namesrvAddr) {
            this.namesrvAddr = namesrvAddr;
        }
    }
    
    View Code

    2、DefaultMQProducer生产者

    1、生产者配置

    package com.gofun.customer.mq;
    
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring configuration that publishes the RocketMQ producer bean.
     *
     * <p>The whole class is only active when {@code rocketmq.producer.iseffect} is
     * {@code true}; the settings themselves are bound into
     * {@link RocketMqProducerProperties}.
     */
    @Configuration
    @EnableConfigurationProperties(RocketMqProducerProperties.class)
    @ConditionalOnProperty(prefix = "rocketmq.producer", name = "iseffect", havingValue = "true")
    public class RocketMqConfig {

        /** Bound {@code rocketmq.producer.*} settings. */
        @Autowired
        private RocketMqProducerProperties rocketMqProperties;

        /**
         * Builds and starts the {@link DefaultMQProducer}; registered only when
         * {@code rocketmq.producer.type} equals {@code default}.
         *
         * @return a producer that has already been started
         * @throws MQClientException if the producer fails to start
         */
        @Bean
        @ConditionalOnProperty(prefix = "rocketmq.producer", name = "type", havingValue = "default")
        public DefaultMQProducer defaultMQProducer() throws MQClientException {
            final String group = rocketMqProperties.getGroupName();
            final String nameServers = rocketMqProperties.getNamesrvAddr();

            DefaultMQProducer producer = new DefaultMQProducer(group);
            producer.setNamesrvAddr(nameServers);
            producer.setVipChannelEnabled(false);
            // Retry count applied to failed asynchronous sends.
            producer.setRetryTimesWhenSendAsyncFailed(10);
            producer.start();
            return producer;
        }

    }
    
    1. 启动应用创建生产者
    2. 使用配置文件为RocketMqProducerProperties类
    3. 配置文件iseffect = true时生效该配置
    4. 配置文件 type=default 时生效 DefaultMQProducer 生产者

    2、生产者工具类

    package com.gofun.customer.mq;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Convenience wrapper around {@link DefaultMQProducer} for sending String or
     * JSON-serialised payloads.
     *
     * <p>All {@code send} overloads are best-effort: a {@code null} payload or a
     * failed send yields {@code null} instead of an exception. Failures are logged
     * rather than silently dropped. Payloads are always encoded as UTF-8, so
     * consumers must decode with the same charset.
     */
    @Component
    public class RocketMqProducer {
        // Fully-qualified JDK logger so that this class needs no additional imports.
        private static final java.util.logging.Logger LOG =
                java.util.logging.Logger.getLogger(RocketMqProducer.class.getName());

        @Autowired
        private DefaultMQProducer defaultMQProducer;
        @Autowired
        private RocketMqProducerProperties rocketMqProperties;

        /**
         * Sends a message synchronously.
         *
         * @param topic destination topic
         * @param tag   message tag
         * @param obj   payload; a {@code String} is sent as-is, any other object is
         *              serialised with fastjson
         * @return the broker's send result, or {@code null} if {@code obj} is
         *         {@code null} or the send failed
         */
        public SendResult send(String topic, String tag, Object obj) {
            if (obj == null) {
                return null;
            }
            return doSend(buildMessage(topic, tag, obj));
        }

        /**
         * Sends a delayed message synchronously.
         *
         * @param topic destination topic
         * @param tag   message tag
         * @param obj   payload; a {@code String} is sent as-is, any other object is
         *              serialised with fastjson
         * @param delayTimeLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m  7m  8m  9m  10m  20m 30m  1h  2h
         *                       1  2  3   4   5  6  7  8  9  10  11  12  13  14   15  16  17  18
         * @return the broker's send result, or {@code null} if {@code obj} is
         *         {@code null} or the send failed
         */
        public SendResult send(String topic, String tag, Object obj,int delayTimeLevel) {
            if (obj == null) {
                return null;
            }
            Message msg = buildMessage(topic, tag, obj);
            msg.setDelayTimeLevel(delayTimeLevel);
            return doSend(msg);
        }

        /**
         * Sends a message to the topic configured in {@link RocketMqProducerProperties}.
         *
         * @param tag message tag
         * @param obj payload, see {@link #send(String, String, Object)}
         * @return the broker's send result, or {@code null} if {@code obj} is
         *         {@code null} or the send failed
         */
        public SendResult send(String tag, Object obj) {
            return send(rocketMqProperties.getTopicName(), tag, obj);
        }

        /** Builds the message, encoding the payload as UTF-8 bytes. */
        private static Message buildMessage(String topic, String tag, Object obj) {
            String body = (obj instanceof String) ? (String) obj : JSON.toJSONString(obj);
            // Explicit charset: the no-arg getBytes() depends on the JVM's default encoding.
            return new Message(topic, tag, body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }

        /** Performs the send, logging any failure and mapping it to {@code null}. */
        private SendResult doSend(Message msg) {
            try {
                return defaultMQProducer.send(msg);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller's thread instead of swallowing it.
                Thread.currentThread().interrupt();
                LOG.log(java.util.logging.Level.WARNING,
                        "Interrupted while sending message to topic " + msg.getTopic(), e);
                return null;
            } catch (Exception e) {
                LOG.log(java.util.logging.Level.SEVERE,
                        "Failed to send message to topic " + msg.getTopic(), e);
                return null;
            }
        }

    }
    
    View Code

    3、生产者发送消息

    1、引入工具类调用 发送方法

    // Inject the RocketMqProducer helper into the calling bean.
    @Autowired
    private RocketMqProducer rocketMqProducer;
    // Two-argument overload: the first argument is the tag, the second the payload;
    // the topic is taken from the producer properties (rocketmq.producer.topicName).
    rocketMqProducer.send("testTag","测试mq发送消息。。。。");
    

    4、问题

    注意:
    1. mq设置autoCreateTopicEnable=false时需要手动创建topic
    2. 启动后后台一直打印类似如下的日志:closeChannel: close the connection to remote address[] result: true,说明 namesrvAddr 的 ip 或端口号配置错误
    3. 启动后后台一直打印类似如下的日志:closeChannel: close the connection to remote address[192.168.244.128:9876] result: true,可能是 jar 包错误
    4. 要引入fastjson的jar包

    3、消费者

    1、DefaultMQPullConsumer 消费者

    // In-memory map from message queue to the next offset to pull from it;
    // written by putMessageQueueOffset and read by getMessageQueueOffset.
    private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
    
        /**
         * Pulls and prints the messages of every queue of {@code order-topic}
         * using a {@link DefaultMQPullConsumer}.
         *
         * <p>Each queue is polled in batches of up to 32 messages until the broker
         * reports {@code NO_NEW_MSG}, after which the next queue is processed. The
         * consumer is always shut down before returning, also when an exception
         * escapes. Message bodies are decoded as UTF-8.
         *
         * @throws MQClientException if the consumer cannot be started or the queues
         *                           of the topic cannot be fetched
         */
        public void testDefaultMQPullConsumer() throws MQClientException {
            DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rocketMqConsumerProperties.getGroupName());
            consumer.setNamesrvAddr("192.168.244.128:9876;192.168.244.130:9876");
            consumer.start();
            try {
                // Fetch all message queues of the topic.
                Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order-topic");
                for (MessageQueue mq : mqs) {
                    try {
                        // Offset recorded in the store for this queue; only reported here,
                        // pulling itself starts from the in-memory offset table.
                        long offset = consumer.fetchConsumeOffset(mq, true);
                        System.out.println("consumer from the queue:"+mq+":"+offset);
                        SINGLE_QUEUE:
                        while (true) {
                            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    for (MessageExt m : pullResult.getMsgFoundList()) {
                                        // Explicit charset: new String(byte[]) depends on the JVM default encoding.
                                        System.out.println(new String(m.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                                    }
                                    break;
                                case NO_NEW_MSG:
                                    // Queue drained: leave the polling loop so the remaining queues
                                    // are processed and the consumer can be shut down.
                                    break SINGLE_QUEUE;
                                case NO_MATCHED_MSG:
                                case OFFSET_ILLEGAL:
                                default:
                                    break;
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                consumer.shutdown();
            }
        }
    
        /**
         * Remembers the offset at which the next pull from {@code mq} should begin.
         *
         * @param mq              queue the offset belongs to
         * @param nextBeginOffset offset to store for that queue
         */
        private static void putMessageQueueOffset(MessageQueue mq, long nextBeginOffset) {
            final Long next = Long.valueOf(nextBeginOffset);
            OFFSE_TABLE.put(mq, next);
        }
    
        /**
         * Looks up the offset previously stored for {@code mq}.
         *
         * @param mq queue to look up
         * @return the stored offset, or 0 when nothing has been stored for the queue
         */
        private static Long getMessageQueueOffset(MessageQueue mq) {
            final Long stored = OFFSE_TABLE.get(mq);
            return stored == null ? Long.valueOf(0L) : stored;
        }

    2、DefaultMQPushConsumer 消费者模板方法

    RocketMqConsumerProperties 配置类 参见 消费者配置类。

    package com.gofun.customer.mq;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import java.util.List;
    
    /**
     * Template base class for push consumers: subclasses call
     * {@link #listener(String, String)} to subscribe and implement
     * {@link #dealBody(List)} to process the received messages.
     */
    public abstract class RocketMqDefaultConsumer {
        @Autowired
        private RocketMqConsumerProperties rocketMqConsumerProperties;

        /**
         * Creates a {@link DefaultMQPushConsumer}, subscribes it to the given topic
         * and tag, and starts it. Every received batch is handed to
         * {@link #dealBody(List)}.
         *
         * <p>NOTE(review): the consumer is held only in a local variable and this
         * method never shuts it down.
         *
         * @param topic topic to subscribe to
         * @param tag   tag expression to subscribe with
         * @throws MQClientException if subscribing or starting the consumer fails
         */
        public void listener(String topic, String tag) throws MQClientException {
            final String group = rocketMqConsumerProperties.getGroupName();
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(group);
            pushConsumer.setNamesrvAddr(rocketMqConsumerProperties.getNamesrvAddr());
            pushConsumer.subscribe(topic, tag);

            // Anonymous listener that forwards each batch to the subclass hook.
            MessageListenerConcurrently forwarder = new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> batch, ConsumeConcurrentlyContext context) {
                    return RocketMqDefaultConsumer.this.dealBody(batch);
                }
            };
            pushConsumer.registerMessageListener(forwarder);
            pushConsumer.start();
        }

        /**
         * Business handling of a batch of messages.
         *
         * @param msgs messages delivered by the consumer
         * @return the consume status reported back for the batch
         */
        public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs);
    }
    
    1. 消费者需要继承此抽象类
    2. 传入topic 和 tag,添加监听, 开启消费者
    3. 定义抽象函数处理消息,实现类需要自已实现处理函数内容

    3、DefaultMQPushConsumer 消费者实现

    package com.gofun.customer.controller.test;
    
    import com.gofun.customer.mq.RocketMqDefaultConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;
    
    import java.util.List;
    
    /**
     * Sample consumer: subscribes to {@code test_topic} / {@code testTag} once the
     * bean's properties have been set and prints every message body it receives.
     *
     * <p>Bodies are decoded as UTF-8; the sender must encode with the same charset.
     */
    @Component
    public class TestConsumer extends RocketMqDefaultConsumer implements InitializingBean {
        private static final String TEST_TOPIC = "test_topic";
        private static final String TEST_TAG = "testTag";

        /**
         * Registers the listener through the template method of the parent class.
         *
         * @throws Exception if the subscription cannot be set up
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            super.listener(TEST_TOPIC, TEST_TAG);
        }

        /**
         * Prints the body of each message in the batch.
         *
         * @param msgs messages delivered by the consumer; may be {@code null} or empty
         * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
         */
        @Override
        public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs) {
            if (!CollectionUtils.isEmpty(msgs)) {
                for (MessageExt msg : msgs) {
                    // Explicit charset: new String(byte[]) depends on the JVM default encoding.
                    String text = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("这是rocketmq消费者消费的消息:"+text);
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }
    
    1. 继承模板类定义自己的topic和tag
    2. 实现处理消息的方法
    3. 实现接口InitializingBean ,加载完成自动调用 afterPropertiesSet方法,父类模板添加监听

    3、消费模式

    在RocketMQ中,可以理解为没有ActiveMQ的createQueue()和createTopic()的用法,也就是并没有P2P和Pub/Sub类似的概念。RocketMQ不遵循JMS规范,而是使用了一套自定义的机制。可以理解为RocketMQ都是基于Pub/Sub发布订阅模式的,

    在此基础上提供了集群消息和广播消息两种消息模式,可通过消费端方法consumer.setMessageModel()进行设置。

    集群消息——MessageModel.CLUSTERING

    这种方式可以实现类似ActiveMQ负载均衡客户端的功能,同一个ConsumerGroup下的所有Consumer以负载均衡的方式消费消息。比较特殊的是,这种方式可以支持生产端先发送消息到Broker,消费端再订阅主题进行消费,比较灵活。RocketMQ默认为该模式。

    广播消息——MessageModel.BROADCASTING

    在这种模式下,生产端发送到Topic下的消息,会被订阅了该Topic的所有Consumer消费,即使它们处于同一个ConsumerGroup。

    原文链接:https://blog.csdn.net/weixin_34452850/article/details/82754419

    3、顺序消息

    // Ordered send: the selector maps every message with the same order id to the
    // same queue, so those messages are stored in sending order.
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            // arg is the orderId passed as the last argument of send().
            int id = (Integer) arg;
            // floorMod keeps the index inside [0, size) even for a negative id,
            // where the % operator would yield a negative index and get() would throw.
            int index = Math.floorMod(id, mqs.size());
            return mqs.get(index);
        }
    }, orderId);

    参考https://www.jianshu.com/p/5260a2739d80

  • 相关阅读:
    day06-for循环补充,可变与不可变类型,数字,字符串和列表的使用
    day05-while循环和for循环的使用
    day04-运算符,流程控制之if和input()用户交互功能
    day03-变量,基本数据类型,基本运算符
    day02-python和计算机介绍2
    day01-python和计算机介绍1
    仓库
    四则运算
    异常处理
    动手动脑3
  • 原文地址:https://www.cnblogs.com/happydreamzjl/p/12022412.html
Copyright © 2011-2022 走看看