zoukankan      html  css  js  c++  java
  • RabbitMQ

    应用场景: 1.异步处理 : 同步阻塞的(会造成等待), 异步是非阻塞的(不会等待), 批量数据,就可以采用异步处理.

          2.系统解耦 : 多个系统之间, 不需要直接交互, 通过消息进行业务流转.

          3.流量削峰 : 高负载请求/任务缓冲处理.

    消息队列中增加了交换器(Exchange):

        1.Direct Exchange 直连交换机, 根据路由键完全匹配进行路由消息队列;

        2.Topic Exchange 通配符交换机, #匹配多个单词, *匹配一个单词, 用.隔开的称为一个单词:

        3. Fanout Exchange 广播交换机, 投递到所有绑定的队列, 不需要规则.

        4. Headers Exchange 基于消息内容中的header属性进行匹配.

    依赖pom.xml

            <!--RabbitMQ依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    属性配置application.properties

    spring.rabbitmq.host=10.10.32.140
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    #开启发送确认
    spring.rabbitmq.publisher-confirms=true
    #开启发送失败退回
    spring.rabbitmq.publisher-returns=true
    #开启ack
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    第一类, 直连交换机directExchage

    发送者MessageSender

    @Component
    public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        public void send(String exchage, String routingKey) {
            String msg = "你好现在是 " + new Date();
            System.out.println("send content = " + msg);
    
            this.rabbitTemplate.setMandatory(true);
            this.rabbitTemplate.setConfirmCallback(this);
            this.rabbitTemplate.setReturnCallback(this);
    
            //发送消息
            this.rabbitTemplate.convertAndSend(exchage, routingKey, msg);
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
    
        }
    
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    
        }
    }

    消费者MessageReceiver

    @Component
    @RabbitListener(queues = "queue1")
    public class MessageReceiver {
    
        public void process(String msg, Channel channel, Message message) throws IOException {
    
            try {
                Thread.sleep(3000);
                System.out.println("睡眠3s");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            try {
                //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
                //丢弃这条消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                System.out.println("receiver fail");
            }
    
        }
    }

    配置类RabbitConfig

    @Configuration
    public class RabbitConfig {
    
        /**
         * 定义一个交换器 exchage: DirectExchage 直连交换机, 精确匹配
         * @return
         */
        @Bean
        public DirectExchange directExchange() {
            //创建一个直连交换器, 他就是在rabbitmq服务器上创建这么一个交换器
            return new DirectExchange("exchage1");
        }
    
        /**
         * 创建一个队列, 合规队列是用来存放exchage路由过来的消息
         * @return
         */
        @Bean
        public Queue Queuq1() {
            return new Queue("queue1", true);
        }
    
        /**
         * 建立起关系, 交换机 + 队列 绑定起来
         * @return
         */
        @Bean
        public Binding bindingDirectExchange(Queue queuq1, DirectExchange directExchange) {
            return BindingBuilder.bind(queuq1).to(directExchange).with("routingkey1");
        }
    
    
    }

    Controller访问

    @RestController
    public class MessageController {
    
        @Autowired
        private MessageSender helloSender;
    
        /**
         * 正常发送消息
         * @return
         */
        @RequestMapping("/boot/send")
        public String send () {
            helloSender.send("exchage1", "routingkey1");
            return "success";
        }
        
    }

    第二类,广播交换器 FanoutExchange, 不需要路由匹配

    配置类RabbitConfig

      /**
         * 创建一个FanoutExchange交换器, 不需要路由匹配
         * @return
         */
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("exchange2");
        }
    
        /**
         * 创建队列2
         * @return
         */
        public Queue queue2() {
            return new Queue("queue2", true);
        }
    
        /**
         * 创建队列3
         * @return
         */
        public Queue queue3() {
            return new Queue("queue3", true);
        }
    
        /**
         * 把队列2和FanoutExchage交换机绑定
         * @param queue2
         * @param fanoutExchange
         * @return
         */
        public Binding bindingFanoutExchage(Queue queue2, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queue2).to(fanoutExchange);
        }
    
        /**
         * 把队列3和FanoutExchage交换机绑定
         * @param queue3
         * @param fanoutExchange
         * @return
         */
        public Binding bindingFanoutExchage2(Queue queue3, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queue3).to(fanoutExchange);
        }

    消息发送者, 与第一种一样

    @Component
    public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        public void send(String exchage, String routingKey) {
            String msg = "你好现在是 " + new Date();
            System.out.println("send content = " + msg);
    
            this.rabbitTemplate.setMandatory(true);
            this.rabbitTemplate.setConfirmCallback(this);
            this.rabbitTemplate.setReturnCallback(this);
    
            //发送消息
            this.rabbitTemplate.convertAndSend(exchage, routingKey, msg);
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
    
        }
    
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    
        }
    }

    消息接收者2

    @Component
    @RabbitListener(queues = "queue2")
    public class MessageReceiver {
    
        public void process(String msg, Channel channel, Message message) throws IOException {
    
            try {
                Thread.sleep(3000);
                System.out.println("睡眠3s");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            try {
                //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
                //丢弃这条消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                System.out.println("receiver fail");
            }
    
        }
    }

    消息接收者3

    @Component
    @RabbitListener(queues = "queue3")
    public class MessageReceiver {
    
        public void process(String msg, Channel channel, Message message) throws IOException {
    
            try {
                Thread.sleep(3000);
                System.out.println("睡眠3s");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            try {
                //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
                //丢弃这条消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                System.out.println("receiver fail");
            }
    
        }
    }

    Controller访问

    @RestController
    public class MessageController {
    
        @Autowired
        private MessageSender helloSender;
    
        /**
         * 正常发送消息
         * @return
         */
        @RequestMapping("/boot/send")
        public String send () {
            helloSender.send("exchage2", "");
            return "success";
        }
        
    }

     

    第三类,Topic Exchange通配符交换机, #匹配多个单词, *匹配一个单词, 用.隔开的称为一个单词

    配置类RabbitConfig

    /**
         * 创建交换器 TopicExchange, 模糊匹配
         * @return
         */
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange("exchange3");
        }
    
        @Bean
        public Queue queue4() {
            return new Queue("queue4", true);
        }
    
        @Bean
        public Queue queue5() {
            return new Queue("queue5", true);
        }
    
        /**
         * 和queue4建立联系
         * @param queue4
         * @param topicExchange
         * @return
         */
        public Binding bindingTopicExchange(Queue queue4, TopicExchange topicExchange) {
            return BindingBuilder.bind(queue4).to(topicExchange).with("#.k4.*");
        }
    
        /**
         * 和queue5建立联系
         * @param queue5
         * @param topicExchange
         * @return
         */
        public Binding bindingTopicExchange2(Queue queue5, TopicExchange topicExchange) {
            return BindingBuilder.bind(queue5).to(topicExchange).with("#.K5.*");
        }

    消息发送者, 与第一种一样

    @Component
    public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        public void send(String exchage, String routingKey) {
            String msg = "你好现在是 " + new Date();
            System.out.println("send content = " + msg);
    
            this.rabbitTemplate.setMandatory(true);
            this.rabbitTemplate.setConfirmCallback(this);
            this.rabbitTemplate.setReturnCallback(this);
    
            //发送消息
            this.rabbitTemplate.convertAndSend(exchage, routingKey, msg);
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
    
        }
    
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    
        }
    }

    消息接收者4

    @Component
    @RabbitListener(queues = "queue4")
    public class MessageReceiver {
    
        public void process(String msg, Channel channel, Message message) throws IOException {
    
            try {
                Thread.sleep(3000);
                System.out.println("睡眠3s");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            try {
                //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
                //丢弃这条消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                System.out.println("receiver fail");
            }
    
        }
    }

    消息接收者5

    @Component
    @RabbitListener(queues = "queue5")
    public class MessageReceiver {
    
        public void process(String msg, Channel channel, Message message) throws IOException {
    
            try {
                Thread.sleep(3000);
                System.out.println("睡眠3s");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            try {
                //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉; 否则消息服务器以为这条消息没处理掉 后续还会再发
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
                //丢弃这条消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                System.out.println("receiver fail");
            }
    
        }
    }

    Controller访问

    @RestController
    public class MessageController {
    
        @Autowired
        private MessageSender helloSender;
    
        /**
         * 正常发送消息
         * @return
         */
        @RequestMapping("/boot/send")
        public String send () {
            //helloSender.send("exchage2", "xy.k4.z");
        helloSender.send("exchage2", "xy.h.k4.z");
    return "success"; } }

    总结:

       RabbitMQ应用十分广泛, 程序员必备.

  • 相关阅读:
    ios中,在SearchBar里面搜索内容,可根据内容来查找所需的信息资源,可获得SearchBar中的内容
    TableViewCell,TableView,UITableViewCell
    iOS-多线程 ,整理集锦,多种线程的创建
    从服务器获取的 数值,进行值传递,不同的文件夹之间的调用。
    Principle 安装步骤
    Principle: 做动效,选对软件很重要 --- 转载自简书
    ios 给图片添加水印
    symbol(s) not found for architecture x86_64 之 linker command failed with exit code 1 (use -v to see invocation)解决方案排查
    IOS 本地通知推送消息
    【ios 7】 之后的设置系统的状态栏隐藏的方法分享
  • 原文地址:https://www.cnblogs.com/goujh/p/10933544.html
Copyright © 2011-2022 走看看