zoukankan      html  css  js  c++  java
  • SpringBoot整合RocketMQ

    上篇博客讲解了服务器集群部署RocketMQ 博客地址:RocketMQ(2)---Docker部署RocketMQ集群

    这篇在上篇搭建好的基础上,将SpringBoot整合RocketMQ实现生产消费。

    GitHub地址https://github.com/yudiandemingzi/spring-boot-study

    一、搭建步骤

    先说下技术大致架构

    SpringBoot2.1.6 + Maven3.5.4 + rocketmq4.3.0 + JDK1.8 +Lombok(插件)

    1、添加rocketmq包

         <!--注意: 这里的版本,要和部署在服务器上的版本号一致-->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.3.0</version>
            </dependency>

    2、JmsConfig(配置类)

    连接RocketMQ服务器配置类,这里为了方便直接写成常量。

    /**
     * @Description: 安装实际开发这里的信息 都是应该写在配置里,来读取,这里为了方便所以写成常量
     */
    public class JmsConfig {
        /**
         * Name Server 地址,因为是集群部署 所以有多个用 分号 隔开
         */
        public static final String NAME_SERVER = "127.12.15.6:9876;127.12.15.6:9877";
        /**
         * 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
         */
        public static final String TOPIC = "topic_family";
    
    }

    3、Producer (生产者)

    @Slf4j
    @Component
    public class Producer {
        private String producerGroup = "test_producer";
        private DefaultMQProducer producer;
        
        public Producer(){
            //示例生产者
            producer = new DefaultMQProducer(producerGroup);
            //不开启vip通道 开通口端口会减2
            producer.setVipChannelEnabled(false);
            //绑定name server
            producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
            start();
        }
        /**
         * 对象在使用之前必须要调用一次,只能初始化一次
         */
        public void start(){
            try {
                this.producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
      
        public DefaultMQProducer getProducer(){
            return this.producer;
        }
        /**
         * 一般在应用上下文,使用上下文监听器,进行关闭
         */
        public void shutdown(){
            this.producer.shutdown();
        }
    }

    4、Consumer (消费者)

    @Slf4j
    @Component
    public class Consumer {
    
        /**
         * 消费者实体对象
         */
        private DefaultMQPushConsumer consumer;
        /**
         * 消费者组
         */
        public static final String CONSUMER_GROUP = "test_consumer";
        /**
         * 通过构造函数 实例化对象
         */
        public Consumer() throws MQClientException {
    
            consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
            consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
            //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            //订阅主题和 标签( * 代表所有标签)下信息
            consumer.subscribe(JmsConfig.TOPIC, "*");
            // //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                // msgs中只收集同一个topic,同一个tag,并且key相同的message
                // 会把不同的消息分别放置到不同的队列中
                try {
                    for (Message msg : msgs) {
    
                        //消费者获取消息 这里只输出 不做后面逻辑处理
                        String body = new String(msg.getBody(), "utf-8");
                        log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
    
            consumer.start();
            System.out.println("消费者 启动成功=======");
        }
    }

    大致就是这边简单,下面就是测试。

    二、测试

    先写个测试接口进行测试。

    1、Controller

    @Slf4j
    @RestController
    public class Controller {
    
        @Autowired
        private Producer producer;
    
        private List<String> mesList;
    
        /**
         * 初始化消息
         */
        public Controller() {
            mesList = new ArrayList<>();
            mesList.add("小小");
            mesList.add("爸爸");
            mesList.add("妈妈");
            mesList.add("爷爷");
            mesList.add("奶奶");
    
        }
    
        @RequestMapping("/text/rocketmq")
        public Object callback() throws Exception {
            //总共发送五次消息
            for (String s : mesList) {
                //创建生产信息
                Message message = new Message(JmsConfig.TOPIC, "testtag", ("小小一家人的称谓:" + s).getBytes());
                //发送
                SendResult sendResult = producer.getProducer().send(message);
                log.info("输出生产者信息={}",sendResult);
            }
            return "成功";
        } 
    }

    2、测试结果

    很明显生产发送消息已经成功,二消费者也成功接收了消息!

    另外我们再来看下RocketMQ控制台是否也有消费记录

    很明显在控制台这边也会有消费记录!

    总结这边只是简单的整合,后面会通过RocketMQ实现分布式事务,可以用于线上实际环境中,到时候会深入讲解下源码。

    转载于:https://www.cnblogs.com/qdhxhz/p/11109696.html

  • 相关阅读:
    LeetCode对撞指针汇总
    167. Two Sum II
    215. Kth Largest Element in an Array
    2018Action Recognition from Skeleton Data via Analogical Generalization over Qualitative Representations
    题解 Educational Codeforces Round 84 (Rated for Div. 2) (CF1327)
    题解 JZPKIL
    题解 八省联考2018 / 九省联考2018
    题解 六省联考2017
    题解 Codeforces Round #621 (Div. 1 + Div. 2) (CF1307)
    题解Codeforces Round #620 (Div. 2)
  • 原文地址:https://www.cnblogs.com/it-deepinmind/p/12448016.html
Copyright © 2011-2022 走看看