zoukankan      html  css  js  c++  java
  • RabbitMQ交换机

    一、交换机

    1.1 作用

    Exchange(交换机) 的作用就是接收消息并根据路由键转发消息到绑定的队列。

    1.2 交换机常用属性

    属性 含义
    Name 交换机名称
    Type 交换机类型,direct、topic、fanout、headers等,它们本质都一样,只是消息转发的逻辑不同
    Durability 是否持久化,true 为持久化
    Auto Delete 当最后一个绑定到 Exchange 上的队列删除后,自动删除该 Exchange
    Internal 当前 Exchange 是否用于 RabbitMQ 内部使用,默认为 false
    Arguments 扩展参数,用于扩展 AMQP 协议自制定化使用

    二、不同类型的交换机

    2.1 Direct Exchange

    **Direct Exchange (直连型交换机) ** 是根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下:

    1) 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key);
    2) 当一个携带着路由值为 R 的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为 R 的队列。    
    

    Direct 模式可以使用 RabbitMQ 自带的 Exchange 即 default Exchange ,所以不需要将 Exchange 进行任何绑定(binding) 操作,消息传递时,RouteKey 必须完全匹配才会被队列接受,否则消息会被丢弃。

    我们之前的实例使用的就是 default Exchange

    实例:

    消费者:

    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            String exchangeType = "direct";
    
            // 4.声明交换机
            String exchangeName = "directExchange";
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    
            // 5.申明队列
            String queueName = "directQueue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 6.将交换机和队列进行绑定关系
            // 注意:我们只绑定一个路由KEY,说明另一个路由不会被消费掉
            String routingKey = "directA";
            channel.queueBind(queueName, exchangeName, routingKey);
    
    
            // 7.创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, queueingConsumer);
    
            // 8.循环消费
            System.err.println("消费端启动");
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println("消费端消费: " + msg);
            }
        }
    
    }
    

    生产者:

    public class Producer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            // 4.循环发送消息
            // 声明交换机
            String exchangeName = "directExchange";
            
            // 发送消息
            String routingKey1 = "directA";
            String msg1 = "directA 消息";
            channel.basicPublish(exchangeName, routingKey1, null, msg1.getBytes());
    
            String routingKey2 = "directB";
            String msg2 = "directB 消息";
            channel.basicPublish(exchangeName, routingKey2, null, msg2.getBytes());
    
            // 5.关闭资源
            channel.close();
            connection.close();
            connectionFactory.clone();
        }
    }
    

    先启动消费者,然后启动生产者,查看控制台输出:

    消费端启动
    消费端消费: directA 消息
    

    可以看到未被绑定的 routingKey 消息未被消费掉。

    2.2 Fanout Exchange

    Fanout Exchange 是扇型交换机,它不处理路由键,只需要将队列绑定到交换机上。任何发送到 Fanout Exchange 的消息都会被转发到与该 Exchange 绑定(Binding)的所有 Queue上。它的转发消息是最快的。

    实例:

    消费者

    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            String exchangeType = "fanout";
    
            // 4.声明交换机
            String exchangeName = "fanoutExchange";
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    
            // 5.申明队列
            String queueName1 = "fanoutQueue1";
            channel.queueDeclare(queueName1, false, false, false, null);
    
            String queueName2 = "fanoutQueue2";
            channel.queueDeclare(queueName2, false, false, false, null);
    
    
            // 6.将交换机和队列进行绑定关系,不需要设置 routingKey
            channel.queueBind(queueName1, exchangeName, "");
            channel.queueBind(queueName2, exchangeName, "");
    
    
            // 7.创建消费者并消费
            new Thread(() -> {
                try {
                    handleConsumer(channel,queueName1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
    
            new Thread(() -> {
                try {
                    handleConsumer(channel,queueName2);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    
    
        private static void handleConsumer(Channel channel, String queueName) throws Exception {
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, queueingConsumer);
            // 7.循环消费
            System.err.println(queueName + " 消费端启动");
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println(queueName + "消费端消费: " + msg);
            }
        }
    }
    

    生产者:

    public class Producer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            
            // 4.声明交换机
            String exchangeName = "fanoutExchange";
            // 5.循环发送消息
            for (int i = 0; i < 5; i++) {
                String msg = "消息"+i;
                channel.basicPublish(exchangeName, "", null, msg.getBytes());
            }
            // 6.关闭资源
            channel.close();
            connection.close();
            connectionFactory.clone();
    
        }
    }
    

    先启动消费者,然后启动生产者,查看控制台输出:

    fanoutQueue1 消费端启动
    fanoutQueue2 消费端启动
    fanoutQueue1消费端消费: 消息0
    fanoutQueue1消费端消费: 消息1
    fanoutQueue2消费端消费: 消息0
    fanoutQueue1消费端消费: 消息2
    fanoutQueue2消费端消费: 消息1
    fanoutQueue2消费端消费: 消息2
    fanoutQueue1消费端消费: 消息3
    fanoutQueue1消费端消费: 消息4
    fanoutQueue2消费端消费: 消息3
    fanoutQueue2消费端消费: 消息4
    

    2.3 Topic Exchange

    Topic Exchange(主题交换机) 会将所有发送到 Topic Exchange 的消息转发到所有关心 RouteKey 中指定的TopicQueue 上。

    ExchangeRouteKey 和某 Topic 进行模糊匹配。使用这种类型,队列需要绑定一个 Topic。

    注:可以使用通配符进行模糊匹配

    • # 匹配一个或多个词
    • * 匹配一个词

    实例:

    消费者:

    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            String exchangeType = "topic";
    
            // 4.声明交换机
            String exchangeName = "topicExchange";
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    
            // 5.申明队列
            String queueName1 = "topicQueue1";
            channel.queueDeclare(queueName1, false, false, false, null);
    
            String queueName2 = "topicQueue2";
            channel.queueDeclare(queueName2, false, false, false, null);
    
            // 6.将交换机和队列进行绑定关系
            // 匹配一个或多个词
            String routingKey1 = "user.#";
            channel.queueBind(queueName1, exchangeName, routingKey1);
            // 匹配一个词
            String routingKey2 = "order.*";
            channel.queueBind(queueName2, exchangeName, routingKey2);
    
            // 7.创建消费者并消费
            new Thread(() -> {
                try {
                    handleConsumer(channel,queueName1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
    
            new Thread(() -> {
                try {
                    handleConsumer(channel,queueName2);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    
    
        private static void handleConsumer(Channel channel, String queueName) throws Exception {
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, queueingConsumer);
            // 7.循环消费
            System.err.println(queueName + " 消费端启动");
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println(queueName + "消费端消费: " + msg);
            }
        }
        
    }
    

    生产者:

    public class Producer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
    
            // 4.声明交换机
            String exchangeName = "topicExchange";
            // 5.发送消息
            // 模糊匹配多个词
            channel.basicPublish(exchangeName, "user.a", null, "user.a".getBytes());
            channel.basicPublish(exchangeName, "user.a.b", null, "user.a.b".getBytes());
    
            // 模糊匹配一个词
            channel.basicPublish(exchangeName, "order.a", null, "order.a".getBytes());
            channel.basicPublish(exchangeName, "order.a.b", null, "order.a.b".getBytes());
    
            // 6.关闭资源
            channel.close();
            connection.close();
            connectionFactory.clone();
    
        }
    }
    

    先启动消费者,然后启动生产者,查看控制台输出:

    topicQueue2 消费端启动
    topicQueue1 消费端启动
    topicQueue1消费端消费: user.a
    topicQueue1消费端消费: user.a.b
    topicQueue2消费端消费: order.a
    

    2.4 Headers Exchange

    Headers Exchange(头交换机) 不处理路由键,而是根据发送的消息内容中的 headers 属性进行匹配。
    在绑定 QueueExchange 时指定一组键值对;当消息发送到 RabbitMQ 时会取到该消息的 headersExchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的。

    不经常使用,了解即可。

    三、Dead Letter Exchange(死信交换机)

    3.1 死信模式

    死信模式指的是,当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预,这个过程中的 exchangequeue 就是所谓的 Dead Letter ExchangeQueue

    3.2 死信消息生成原因

    消息变成死信有以下几种情况:

    • 消费者对消息使用了 **basicReject ** 或 者 basicNack 回复,并且 requeue 参数设置为 false,即不再将该消息重新在消费者间进行投递.
    • 消息由于消息有效期(per-message TTL)过期
    • 消息由于队列超过其长度限制而被丢弃

    3.3 实例

    死信消费者:

    public class DeadLetterConsumer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            // 4.声明死信交换机
            String exchangeName = "deadLetterExchange";
            String exchangeType = "topic";
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    
            // 5.申明队列
            String queueName = "deadLetterQueue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 6.将交换机和队列进行绑定关系
            String routingKey = "#";
            channel.queueBind(queueName, exchangeName, routingKey);
    
            // 7.创建消费者消费
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, queueingConsumer);
    
            System.err.println("死信消费端启动");
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println("死信消费端消费: " + msg);
            }
    
        }
    
    }
    

    业务消费者:

    public class BusinessConsumer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            String exchangeType = "topic";
    
            // 4.声明交换机
            String exchangeName = "businessExchange";
            channel.exchangeDeclare(exchangeName, exchangeType, false, false, false, null);
    
            // 5.申明队列
            String queueName = "businessQueue";
            Map<String, Object> arguments = new HashMap();
            // 指定死信交换机名
            arguments.put("x-dead-letter-exchange", "deadLetterExchange");   
            // 如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。
            // 如果没有设置,则保留该消息原有的路由key
    //        arguments.put("x-dead-letter-routing-key", "business.#");
            channel.queueDeclare(queueName, true, false, false, arguments);
    
            // 5.将交换机和队列进行绑定关系
            String routingKey = "business.#";
            channel.queueBind(queueName, exchangeName, routingKey);
    
            // 6.创建消费者
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            // autoAck = false 设置手动确认消费
            channel.basicConsume(queueName, false, queueingConsumer);
    
            // 7.循环消费
            System.err.println("业务消费端启动");
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println("业务消费端接收消息: " + msg);
                // 设置消费失败
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            }
        }
    
    }
    

    生产者:

    public class Producer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            // 4.循环发送消息
            // 声明交换机
            String exchangeName = "businessExchange";
            // 声明路由
            String routingKey = "business.order";
            for (int i = 0; i < 5; i++) {
                String msg = "business 消息" + i;
                channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
            }
    
    
            // 5.关闭资源
            channel.close();
            connection.close();
            connectionFactory.clone();
    
        }
    }
    
    1. 启动死信消费者
    2. 启动业务消费者
      3)启动生产者
    3. 观察控制台输出

    业务消费者控制台输出:

    业务消费端启动
    业务消费端接收消息: business 消息0
    业务消费端接收消息: business 消息1
    业务消费端接收消息: business 消息2
    业务消费端接收消息: business 消息3
    业务消费端接收消息: business 消息4
    

    死信消费者控制台输出:

    死信消费端启动
    死信消费端消费: business 消息0
    死信消费端消费: business 消息1
    死信消费端消费: business 消息2
    死信消费端消费: business 消息3
    死信消费端消费: business 消息4
    

    可以看到处理失败的消息已经传递到死信交换机中,并被死信消费者消费。

    从上面的代码,我们可以知道死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型 Direct、Fanout、Topic。一般来说,会为每个业务队列分配一个独有的 路由key ,并对应的配置一个死信队列进行监听。

    3.4 死信的处理方式

    死信的产生是不可避免,我们需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,

    1. 丢弃,如果不是很重要,可以选择丢弃
    2. 记录死信入库,然后做后续的业务分析或处理
    3. 通过死信队列,由负责监听死信的应用程序进行处理
  • 相关阅读:
    修改dedecms默认文章来源 "未知"改为关键词
    wordpress不用插件实现Pagenavi页面导航功能
    在WordPress后台菜单系统中添加Home链接
    如何自定义wordpress登录界面的Logo
    怎样给WordPress分配更多的内存
    解决升级WordPress及插件需输入FTP账号的问题
    Facebook IV Winner's Interview: 1st place, Peter Best (aka fakeplastictrees)
    An Attempt to Understand Boosting Algorithm(s)
    A Statistical View of Deep Learning (V): Generalisation and Regularisation
    A Statistical View of Deep Learning (IV): Recurrent Nets and Dynamical Systems
  • 原文地址:https://www.cnblogs.com/markLogZhu/p/13260747.html
Copyright © 2011-2022 走看看