zoukankan      html  css  js  c++  java
  • SpringBoot+Rocketmq

    @PostConstruct:用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化。此方法必须在将类放入服务之前调用。
    @PreDestroy:标注在 Spring 容器关闭、Bean 被销毁之前需要执行的方法上,常用来释放资源。如果要在容器关闭时释放一些资源,就在对应的类中写一个被 @PreDestroy 注解的方法。今天就因为没写这个方法浪费了好长时间:MQ 的生产者启动之后没有被销毁,导致我用 IDEA 结束程序之后端口号依然被占用,每次再启动都要先杀进程。。。

    先把最简单的代码贴出来,只有最基本的发送接收功能

    pom.xml
    <!-- Dependencies for the demo. The Spring Boot starters carry no explicit version;
         presumably a Spring Boot parent POM/BOM (not shown here) manages them - confirm. -->
    <dependencies>
            <!-- Spring web support: the REST controller annotations imported by TestController. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- Test-scoped starter; no test code is shown alongside this POM fragment. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-common -->
            <!-- Supplies the org.apache.rocketmq.common.* types (Message, MessageExt, ConsumeFromWhere)
                 imported by ProducerService and ConsumerService. Keep the version in step with rocketmq-client. -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-common</artifactId>
                <version>4.3.0</version>
            </dependency>
    
    
            <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
            <!-- Supplies DefaultMQProducer / DefaultMQPushConsumer used by the two service classes. -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.3.0</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
            <!-- NOTE(review): none of the classes shown with this POM import fastjson;
                 verify it is needed before keeping it, and check the pinned version against current advisories. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.49</version>
            </dependency>
        </dependencies>

    application.properties

    # Consumer group name; injected into ConsumerService.consumerGroup via @Value.
    apache.rocketmq.consumer.PushConsumer=PushConsumer
    # Producer group name; injected into ProducerService.producerGroup via @Value.
    apache.rocketmq.producer.producerGroup=Producer
    # Name server address (host:port) shared by the producer and the consumer.
    apache.rocketmq.namesrvAddr=localhost:9876

    TestController.java

    package com.rmqspringtest.demo;
    
    import com.rmqspringtest.demo.producer.ProducerService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * Minimal REST entry point for the demo: forwards the {@code msg} request
     * parameter to {@link ProducerService} and returns whatever the service returns.
     */
    @RestController
    public class TestController {
        /** Topic every message from this endpoint is published to. */
        private static final String TOPIC = "test1";

        /** Tag attached to every message from this endpoint. */
        private static final String TAG = "push";

        @Autowired
        private ProducerService producer;

        /**
         * Handles {@code /push}: publishes {@code msg} under the fixed topic and tag.
         *
         * @param msg message text taken from the request
         * @return the string produced by {@link ProducerService#send(String, String, String)}
         */
        @RequestMapping("/push")
        public String pushMsg(String msg) {
            String response = producer.send(TOPIC, TAG, msg);
            return response;
        }
    }

    ConsumerService.java

    package com.rmqspringtest.demo.consumer;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    
    /**
     * Owns the demo's RocketMQ push consumer: starts it once dependency injection
     * has finished and shuts it down again when the bean is destroyed.
     *
     * <p>The consumer subscribes to topic {@code test1} with tag {@code push} and
     * prints every received message body to standard output.
     */
    @Component
    public class ConsumerService {
        @Value("${apache.rocketmq.consumer.PushConsumer}")
        private String consumerGroup;
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;

        /**
         * Kept as a field (not a local) so that {@link #shutDownConsumer()} can
         * release it; otherwise the client keeps running after the application
         * is asked to stop.
         */
        private DefaultMQPushConsumer consumer;

        /**
         * Creates, configures and starts the push consumer.
         *
         * <p>Start-up failures are printed and otherwise ignored, so the
         * application still boots when the consumer cannot be started.
         */
        @PostConstruct
        public void defaultMQPushConsumer() {
            consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(namesrvAddr);
            try {
                consumer.subscribe("test1", "push");

                // On the very first start, consume from the head of the queue;
                // on later starts, continue from the last consumed position.
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

                consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                    try {
                        for (MessageExt messageExt : list) {
                            String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            System.out.println("[Consumer] msgID(" + messageExt.getMsgId() + ") msgBody : " + messageBody);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Ask the broker to redeliver the batch later.
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.start();
                System.out.println("[Consumer 已启动]");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Shuts the consumer down when the bean is destroyed, mirroring
         * {@code ProducerService.shutDownProducer()}.
         */
        @PreDestroy
        public void shutDownConsumer() {
            if (consumer != null) {
                consumer.shutdown();
            }
        }
    }

    ProducerService.java

    package com.rmqspringtest.demo.producer;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ProducerService {
        @Value("${apache.rocketmq.producer.producerGroup}")
        private String producerGroup;
    
        @Value("${apache.rocketmq.namesrvAddr}")
        private String namesrvAddr;
    
        private DefaultMQProducer producer;
    
        @PostConstruct
        public void initProducer() {
            producer = new DefaultMQProducer(producerGroup);
            producer.setNamesrvAddr(namesrvAddr);
            producer.setRetryTimesWhenSendFailed(3);
            try {
                producer.start();
                System.out.println("[Producer 已启动]");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public String send(String topic, String tags, String msg) {
            SendResult result = null;
            try {
                Message message = new Message(topic, tags, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
                result = producer.send(message);
                System.out.println("[Producer] msgID(" + result.getMsgId() + ") " + result.getSendStatus());
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "{"MsgId":"" + result.getMsgId() + ""}";
        }
    
        @PreDestroy
        public void shutDownProducer() {
            if (producer != null) {
                producer.shutdown();
            }
        }
    }

    cmd中执行命令开启服务:

    start mqnamesrv

    start mqbroker -n 127.0.0.1:9876

    发送请求:http://127.0.0.1:8080/push?msg=hello

    ok

    RocketMQTemplate 这玩意 看一下
  • 相关阅读:
    什么叫精通C++
    编程好书推荐
    Reading Notes of C Traps and Pitfalls
    继承的小问题
    关于strcpy函数
    #pragma once 与 #ifndef 的区别解析
    模板类的友元重载函数
    NET开发人员必知的八个网站
    获取MDI窗体的实例
    .Net下收发邮件第三方公共库
  • 原文地址:https://www.cnblogs.com/gaoquanquan/p/10844320.html
Copyright © 2011-2022 走看看