zoukankan      html  css  js  c++  java
  • RabbitMQ_5、主题模式

    主题模式

    基于模式(主题)接收消息

    *(星号)可以正好代替一个词。
    # (hash) 可以代替零个或多个单词。

    路由键设置为“ quick.orange.rabbit ”的消息将发送到两个队列
    消息“ lazy.orange.elephant ”也会发给他们两个。
    另一方面,“ quick.orange.fox ”只会进入第一个队列,而“ lazy.brown.fox ”只会进入第二个队列。
    “ lazy.pink.rabbit ”只会被传送到第二个队列一次,即使它匹配了两个绑定。
    “ quick.brown.fox ”不匹配任何绑定,因此将被丢弃。
    如果我们违反約定并发送一到四个字的消息,例如“ orange ”或“ quick.orange.male.rabbit ”,会发生什么?好吧,这些消息不会匹配任何绑定并且会丢失。
    另一方面,“ lazy.orange.male.rabbit ”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

    主題隊列 消息生產者

    String infoRoutingKey = "info.message.orange";
    String errorRoutingKey = "error.rabbit.lazy";
    String warningRoutingKey = "orange.warning.message";

    
    /**
     * @PackageName : com.rzk
     * @FileName : Send
     * @Description : 主题队列-消息生产者
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:21
     * @Version : 1.0.0
     */
    public class Send {
    
        //定义交换机名称
        private final static String EXCHANGE_NAME = "exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("*");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
    
            try (
                    //连接工厂创建连接
                    Connection connection = factory.newConnection();
                    //创建信道
                    Channel channel = connection.createChannel()) {
                    //绑定交换机
                    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                    //
                    String infoMessage = " 普通消息  ";
                    String errorMessage = " 错误消息  ";
                    String warningMessage = " 警告消息  ";
                    //需要准备对应的路由
                    String infoRoutingKey = "info.message.orange";
                    String errorRoutingKey = "error.rabbit.lazy";
                    String warningRoutingKey = "orange.warning.message";
                    //队列消息的生产者:发送消息
                    channel.basicPublish(EXCHANGE_NAME, infoRoutingKey, null, infoMessage.getBytes(StandardCharsets.UTF_8));
                    channel.basicPublish(EXCHANGE_NAME, errorRoutingKey, null, errorMessage.getBytes(StandardCharsets.UTF_8));
                    channel.basicPublish(EXCHANGE_NAME, warningRoutingKey, null, warningMessage.getBytes(StandardCharsets.UTF_8));
                    System.out.println(" [x] Sent '" + infoMessage + "'");
                    System.out.println(" [x] Sent '" + errorMessage + "'");
                    System.out.println(" [x] Sent '" + warningMessage + "'");
                }
        }
    }
    

    消息接收

    topic1會匹配到以下兩個:String infoRoutingKey = "info.message.orange";
    String warningRoutingKey = "orange.warning.message";

    主题队列 topic1

    /**
     * @PackageName : com.rzk.simple.recv
     * @FileName : Recv
     * @Description : 主题队列-消息接收
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:22
     * @Version : 1.0.0
     */
    public class Recv01 {
        private final static String EXCHANGE_NAME = "exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            //创建工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("120.55.192.186");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
            //连接工厂创建连接
            Connection connection = factory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //获取队列(排他队列
            String queueName = channel.queueDeclare().getQueue();
            //队列绑定交换机
            String errorRoutingKey = "#.message.#";
            channel.queueBind(queueName,EXCHANGE_NAME,errorRoutingKey);
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            /**
             * 监听队列消费消息
             * autoAck:自动应答
             * 当消费者收到该消息,会返回通知消息队列 我消费者已经收到消息了
             */
            channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
        }
    }
    

    主题队列 topic2

    topic2匹配的是error:String errorRoutingKey = "error.rabbit.lazy";

    /**
     * @PackageName : com.rzk.simple.recv
     * @FileName : Recv
     * @Description : 主题队列-消息接收
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:22
     * @Version : 1.0.0
     */
    public class Recv02 {
        private final static String EXCHANGE_NAME = "exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            //创建工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("120.55.192.186");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
            //连接工厂创建连接
            Connection connection = factory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //获取队列(排他队列)
            String queueName = channel.queueDeclare().getQueue();
            //队列绑定交换机
            String routingKey = "*.rabbit.*";
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            //监听队列消费消息
            channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
        }
    }
    
  • 相关阅读:
    Java实现 蓝桥杯VIP 算法训练 一元三次方程
    Java实现 蓝桥杯VIP 算法训练 乘法表
    Java实现 蓝桥杯VIP 算法训练 矩阵加法
    Java实现 蓝桥杯VIP 算法训练 一元三次方程
    Java实现 蓝桥杯VIP 算法训练 平方计算
    Java实现 蓝桥杯VIP 算法训练 平方计算
    Java实现 蓝桥杯VIP 算法训练 平方计算
    Java实现 蓝桥杯VIP 算法训练 乘法表
    Java实现 蓝桥杯VIP 算法训练 乘法表
    监管只是压倒网盘业务的一根稻草,但不是主要原因(答案只有一个:成本!)
  • 原文地址:https://www.cnblogs.com/rzkwz/p/14929313.html
Copyright © 2011-2022 走看看