zoukankan      html  css  js  c++  java
  • RabbitMQ消息中间件(第三章)第二部分-笔记

    Return消息机制

    • Return Listener用于处理一些不可路由的消息
    • 我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,然后我们的消息监听队列,进行消费处理操作!
    • 但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这个不可达的的消息,就要使用Return Listener!
    • 在基础API中有一个关键的配置项:
    • Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!默认为false

      以下是实际代码操作

      

    package com.cx.temp.common.rabbitmq.returnlistener;
    
    import com.alibaba.fastjson.JSONObject;
    import com.cx.temp.common.utils.JsonUtil;
    import com.rabbitmq.client.*;
    import net.sf.json.JSON;
    
    import java.io.IOException;
    
    /**
     * retunListener机制-生产端
     */
    public class Producer {
    
        public static void main(String[] args)  throws Exception {
    
            //1 创建一个ConectionFacory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            //4 指定我们的消息投递模式:消息的确认模式
            channel.confirmSelect();
    
            String exchangeName = "test_return_exchange";
            String routingKey = "return.save";
            String routingKeyError = "abc.save";
    
            //5 发送消息
            //这里使用routingKey消费端可以正常收到消息,如果用routingKeyError,由于消费端声明的路由key是"return.#",所以该消息不可达,则会被addReturnListener监听到
            String msg = "Hello RabbitMQ send Return message!";
            channel.basicPublish(exchangeName, routingKeyError, true, null, msg.getBytes());
    
            //6 添加一个确认监听
            //replyCode路由的响应码,replyText路由的响应文本才,exchange交换机,
            // routingKey路由key,properties发送时传的扩展参数,body具体内容
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange,
                                         String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.err.println("------handle return------");
                    System.err.println("replyCode:" + replyCode);
                    System.err.println("replyText:" + replyText);
                    System.err.println("exchange:" + exchange);
                    System.err.println("routingKey:" + routingKey);
                    System.err.println("properties:" + JSONObject.toJSON(properties));
                    System.err.println("body:" + new String(body));
                }
            });
    
    
    
        }
    
    }
    package com.cx.temp.common.rabbitmq.returnlistener;
    
    import com.cx.temp.common.rabbitmq.quickstart.QueueingConsumer;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * retunListener机制-消费端
     */
    public class Consumer {
    
        public static void main(String[] args)  throws Exception {
    
            //1 创建一个ConectionFacory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_return_exchange";
            String routingKey = "return.#";
            String queueName = "test_refund_queue";
    
            //声明交换机和队列然后进行绑定,最后指定路由KEY
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //5 创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, queueingConsumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println("消费端:" + msg);
            }
        }
    
    }

    消息端自定义监听

    • 我们一般就是在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消息处理!
    • 但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是在实际工作中常用的使用方式!

     以下是实际代码操作

    package com.cx.temp.common.rabbitmq.consumer;
    
    import com.alibaba.fastjson.JSONObject;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * 消息端自定义监听-生产端
     */
    public class Producer {
    
        public static void main(String[] args)  throws Exception {
    
            //1 创建一个ConectionFacory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            //4 指定我们的消息投递模式:消息的确认模式
            channel.confirmSelect();
    
            String exchangeName = "test_consumer_exchange";
            String routingKey = "consumer.save";
    
            //5 发送消息
            //这里使用routingKey消费端可以正常收到消息,如果用routingKeyError,由于消费端声明的路由key是"return.#",所以该消息不可达,则会被addReturnListener监听到
            String msg = "Hello RabbitMQ send Consumer message!";
    
            for (int i = 0; i < 5; i++) {
                channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
            }
    
        }
    
    }
    package com.cx.temp.common.rabbitmq.consumer;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    
    /**
     * 自定义消费者
     */
    public class MyConsumer extends DefaultConsumer {
    
        public MyConsumer(Channel channel) {
            super(channel);
        }
    
        /**
         *
         * @param consumerTag 消费标签
         * @param envelope 包含一些关键信息
         * @param properties 附带扩展数据
         * @param body 消息
         * @throws IOException
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("----------consume message----------");
            System.err.println("consumerTag:" + consumerTag);
            System.err.println("envelope:" + envelope);
            System.err.println("properties:" + properties);
            System.err.println("consumerTag:" + consumerTag);
            System.err.println("body:" + new String(body));
    
    
        }
    }
    package com.cx.temp.common.rabbitmq.consumer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 消息端自定义监听-消费端
     */
    public class Consumer {
    
        public static void main(String[] args)  throws Exception {
    
            //1 创建一个ConectionFacory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/test001");
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("123456");
    
            //2 通过连接工厂创建连接
            Connection connection = connectionFactory.newConnection();
    
            //3 通过connection创建一个Channel
            Channel channel = connection.createChannel();
    
            String exchangeName = "test_consumer_exchange";
            String routingKey = "consumer.#";
            String queueName = "test_consumer_queue";
    
            //声明交换机和队列然后进行绑定,最后指定路由KEY
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
    
            //5 创建消费者
            channel.basicConsume(queueName, true, new MyConsumer(channel));
        }
    
    }

  • 相关阅读:
    js加密
    sharepoint更新左侧列表的名字
    HTML转换JS
    Html空格字符代码:
    docker 与host互传文件
    Ubuntu里node命令出错,找不到
    docker查看运行容器详细信息
    docker保存容器的修改
    Docker容器中安装新的程序
    运行docker容器镜像
  • 原文地址:https://www.cnblogs.com/huihui-hui/p/14321332.html
Copyright © 2011-2022 走看看