zoukankan      html  css  js  c++  java
  • 【转】 springboot+rabbitmq整合示例程序

    【转】 springboot+rabbitmq整合示例程序

    关于什么是rabbitmq,请看另一篇文章:

    http://www.cnblogs.com/boshen-hzb/p/6840064.html

    一、新建maven工程:springboot-rabbitmq

    二、引入springboot和rabbitmq的依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <!-- Project coordinates -->
      <groupId>com.springboot.rabbitmq</groupId>
      <artifactId>springboot-rabbitmq</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>springboot-rabbitmq</name>
      <description>springboot-rabbitmq</description>

      <!-- The Spring Boot parent POM manages the versions of the starters below -->
      <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
      </parent>

      <dependencies>
        <!-- Web starter: needed for the REST controller that triggers the senders -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Test starter: only used for writing test classes -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
        </dependency>
        <!-- AMQP starter: the dependency that actually provides RabbitMQ support -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
      </dependencies>
    </project>
    spring-boot-starter-test是为了后面写测试类用,
    spring-boot-starter-amqp才是真正的使用rabbitmq的依赖

    三、在src/main/resources里面新增application.properties
    该配置文件主要是对rabbitmq的配置信息
    
    

    # Application name
    spring.application.name=springboot-rabbitmq
    # RabbitMQ broker connection settings
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    # Must be true for the publisher-confirm callback example to receive confirms
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.virtual-host=/

     

    四、新建springboot主类Application

    该类初始化创建队列、转发器,并把队列绑定到转发器

    package com.rabbit;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    
    /**
     * Spring Boot entry point. Also declares the queues, exchanges and bindings
     * used by the messaging examples.
     */
    @SpringBootApplication
    public class Application {
        /** Name of the queue used by the hello sender/receiver examples. */
        final static String queueName = "helloQueue";

        /**
         * Queue for the hello examples. The name must be "helloQueue": that is the
         * routing key the hello senders publish with and the queue the hello
         * receivers listen on.
         *
         * @return the hello queue
         */
        @Bean
        public Queue helloQueue() {
            return new Queue(queueName);
        }

        /**
         * Queue for the entity example. The name must be "userQueue" to match the
         * routing key used by UserSender and the queue UserReceiver listens on.
         *
         * @return the user queue
         */
        @Bean
        public Queue userQueue() {
            return new Queue("userQueue");
        }

        //=============== queues for the topic exchange example (begin) ==========
        @Bean
        public Queue queueMessage() {
            return new Queue("topic.message");
        }

        @Bean
        public Queue queueMessages() {
            return new Queue("topic.messages");
        }
        //=============== queues for the topic exchange example (end) ==========


        //=============== queues for the fanout exchange example (begin) ==========
        @Bean
        public Queue AMessage() {
            return new Queue("fanout.A");
        }

        @Bean
        public Queue BMessage() {
            return new Queue("fanout.B");
        }

        @Bean
        public Queue CMessage() {
            return new Queue("fanout.C");
        }
        //=============== queues for the fanout exchange example (end) ==========


        /** Topic exchange named "exchange". */
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("exchange");
        }

        /** Fanout exchange named "fanoutExchange". */
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }

        /**
         * Binds queue topic.message to the topic exchange with binding key
         * "topic.message", i.e. an exact match.
         *
         * @param queueMessage the topic.message queue
         * @param exchange the topic exchange
         * @return the binding
         */
        @Bean
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }

        /**
         * Binds queue topic.messages to the topic exchange with binding key
         * "topic.#", i.e. a wildcard match.
         *
         * @param queueMessages the topic.messages queue
         * @param exchange the topic exchange
         * @return the binding
         */
        @Bean
        Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
        }

        /** Binds queue fanout.A to the fanout exchange. */
        @Bean
        Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        }

        /** Binds queue fanout.B to the fanout exchange. */
        @Bean
        Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        }

        /** Binds queue fanout.C to the fanout exchange. */
        @Bean
        Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
        }

        /**
         * Starts the Spring Boot application.
         *
         * @param args command-line arguments passed through to Spring Boot
         */
        public static void main(String[] args) throws Exception {
            SpringApplication.run(Application.class, args);
        }
    }

    五、各种情景实现

    1、最简单的hello生产和消费实现(单生产者和单消费者)

    生产者:

    package com.rabbit.hello;
    
    import java.util.Date;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Producer for the simplest example: publishes a timestamped greeting with
     * routing key "helloQueue".
     */
    @Component
    public class HelloSender1 {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Builds "hello1 " followed by the current date, echoes it to standard
         * output and sends it.
         */
        public void send() {
            final Date now = new Date();
            final String payload = "hello1 " + now;
            System.out.println("Sender1 : " + payload);
            rabbitTemplate.convertAndSend("helloQueue", payload);
        }
    }

    消费者:

    package com.rabbit.hello;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer listening on the "helloQueue" queue.
     */
    @Component
    @RabbitListener(queues = "helloQueue")
    public class HelloReceiver1 {

        /**
         * Prints a received String payload to standard output.
         *
         * @param hello the received message body
         */
        @RabbitHandler
        public void process(String hello) {
            final String line = "Receiver1  : " + hello;
            System.out.println(line);
        }
    }

    controller:

    package com.rabbit.controller;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.rabbit.hello.HelloSender1;
    
    /**
     * REST endpoints under /rabbit that trigger the example senders.
     */
    @RestController
    @RequestMapping("/rabbit")
    public class RabbitTest {

        @Autowired
        private HelloSender1 helloSender1;

        // Not used by any endpoint in this version of the controller.
        @Autowired
        private HelloSender1 helloSender2;

        /**
         * POST /rabbit/hello: sends a single hello message.
         */
        @PostMapping("/hello")
        public void hello() {
            this.helloSender1.send();
        }
    }

    启动程序,用post方式执行:

    http://127.0.0.1:8080/rabbit/hello

    结果如下:

    Sender1 : hello1 Thu May 11 17:23:31 CST 2017
    Receiver1  : hello1 Thu May 11 17:23:31 CST 2017

    2、单生产者-多消费者

    生产者:

    package com.rabbit.hello;
    
    import java.util.Date;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Producer for the one-producer / many-consumers example.
     */
    @Component
    public class HelloSender1 {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Appends the current date to the given text, echoes the result to
         * standard output and sends it with routing key "helloQueue".
         *
         * @param msg text to prefix the timestamp with
         */
        public void send(String msg) {
            final Date timestamp = new Date();
            final String payload = msg + timestamp;
            System.out.println("Sender1 : " + payload);
            rabbitTemplate.convertAndSend("helloQueue", payload);
        }
    }

    消费者1:

    package com.rabbit.hello;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * First consumer listening on the "helloQueue" queue.
     */
    @Component
    @RabbitListener(queues = "helloQueue")
    public class HelloReceiver1 {

        /**
         * Prints a received String payload to standard output.
         *
         * @param hello the received message body
         */
        @RabbitHandler
        public void process(String hello) {
            final String output = "Receiver1  : " + hello;
            System.out.println(output);
        }
    }

    消费者2:

    package com.rabbit.hello;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Second consumer listening on the same "helloQueue" queue.
     */
    @Component
    @RabbitListener(queues = "helloQueue")
    public class HelloReceiver2 {

        /**
         * Prints a received String payload to standard output.
         *
         * @param hello the received message body
         */
        @RabbitHandler
        public void process(String hello) {
            final String output = "Receiver2  : " + hello;
            System.out.println(output);
        }
    }

    controller:

    package com.rabbit.controller;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.rabbit.hello.HelloSender1;
    
    /**
     * REST endpoints under /rabbit that trigger the example senders.
     */
    @RestController
    @RequestMapping("/rabbit")
    public class RabbitTest {

        @Autowired
        private HelloSender1 helloSender1;

        // Not used by any endpoint in this version of the controller.
        @Autowired
        private HelloSender1 helloSender2;

        /**
         * POST /rabbit/hello: sends a single hello message.
         */
        @PostMapping("/hello")
        public void hello() {
            this.helloSender1.send("hello1");
        }

        /**
         * POST /rabbit/oneToMany: one producer, many consumers. Sends ten
         * messages numbered 0 to 9 through the single sender.
         */
        @PostMapping("/oneToMany")
        public void oneToMany() {
            int index = 0;
            while (index < 10) {
                this.helloSender1.send("hellomsg:" + index);
                index++;
            }
        }
    }

    用post方式执行:

    http://127.0.0.1:8080/rabbit/oneToMany

    结果如下:

    Sender1 : hellomsg:0Thu May 11 17:37:59 CST 2017
    Sender1 : hellomsg:1Thu May 11 17:37:59 CST 2017
    Sender1 : hellomsg:2Thu May 11 17:37:59 CST 2017
    Sender1 : hellomsg:3Thu May 11 17:37:59 CST 2017
    Sender1 : hellomsg:4Thu May 11 17:37:59 CST 2017
    Sender1 : hellomsg:5Thu May 11 17:37:59 CST 2017
    Sender1 : hellomsg:6Thu May 11 17:37:59 CST 2017
    Sender1 : hellomsg:7Thu May 11 17:37:59 CST 2017
    Sender1 : hellomsg:8Thu May 11 17:37:59 CST 2017
    Sender1 : hellomsg:9Thu May 11 17:37:59 CST 2017
    Receiver2  : hellomsg:1Thu May 11 17:37:59 CST 2017
    Receiver1  : hellomsg:0Thu May 11 17:37:59 CST 2017
    Receiver1  : hellomsg:3Thu May 11 17:37:59 CST 2017
    Receiver1  : hellomsg:4Thu May 11 17:37:59 CST 2017
    Receiver1  : hellomsg:5Thu May 11 17:37:59 CST 2017
    Receiver2  : hellomsg:2Thu May 11 17:37:59 CST 2017
    Receiver1  : hellomsg:6Thu May 11 17:37:59 CST 2017
    Receiver2  : hellomsg:7Thu May 11 17:37:59 CST 2017
    Receiver2  : hellomsg:8Thu May 11 17:37:59 CST 2017
    Receiver1  : hellomsg:9Thu May 11 17:37:59 CST 2017

    从以上结果可知,生产者发送的10条消息,分别被两个消费者接收了

    3、多生产者-多消费者

    生产者1:

    package com.rabbit.hello;
    
    import java.util.Date;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * First producer for the many-producers / many-consumers example.
     */
    @Component
    public class HelloSender1 {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Appends the current date to the given text, echoes the result to
         * standard output and sends it with routing key "helloQueue".
         *
         * @param msg text to prefix the timestamp with
         */
        public void send(String msg) {
            final String stamped = msg + new Date();
            final String logLine = "Sender1 : " + stamped;
            System.out.println(logLine);
            rabbitTemplate.convertAndSend("helloQueue", stamped);
        }
    }

    生产者2:

    package com.rabbit.hello;
    
    import java.util.Date;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Second producer for the many-producers / many-consumers example.
     */
    @Component
    public class HelloSender2 {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Appends the current date to the given text, echoes the result to
         * standard output and sends it with routing key "helloQueue".
         *
         * @param msg text to prefix the timestamp with
         */
        public void send(String msg) {
            final String stamped = msg + new Date();
            final String logLine = "Sender2 : " + stamped;
            System.out.println(logLine);
            rabbitTemplate.convertAndSend("helloQueue", stamped);
        }
    }

    消费者1:

    package com.rabbit.hello;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * First consumer listening on the "helloQueue" queue.
     */
    @Component
    @RabbitListener(queues = "helloQueue")
    public class HelloReceiver1 {

        /**
         * Prints a received String payload to standard output.
         *
         * @param hello the received message body
         */
        @RabbitHandler
        public void process(String hello) {
            final String received = "Receiver1  : " + hello;
            System.out.println(received);
        }
    }

    消费者2:

    package com.rabbit.hello;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Second consumer listening on the same "helloQueue" queue.
     */
    @Component
    @RabbitListener(queues = "helloQueue")
    public class HelloReceiver2 {

        /**
         * Prints a received String payload to standard output.
         *
         * @param hello the received message body
         */
        @RabbitHandler
        public void process(String hello) {
            final String received = "Receiver2  : " + hello;
            System.out.println(received);
        }
    }

    controller:

        /**
         * POST /rabbit/manyToMany: many producers, many consumers. For each
         * index 0 to 9 the same text is sent through both senders in turn.
         *
         * NOTE(review): for the "Sender2" lines in the sample output, the
         * controller field helloSender2 has to be an injected HelloSender2; the
         * controller listing shown earlier declares it as HelloSender1.
         */
        @PostMapping("/manyToMany")
        public void manyToMany() {
            int index = 0;
            while (index < 10) {
                helloSender1.send("hellomsg:" + index);
                helloSender2.send("hellomsg:" + index);
                index++;
            }
        }

    用post方式执行:

    http://127.0.0.1:8080/rabbit/manyToMany

    结果如下:

    Sender1 : hellomsg:0Fri May 12 09:08:50 CST 2017
    Sender2 : hellomsg:0Fri May 12 09:08:50 CST 2017
    Sender1 : hellomsg:1Fri May 12 09:08:50 CST 2017
    Sender2 : hellomsg:1Fri May 12 09:08:50 CST 2017
    Sender1 : hellomsg:2Fri May 12 09:08:50 CST 2017
    Sender2 : hellomsg:2Fri May 12 09:08:50 CST 2017
    Sender1 : hellomsg:3Fri May 12 09:08:50 CST 2017
    Sender2 : hellomsg:3Fri May 12 09:08:50 CST 2017
    Sender1 : hellomsg:4Fri May 12 09:08:50 CST 2017
    Sender2 : hellomsg:4Fri May 12 09:08:50 CST 2017
    Sender1 : hellomsg:5Fri May 12 09:08:50 CST 2017
    Sender2 : hellomsg:5Fri May 12 09:08:50 CST 2017
    Sender1 : hellomsg:6Fri May 12 09:08:50 CST 2017
    Sender2 : hellomsg:6Fri May 12 09:08:50 CST 2017
    Sender1 : hellomsg:7Fri May 12 09:08:50 CST 2017
    Sender2 : hellomsg:7Fri May 12 09:08:50 CST 2017
    Sender1 : hellomsg:8Fri May 12 09:08:50 CST 2017
    Sender2 : hellomsg:8Fri May 12 09:08:50 CST 2017
    Sender1 : hellomsg:9Fri May 12 09:08:50 CST 2017
    Sender2 : hellomsg:9Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:0Fri May 12 09:08:50 CST 2017
    Receiver1  : hellomsg:0Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:1Fri May 12 09:08:50 CST 2017
    Receiver1  : hellomsg:1Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:2Fri May 12 09:08:50 CST 2017
    Receiver1  : hellomsg:2Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:3Fri May 12 09:08:50 CST 2017
    Receiver1  : hellomsg:3Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:4Fri May 12 09:08:50 CST 2017
    Receiver1  : hellomsg:4Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:5Fri May 12 09:08:50 CST 2017
    Receiver1  : hellomsg:5Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:6Fri May 12 09:08:50 CST 2017
    Receiver1  : hellomsg:6Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:7Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:8Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:8Fri May 12 09:08:50 CST 2017
    Receiver1  : hellomsg:7Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:9Fri May 12 09:08:50 CST 2017
    Receiver2  : hellomsg:9Fri May 12 09:08:50 CST 2017

    和一对多一样,接收端仍然会均匀接收到消息

    4、实体类传输

    springboot完美地支持对象的发送和接收,不需要额外的配置。

    实体类(必须实现序列化接口):

    package com.rabbit.user;
    
    import java.io.Serializable;
    
    /**
     * Simple entity sent over the message queue. It implements Serializable so
     * that it can be transferred as a message body.
     */
    public class User implements Serializable {
        /** Explicit version id so the serialized form does not depend on compiler-generated values. */
        private static final long serialVersionUID = 1L;

        private String name;
        private String pass;

        /** @return the user name, or null if not set */
        public String getName() {
            return name;
        }

        /** @param name the user name */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the password, or null if not set */
        public String getPass() {
            return pass;
        }

        /** @param pass the password */
        public void setPass(String pass) {
            this.pass = pass;
        }
    }

    生产者:

    package com.rabbit.user;
    
    import java.util.Date;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Producer for the entity example: sends a User object with routing key
     * "userQueue".
     */
    @Component
    public class UserSender {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Creates a User with fixed name and password, echoes both to standard
         * output and sends the object.
         */
        public void send() {
            final User payload = new User();
            payload.setName("hzb");
            payload.setPass("123456789");

            final String logLine = "user send : " + payload.getName() + "/" + payload.getPass();
            System.out.println(logLine);

            rabbitTemplate.convertAndSend("userQueue", payload);
        }
    }

    消费者:

    package com.rabbit.user;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer for the entity example, listening on the "userQueue" queue.
     */
    @Component
    @RabbitListener(queues = "userQueue")
    public class UserReceiver {

        /**
         * Prints the name and password of a received User to standard output.
         *
         * @param user the received entity
         */
        @RabbitHandler
        public void process(User user) {
            final String logLine = "user receive  : " + user.getName() + "/" + user.getPass();
            System.out.println(logLine);
        }
    }

    controller:

        /**
         * POST /rabbit/userTest: entity transfer test. Delegates to the user sender.
         */
        @PostMapping("/userTest")
        public void userTest() {
            this.userSender.send();
        }

    用post方式执行:

    http://127.0.0.1:8080/rabbit/userTest

    结果如下:

    user send : hzb/123456789
    user receive  : hzb/123456789

    5、topic ExChange示例

    topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key自由的绑定不同的队列

    首先对topic规则进行配置,这里使用两个队列来测试(也就是在Application类中创建和绑定的topic.message和topic.messages两个队列),其中topic.message的binding_key为

    “topic.message”,topic.messages的binding_key为“topic.#”;

    生产者:

    package com.rabbit.topic;
    
    import java.util.Date;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Producer for the topic exchange example. Publishes two messages to the
     * exchange named "exchange" with different routing keys.
     */
    @Component
    public class TopicSender {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Sends one message with routing key "topic.message" and then one with
         * routing key "topic.messages", echoing each to standard output first.
         */
        public void send() {
            final String firstBody = "I am topic.mesaage msg======";
            System.out.println("sender1 : " + firstBody);
            rabbitTemplate.convertAndSend("exchange", "topic.message", firstBody);

            final String secondBody = "I am topic.mesaages msg########";
            System.out.println("sender2 : " + secondBody);
            rabbitTemplate.convertAndSend("exchange", "topic.messages", secondBody);
        }
    }

    消费者1(topic.message)

    package com.rabbit.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer listening on the "topic.message" queue.
     */
    @Component
    @RabbitListener(queues = "topic.message")
    public class topicMessageReceiver {

        /**
         * Prints a received String payload to standard output.
         *
         * @param msg the received message body
         */
        @RabbitHandler
        public void process(String msg) {
            final String line = "topicMessageReceiver  : " + msg;
            System.out.println(line);
        }
    }

    消费者2(topic.messages)

    package com.rabbit.topic;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer listening on the "topic.messages" queue.
     */
    @Component
    @RabbitListener(queues = "topic.messages")
    public class topicMessagesReceiver {

        /**
         * Prints a received String payload to standard output.
         *
         * @param msg the received message body
         */
        @RabbitHandler
        public void process(String msg) {
            final String line = "topicMessagesReceiver  : " + msg;
            System.out.println(line);
        }
    }

    controller:

        /**
         * POST /rabbit/topicTest: topic exchange test. Delegates to the topic sender.
         */
        @PostMapping("/topicTest")
        public void topicTest() {
            this.topicSender.send();
        }

    用post方式执行:

    http://127.0.0.1:8080/rabbit/topicTest

    结果如下:

    sender1 : I am topic.mesaage msg======
    sender2 : I am topic.mesaages msg########
    topicMessageReceiver  : I am topic.mesaage msg======
    topicMessagesReceiver  : I am topic.mesaage msg======
    topicMessagesReceiver  : I am topic.mesaages msg########

    由以上结果可知:sender1发送的消息,routing_key是“topic.message”,exchange上绑定的binding_key“topic.message”和“topic.#”都符合路由规则;所以sender1

    发送的消息,两个队列都能接收到;

    sender2发送的消息,routing_key是“topic.messages”,exchange上绑定的binding_key中只有“topic.#”符合路由规则;所以sender2发送的消息只有队列

    topic.messages能收到。

    6、fanout ExChange示例

    Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。

    这里使用三个队列来测试(也就是在Application类中创建和绑定的fanout.A、fanout.B、fanout.C)这三个队列都和Application中创建的fanoutExchange转发器绑定。

    生产者:

    package com.rabbit.fanout;
    
    import java.util.Date;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Producer for the fanout exchange example.
     */
    @Component
    public class FanoutSender {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Echoes a fixed message to standard output and publishes it to
         * "fanoutExchange" with routing key "abcd.ee".
         */
        public void send() {
            final String body = "fanoutSender :hello i am hzb";
            System.out.println(body);
            rabbitTemplate.convertAndSend("fanoutExchange", "abcd.ee", body);
        }
    }

    消费者A:

    package com.rabbit.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer listening on the "fanout.A" queue.
     */
    @Component
    @RabbitListener(queues = "fanout.A")
    public class FanoutReceiverA {

        /**
         * Prints a received String payload to standard output.
         *
         * @param msg the received message body
         */
        @RabbitHandler
        public void process(String msg) {
            final String line = "FanoutReceiverA  : " + msg;
            System.out.println(line);
        }
    }

    消费者B:

    package com.rabbit.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer listening on the "fanout.B" queue.
     */
    @Component
    @RabbitListener(queues = "fanout.B")
    public class FanoutReceiverB {

        /**
         * Prints a received String payload to standard output.
         *
         * @param msg the received message body
         */
        @RabbitHandler
        public void process(String msg) {
            final String line = "FanoutReceiverB  : " + msg;
            System.out.println(line);
        }
    }

    消费者C:

    package com.rabbit.fanout;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer listening on the "fanout.C" queue.
     */
    @Component
    @RabbitListener(queues = "fanout.C")
    public class FanoutReceiverC {

        /**
         * Prints a received String payload to standard output.
         *
         * @param msg the received message body
         */
        @RabbitHandler
        public void process(String msg) {
            final String line = "FanoutReceiverC  : " + msg;
            System.out.println(line);
        }
    }

    controller:

        /**
         * POST /rabbit/fanoutTest: fanout exchange test. Delegates to the fanout sender.
         */
        @PostMapping("/fanoutTest")
        public void fanoutTest() {
            this.fanoutSender.send();
        }

    用post方式执行:

    http://127.0.0.1:8080/rabbit/fanoutTest

    结果如下:

    fanoutSender :hello i am hzb
    FanoutReceiverC  : fanoutSender :hello i am hzb
    FanoutReceiverB  : fanoutSender :hello i am hzb
    FanoutReceiverA  : fanoutSender :hello i am hzb

    由以上结果可知:就算fanoutSender发送消息的时候,指定了routing_key为"abcd.ee",但是所有接收者都接收到了消息

    7、带callback的消息发送

    增加回调处理,这里不再使用application.properties默认配置的方式,而是在程序中显式地使用文件中的配置信息。该示例中没有新建队列和exchange,用的是第5节中的topic.messages队列和exchange转发器。消费者也是第5节中的topicMessagesReceiver

    rabbitmq配置类:

    package com.rabbit.callback;
    
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;
    
    /**
     * Explicit RabbitMQ configuration for the publisher-confirm example. Builds
     * the connection factory and the RabbitTemplate from the spring.rabbitmq.*
     * properties instead of relying on the default configuration.
     *
     * The class must be annotated with @Configuration; without it Spring never
     * processes the @Bean methods below.
     */
    @Configuration
    public class RabbitConfig {

        @Value("${spring.rabbitmq.host}")
        private String addresses;

        @Value("${spring.rabbitmq.port}")
        private String port;

        @Value("${spring.rabbitmq.username}")
        private String username;

        @Value("${spring.rabbitmq.password}")
        private String password;

        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;

        @Value("${spring.rabbitmq.publisher-confirms}")
        private boolean publisherConfirms;

        /**
         * Creates the connection factory from the configured host, port,
         * credentials and virtual host.
         *
         * @return the connection factory
         */
        @Bean
        public ConnectionFactory connectionFactory() {

            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(addresses + ":" + port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            // Must be true for message confirm callbacks to be delivered.
            connectionFactory.setPublisherConfirms(publisherConfirms);
            return connectionFactory;
        }

        /**
         * Creates a RabbitTemplate on the configured connection factory.
         * Prototype scope is used because a confirm callback is registered on
         * the template, so each injection point gets its own instance.
         *
         * @return a new RabbitTemplate
         */
        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public RabbitTemplate rabbitTemplatenew() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            return template;
        }

    }

    生产者:

    package com.rabbit.callback;
    
    import java.util.Date;
    import java.util.UUID;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class CallBackSender implements  RabbitTemplate.ConfirmCallback{
        @Autowired
        private RabbitTemplate rabbitTemplatenew;
        public void send() {
            
            rabbitTemplatenew.setConfirmCallback(this);
            String msg="callbackSender : i am callback sender";
            System.out.println(msg );
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());  
            System.out.println("callbackSender UUID: " + correlationData.getId());  
            this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);  
        }
    
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // TODO Auto-generated method stub
            System.out.println("callbakck confirm: " + correlationData.getId());
        }
    }

    消费者:第5节中的topicMessagesReceiver

    controller:

        /**
         * POST /rabbit/callback: publisher-confirm test. Delegates to the callback sender.
         */
        @PostMapping("/callback")
        public void callbak() {
            this.callBackSender.send();
        }

    用post方式执行:

    http://127.0.0.1:8080/rabbit/callback

    结果如下:

    callbackSender : i am callback sender
    callbackSender UUID: cd0c80a6-4c65-4bf9-b4f8-f3b1180755d6
    callbakck confirm: cd0c80a6-4c65-4bf9-b4f8-f3b1180755d6
    topicMessagesReceiver  : callbackSender : i am callback sender

    从上面可以看出callbackSender发出的UUID,收到了回应,又传回来了。

  • 相关阅读:
    C语言 sprintf 函数 C语言零基础入门教程
    C语言 printf 函数 C语言零基础入门教程
    C语言 文件读写 fgets 函数 C语言零基础入门教程
    C语言 文件读写 fputs 函数 C语言零基础入门教程
    C语言 fprintf 函数 C语言零基础入门教程
    C语言 文件读写 fgetc 函数 C语言零基础入门教程
    C语言 文件读写 fputc 函数 C语言零基础入门教程
    C语言 strlen 函数 C语言零基础入门教程
    Brad Abrams关于Naming Conventions的演讲中涉及到的生词集解
    适配器模式
  • 原文地址:https://www.cnblogs.com/Javastudy-note/p/13818151.html
Copyright © 2011-2022 走看看