zoukankan      html  css  js  c++  java
  • springboot踩雷系列之RabbitMQ(进阶版)

    进阶版是在基础版的基础上进行了一些扩展,比如一对多、多对多、格式匹配、回调等等

    一对多:

    测试类:

    /**
         * 单生产者-多消费者
         */
        @RequestMapping("/oneToMany")
        public void oneToMany() {
            for(int i=0;i<10;i++){
                helloSender1.send("oneToMany:"+i);
            }
        }

    生产者不变 增加消费者:

    import com.example.springboot.entity.User;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消费者二号
     * <p>
     * @Date 2019/5/23 15:58
     **/
    @Component
    @RabbitListener(queues = {"testQueue"})
    public class ReceiverController2 {
    
        @RabbitHandler
        public void process(String msg) {
            System.out.println("消费者二号 : " + msg);
        }
    }
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消费者三号
     * <p>
     *
     * @Author
     * @Date 2019/5/23 15:58
     **/
    @Component
    @RabbitListener(queues = {"testQueue"})
    public class ReceiverController3 {
    
        @RabbitHandler
        public void process(String msg) {
            System.out.println("消费者三号 : " + msg);
        }
    }

    执行结果:

    如此可见,一对多时,消费者随机接收消息

    多对多:

    测试类

     /**
         * 多生产者-多消费者
         */
        @RequestMapping("/manyToMany")
        public void manyToMany() {
            for(int i=0;i<10;i++){
                helloSender1.send("manyToMany:"+i);
                helloSender2.send("manyToMany:"+i);
            }
        }

    增加生产者:

    import com.example.springboot.entity.User;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDateTime;
    
    /**
     * 生产者二号
     * <p>
     * @Date 2019/5/23 15:56
     **/
    @Component
    public class SendController2 {
    
        @Autowired
        private AmqpTemplate template;
    
        public void send(String msg) {
            String context = msg+ LocalDateTime.now();
            System.out.println("生产者二号 : " + context);
            //参数1为队列名称 2为消息内容
            this.template.convertAndSend("testQueue", context);
        }
    }

    访问结果:

    由此可见 并不是所有消息发送完毕之后消费才开始消费

    topic  exchange发送(根据定义的格式 选择对应的消费者):

    测试类:

    /**
         * 参数二比参数一多了一个s  用以区分topic的传输结果
         */
        @RequestMapping("/topicTest")
    public void topicTest() {
    helloSender1.send( "hi, i am message all","hi, i am message 1","hi, i am messages 2");
    }
    
    

    生产者(请注意两个队列的名字不同,"topic.1","topic.only","topic.messages"是关键字):

    //数组传输使用  topic exchange
        public void send(String... msgs) {
    System.out.println("消息1: " + msgs[0]);
    template.convertAndSend("exchange", "topic.1", msgs[0]);
    System.out.println("消息2: " + msgs[1]);
    template.convertAndSend("exchange", "topic.only", msgs[1]);
    System.out.println("消息3: " + msgs[2]);
    template.convertAndSend("exchange", "topic.messages", msgs[2]);
     }

    消费1:

    import com.example.springboot.entity.User;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消费者一号
     * <p>
     * @Date 2019/5/23 15:58
     **/
    @Component
    @RabbitListener(queues = {"testQueue","topic.message","fanout.A"})
    public class ReceiverController {
    
        @RabbitHandler
        public void process(String msg) {
            System.out.println("消费者一号 : " + msg);
        }
    }

    消费者2:

    import com.example.springboot.entity.User;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消费者二号
     * <p>
     *
     * @Author
     * @Date 2019/5/23 15:58
     **/
    @Component
    @RabbitListener(queues = {"testQueue","topic.messages","fanout.B"})
    public class ReceiverController2 {
    
        @RabbitHandler
        public void process(String msg) {
            System.out.println("消费者二号 : " + msg);
        }
    }

    增加队列:

      @Bean
        public Queue messageQueue() {
            return new Queue("topic.message");
        }
    
        @Bean
        public Queue messagesQueue() {
            return new Queue("topic.messages");
        }
    
        /**
         *下方分别将队列topic.message  topic.messages与exchange绑定并设定此队列可以接受到的消息队列格式
         * 此处Queue的名称必须与上方的初始化队列的方法名一致  根据方法名对应具体的队列
         * 关键字不可随意设置 必须由点隔开
         * 网上有人说*代表一个 #代表多个 两个点中间代表一个单词
         *<p>
         */
        @Bean
        Binding bindingExchangeMessage(Queue messageQueue, TopicExchange exchange) {
            return BindingBuilder.bind(messageQueue).to(exchange).with("topic.*");
        }
    
        @Bean
        Binding bindingExchangeMessages(Queue messagesQueue, TopicExchange exchange) {
            return BindingBuilder.bind(messagesQueue).to(exchange).with("topic.only");
        }

    访问结果:

    fanout fanoutExchange(所有fanout.开头的队列都可以接受消息)
    测试类:
     /**
         * 测试是否所有fanout.开头的队列都可以接受消息  消费者三号未实现fanout.开头的队列 测试是否可以接受到消息
         */
        @RequestMapping("/fanoutTest")
        public void fanoutTest() {
            helloSender1.sendFanout("fanout message");
        }

    消费者1:

    import com.example.springboot.entity.User;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消费者一号
     * <p>
     * @Date 2019/5/23 15:58
     **/
    @Component
    @RabbitListener(queues = {"testQueue","topic.message","fanout.A"})
    public class ReceiverController {
    
        @RabbitHandler
        public void process(String msg) {
            System.out.println("消费者一号 : " + msg);
        }
    }

    消费者2:

    import com.example.springboot.entity.User;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消费者二号
     * <p>
     * @Date 2019/5/23 15:58
     **/
    @Component
    @RabbitListener(queues = {"testQueue","topic.messages","fanout.B"})
    public class ReceiverController2 {
    
        @RabbitHandler
        public void process(String msg) {
            System.out.println("消费者二号: " + msg);
        }
    }

    生产者:

    //fanout exchange使用  routingKey些什么都可以
        public void sendFanout (String msg) {
            System.out.println("生产者一号 : " + msg);
            template.convertAndSend("fanoutExchange", "nonentityQueue",msg);
        }

    访问结果:

    回调消息
    测试类
    /*
         *回调消息
         */
        @RequestMapping("/confimCallBacK")
        public void confimCallBacK(){
            String msg ="生产者:发送回调消息";
            SendCallBack.sendConfirm(msg);
        }

    生产者:

    import com.example.springboot.entity.User;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import tk.mybatis.mapper.util.StringUtil;
    
    import java.time.LocalDateTime;
    import java.util.UUID;
    
    /**
     * 生产者
     * <p>
     *
     * @Author
     * @Date 2019/5/23 15:56
     **/
    @Component
    public class SendCallBackController implements RabbitTemplate.ConfirmCallback{
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendConfirm(String msg) {
            rabbitTemplate.setConfirmCallback(this);
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            System.out.println("回调消息主键: " + correlationData.getId());
            rabbitTemplate.convertAndSend("exchange", "callbackQueue", msg, correlationData);
        }
    
        //回调函数 实现RabbitTemplate.ConfirmCallback接口中的方法
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("执行回调: " + correlationData.getId());
        }
    }

    增加配置类:

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Scope;
    /**
     * RabbitMQ的回调实现
     * <p>
     * @Date 2019/5/24 15:11
     **/
    public class RabbitCallBack {
        //@Value 获取配置文件的对应的值  此处设置是为了设定回调地址
        @Value("${spring.rabbitmq.host}")
        private String addresses;
    
        @Value("${spring.rabbitmq.port}")
        private String port;
    
        @Value("${spring.rabbitmq.username}")
        private String username;
    
        @Value("${spring.rabbitmq.password}")
        private String password;
    
        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;
    
        @Value("${spring.rabbitmq.publisher-confirms}")
        private boolean publisherConfirms;
    
        @Bean
        /*
         *可设置为prototype singleton也就是不同的作用域
         * singleton类似于单例模式 返回的都是一个实例 spring容器只会保存一个Bean    线程之间资源 完全共享
         * prototype每一次请求都会生成一个新的bean  多线程情况下必须一个线程对应一个独立的实例
         * 无状态的可以使用singleton  有状态的使用prototype
         *<p>
         */
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public RabbitTemplate getRabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(getConnectionFactory());
            return template;
        }
    
        @Bean
        public ConnectionFactory getConnectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(addresses+":"+port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setPublisherConfirms(publisherConfirms);
            return connectionFactory;
        }

    增加配置:

    访问结果:

    本文参考了很多博客之类的,因为看完了之后没保存,导致找不到了。。。

    另附上源代码一份:

    https://github.com/duxianshenga/demo1.2

  • 相关阅读:
    快速开发框架:进销存业务注意事项
    延时执行函数:前浪死在沙滩上
    新增筛选方案
    进销存数据库设计:采购订单
    SasS 设计原则十二因素
    四种线程池的解析
    高并发下的流量控制
    Mybatis 缓存机制
    谈谈JVM垃圾回收
    如何使错误日志更加方便地排查问题
  • 原文地址:https://www.cnblogs.com/huayuxiaoxiang/p/10919273.html
Copyright © 2011-2022 走看看