zoukankan      html  css  js  c++  java
  • RabbitMQ_5、主题模式

    主题模式

    基于模式(主题)接收消息

    *(星号)可以正好代替一个词。
    # (hash) 可以代替零个或多个单词。

    路由键设置为“ quick.orange.rabbit ”的消息将发送到两个队列
    消息“ lazy.orange.elephant ”也会发给他们两个。
    另一方面,“ quick.orange.fox ”只会进入第一个队列,而“ lazy.brown.fox ”只会进入第二个队列。
    “ lazy.pink.rabbit ”只会被传送到第二个队列一次,即使它匹配了两个绑定。
    “ quick.brown.fox ”不匹配任何绑定,因此将被丢弃。
    如果我们违反約定并发送一到四个字的消息,例如“ orange ”或“ quick.orange.male.rabbit ”,会发生什么?好吧,这些消息不会匹配任何绑定并且会丢失。
    另一方面,“ lazy.orange.male.rabbit ”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

    主題隊列 消息生產者

    String infoRoutingKey = "info.message.orange";
    String errorRoutingKey = "error.rabbit.lazy";
    String warningRoutingKey = "orange.warning.message";

    
    /**
     * @PackageName : com.rzk
     * @FileName : Send
     * @Description : 主题队列-消息生产者
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:21
     * @Version : 1.0.0
     */
    public class Send {
    
        //定义交换机名称
        private final static String EXCHANGE_NAME = "exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("*");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
    
            try (
                    //连接工厂创建连接
                    Connection connection = factory.newConnection();
                    //创建信道
                    Channel channel = connection.createChannel()) {
                    //绑定交换机
                    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                    //
                    String infoMessage = " 普通消息  ";
                    String errorMessage = " 错误消息  ";
                    String warningMessage = " 警告消息  ";
                    //需要准备对应的路由
                    String infoRoutingKey = "info.message.orange";
                    String errorRoutingKey = "error.rabbit.lazy";
                    String warningRoutingKey = "orange.warning.message";
                    //队列消息的生产者:发送消息
                    channel.basicPublish(EXCHANGE_NAME, infoRoutingKey, null, infoMessage.getBytes(StandardCharsets.UTF_8));
                    channel.basicPublish(EXCHANGE_NAME, errorRoutingKey, null, errorMessage.getBytes(StandardCharsets.UTF_8));
                    channel.basicPublish(EXCHANGE_NAME, warningRoutingKey, null, warningMessage.getBytes(StandardCharsets.UTF_8));
                    System.out.println(" [x] Sent '" + infoMessage + "'");
                    System.out.println(" [x] Sent '" + errorMessage + "'");
                    System.out.println(" [x] Sent '" + warningMessage + "'");
                }
        }
    }
    

    消息接收

    topic1會匹配到以下兩個:String infoRoutingKey = "info.message.orange";
    String warningRoutingKey = "orange.warning.message";

    主题队列 topic1

    /**
     * @PackageName : com.rzk.simple.recv
     * @FileName : Recv
     * @Description : 主题队列-消息接收
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:22
     * @Version : 1.0.0
     */
    public class Recv01 {
        private final static String EXCHANGE_NAME = "exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            //创建工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("120.55.192.186");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
            //连接工厂创建连接
            Connection connection = factory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //获取队列(排他队列
            String queueName = channel.queueDeclare().getQueue();
            //队列绑定交换机
            String errorRoutingKey = "#.message.#";
            channel.queueBind(queueName,EXCHANGE_NAME,errorRoutingKey);
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            /**
             * 监听队列消费消息
             * autoAck:自动应答
             * 当消费者收到该消息,会返回通知消息队列 我消费者已经收到消息了
             */
            channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
        }
    }
    

    主题队列 topic2

    topic2匹配的是error:String errorRoutingKey = "error.rabbit.lazy";

    /**
     * @PackageName : com.rzk.simple.recv
     * @FileName : Recv
     * @Description : 主题队列-消息接收
     * @Author : rzk
     * @CreateTime : 23/6/2021 上午12:22
     * @Version : 1.0.0
     */
    public class Recv02 {
        private final static String EXCHANGE_NAME = "exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            //创建工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("120.55.192.186");
            factory.setUsername("yeb");
            factory.setVirtualHost("/yeb");
            factory.setPassword("yeb");
            factory.setPort(5672);
            //连接工厂创建连接
            Connection connection = factory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //获取队列(排他队列)
            String queueName = channel.queueDeclare().getQueue();
            //队列绑定交换机
            String routingKey = "*.rabbit.*";
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            //监听队列消费消息
            channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
        }
    }
    
  • 相关阅读:
    C++面向对象高级编程(下)第二周-Geekband
    C++面向对象高级编程(下)第一周-Geekband
    C++面向对象高级编程(下)-Geekband
    堆,栈,内存管理, 拓展补充-Geekband
    C++面向对象高级编程(上)-Geekband
    MFC 多屏显示
    Open CASCADE Technology: IGES Support
    JAVA反射
    HashMap
    Linux 系统编程
  • 原文地址:https://www.cnblogs.com/rzkwz/p/14929313.html
Copyright © 2011-2022 走看看