zoukankan      html  css  js  c++  java
  • SpringBoot整合RabbitMQ

    首先本文是学习过程中的一个小demo,不涉及实际的发送短信、邮件的发送逻辑,同时,在文中 RabbitMQ 是基于发布订阅模式。


    所以如下会使用邮件、短信发送的例子,生产者对外发布发送消息的接口,根据调用的参数发送到相应的队列中。


    其实这里面还会存在一些问题,比如事务问题、重复签收问题等等,由于是练手Demo,其他问题留在后面的文章补充。

    文章目录

    1. 生产者1.1 maven依赖1.2 application.yml配置类1.3 交换机绑定队列1.4 生产者投递消息1.5 控制层调用代码2. 消费者2.1 maven依赖2.2 application.yml配置2.3 邮件消费者2.4 短信消费者3. 运行测试

    1. 生产者

    1.1 maven依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <dependencies>
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

    1.2 application.yml配置类

    spring:
      rabbitmq:
        ####连接地址
        host: 127.0.0.1
        ####端口号
        port: 5672
        ####账号
        username: guest
        ####密码
        password: guest
        ### 地址
        virtual-host: /

    1.3 交换机绑定队列

    @Component
    public class FanoutConfig {

        /**
         * 邮件队列
         */

        private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";

        /**
         * 短信队列
         */

        private String FANOUT_SMS_QUEUE = "fanout_sms_queue";

        /**
         * 交换机名称
         */

        private String EXCHANGE_NAME = "fanoutExchange";

        /**
         * 1.定义邮件队列
         * @return
         */

        @Bean
        public Queue fanOutEamilQueue() {
            return new Queue(FANOUT_EMAIL_QUEUE);
        }

        /**
         * 1.定义短信队列
         * @return
         */

        @Bean
        public Queue fanOutSmsQueue() {
            return new Queue(FANOUT_SMS_QUEUE);
        }

        /**
         * 2.定义交换机
         * @return
         */

        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(EXCHANGE_NAME);
        }

        /**
         * 3.队列与交换机绑定邮件队列
         * @param fanOutEamilQueue
         * @param fanoutExchange
         * @return
         */

        @Bean
        Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
        }

        /**
         * 4.队列与交换机绑定短信队列
         * @param fanOutSmsQueue
         * @param fanoutExchange
         * @return
         */

        @Bean
        Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
        }
    }

    FanoutConfig 中执行如下几步任务:

    1. 定义短信队列
    2. 定义交换机
    3. 队列与交换机绑定邮件|短信队列

    1.4 生产者投递消息

    @Component
    public class FanoutProducer {

        @Autowired
        private AmqpTemplate amqpTemplate;

        /**
         * 发送消息
         * @param queueName 队列名称
         */

        public void send(String queueName) {
            String msg = "my_fanout_msg:" + new Date();
            System.out.println(msg + ":" + msg);
            amqpTemplate.convertAndSend(queueName, msg);
        }
    }

    1.5 控制层调用代码

    @RestController
    public class ProducerController {

        @Autowired
        private FanoutProducer fanoutProducer;

        @RequestMapping("/sendFanout")
        public String sendFanout(String queueName) {
            fanoutProducer.send(queueName);
            return "success";
        }
    }

    2. 消费者

    2.1 maven依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <dependencies>

        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>

    </dependencies>

    2.2 application.yml配置

    spring:
      rabbitmq:
      ####连接地址
        host: 127.0.0.1
       ####端口号   
        port: 5672
       ####账号 
        username: guest
       ####密码  
        password: guest
       ### 地址
        virtual-host: /

    server:
      port: 8081

    2.3 邮件消费者

    @Component
    @RabbitListener(queues = "fanout_eamil_queue")
    public class FanoutEamilConsumer {

        @RabbitHandler
        public void process(String msg) throws Exception {
            System.out.println("邮件消费者获取生产者消息msg:" + msg);
        }
    }

    2.4 短信消费者

    @Component
    @RabbitListener(queues = "fanout_sms_queue")
    public class FanoutSmsConsumer {

        @RabbitHandler
        public void process(String msg) {
            System.out.println("短信消费者获取生产者消息msg:" + msg);
        }
    }

    3. 运行测试

    访问时才会创建交换机以及队列,并非项目启动就会创建

    生产者访问链接(地址+发送的队列名称):
    http://localhost:8080/sendFanout?queueName=fanout_sms_queue

    消费者启动:

    案例代码: https://www.lanzous.com/i5zkf7c

    我创建了一个java相关的公众号,用来记录自己的学习之路,感兴趣的小伙伴可以关注一下微信公众号哈:niceyoo

  • 相关阅读:
    Node.js的安装与配置【转载】
    The Tomcat connector configured to listen on port 8080 failed to start. The port may already be in use or the connector【端口号被占用】
    MVC登陆认证简单设置
    Winform无边框窗体拖动
    Winform截图小程序
    C#Winform实时更新数据库信息Demo(使用Scoket)
    记DateTime.Now.ToString()遇到的一个坑
    T4模板的一些配置(从EF数据更新)
    设计模式之策略模式
    打造高效的研发组织架构:高效研发流程那些事(一)——读《技术领导力实战笔记》摘要
  • 原文地址:https://www.cnblogs.com/niceyoo/p/11460701.html
Copyright © 2011-2022 走看看