zoukankan      html  css  js  c++  java
  • 【RabbitMQ】05 通配符模式

    需要设定交换机模式为通配符模式 Topic

    在绑定规则上采用通配描述实现动态绑定

    创建通配符模式的生产者

    package cn.dzz.topicQueue;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.nio.charset.StandardCharsets;
    
    public class TopicInProducer {
    
        public static void main(String[] args) throws Exception{
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            connectionFactory.setHost("192.168.2.121");
            connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672
            connectionFactory.setVirtualHost("/dzz"); // 虚拟主机? 默认值 /
            connectionFactory.setUsername("test"); // guest
            connectionFactory.setPassword("123456"); // guest
    
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
            /**
             *  多了一个创建交换机的过程
             *  public DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {
             *      return this.exchangeDeclare(exchange, type.getType(), durable, autoDelete, internal, arguments);
             *  }
             *  String exchange 交换机名称
             *  String type 交换机类型,这里换成枚举类型,方便查找 com.rabbitmq.client.BuiltinExchangeType
             *      DIRECT("direct"), 定向  简单模式 和 工作模式
             *      FANOUT("fanout"), 扇形  广播(通知给所有和这个交换机绑定的队列)
             *      TOPIC("topic"), 通配符 ?
             *      HEADERS("headers"); 参数匹配, 视频暂不讲解
             *  boolean durable 持久化
             *  boolean autoDelete 自动删除
             *  boolean internal 内部使用 一般false
             *  Map<String, Object> arguments
            */
            String exchangeName = "test_topic";
            channel.exchangeDeclare(
                    exchangeName,
                    BuiltinExchangeType.TOPIC,
                    true,
                    false,
                    false,
                    null
            );
    
            // 这里队列的声明其实可以按照规则声明, 交换机名称 队列名称 队列序号这样
            String queueName1 = "test_topic - topicQueue - 1";
            String queueName2 = "test_topic - topicQueue - 2";
            channel.queueDeclare(queueName1, true, false, false, null);
            channel.queueDeclare(queueName2, true, false, false, null);
    
            // Topic的精髓在于指定我们想要的规则
            channel.queueBind(queueName1, exchangeName, "#.error");
            channel.queueBind(queueName1, exchangeName, "order.*");
            channel.queueBind(queueName2, exchangeName, "*.*");
    
            // 发送消息 这里制定一些不同的消息 以做出区分
            String level = "order.info";
            for (int i = 0; i < 20; i++) {
                if (i == 5) level = "order.warning";
                if (i == 10) level = "goods.warning";
                else if (i == 14) level = "order.error";
    
                String body = "sending routine msg " + i + level;
                channel.basicPublish(exchangeName, level, null, body.getBytes(StandardCharsets.UTF_8));
            }
    
            // 释放资源
            channel.close();
            connection.close();
        }
    }

    讲交换机的时候视频故意不改Topic模式

    其实是想介绍这个删除功能,这样就可以删除交换机了

    但是Topic队列的消息还保存在里面

    找到对应的队列,也可以删除这个的队列的消息

    然后创建消费者接收Topic交换机发送的消息:

    package cn.dzz.topicQueue;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    public class TopicQueueInConsumer2 {
    
        /**
         * 工作队列 消费者
         * @param args
         */
        public static void main(String[] args) throws Exception{
            ConnectionFactory connectionFactory = new ConnectionFactory();
    
            connectionFactory.setHost("192.168.2.121");
            connectionFactory.setPort(ConnectionFactory.DEFAULT_AMQP_PORT); // 5672
            connectionFactory.setVirtualHost("/dzz"); // 虚拟主机? 默认值 /
            connectionFactory.setUsername("test"); // guest
            connectionFactory.setPassword("123456"); // guest
    
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
            // 声明将不在需要
            // channel.queueDeclare("work_queue", true, false, false, null);
    
            // 从生产者复制过来需要的队列名称
            String queueName1 = "test_topic - topicQueue - 1";
            String queueName2 = "test_topic - topicQueue - 2";
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("body(message) " + new String(body, StandardCharsets.UTF_8));
                    System.out.println("- - - - - over - - - - -");
                }
            };
    
            channel.basicConsume(queueName2, true, consumer);
        }
    }

    执行完生产者之后,然后再执行消费者1 和2

    "C:Program Files (x86)Javajdk1.8.0_291injava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2021.2.1libidea_rt.jar=49785:C:Program FilesJetBrainsIntelliJ IDEA 2021.2.1in" -Dfile.encoding=UTF-8 -classpath "C:Program Files (x86)Javajdk1.8.0_291jrelibcharsets.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibdeploy.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextaccess-bridge-32.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextcldrdata.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextdnsns.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextjaccess.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextjfxrt.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextlocaledata.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibext
    ashorn.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunec.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunjce_provider.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunmscapi.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunpkcs11.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextzipfs.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjavaws.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjce.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjfr.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjfxswt.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjsse.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibmanagement-agent.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibplugin.jar;C:Program Files (x86)Javajdk1.8.0_291jrelib
    esources.jar;C:Program Files (x86)Javajdk1.8.0_291jrelib
    t.jar;C:UsersAdministratorIdeaProjectsRabbitMQConsumerService	argetclasses;C:UsersAdministrator.m2
    epositorycom
    abbitmqamqp-client5.6.0amqp-client-5.6.0.jar;C:UsersAdministrator.m2
    epositoryorgslf4jslf4j-api1.7.25slf4j-api-1.7.25.jar" cn.dzz.topicQueue.TopicQueueInConsumer1
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    body(message) sending routine msg 0order.info
    - - - - - over - - - - -
    body(message) sending routine msg 1order.info
    - - - - - over - - - - -
    body(message) sending routine msg 2order.info
    - - - - - over - - - - -
    body(message) sending routine msg 3order.info
    - - - - - over - - - - -
    body(message) sending routine msg 4order.info
    - - - - - over - - - - -
    body(message) sending routine msg 5order.warning
    - - - - - over - - - - -
    body(message) sending routine msg 6order.warning
    - - - - - over - - - - -
    body(message) sending routine msg 7order.warning
    - - - - - over - - - - -
    body(message) sending routine msg 8order.warning
    - - - - - over - - - - -
    body(message) sending routine msg 9order.warning
    - - - - - over - - - - -
    body(message) sending routine msg 14order.error
    - - - - - over - - - - -
    body(message) sending routine msg 15order.error
    - - - - - over - - - - -
    body(message) sending routine msg 16order.error
    - - - - - over - - - - -
    body(message) sending routine msg 17order.error
    - - - - - over - - - - -
    body(message) sending routine msg 18order.error
    - - - - - over - - - - -
    body(message) sending routine msg 19order.error
    - - - - - over - - - - -

    可以看出消费者1是接收不到goods的消息的:

    "C:Program Files (x86)Javajdk1.8.0_291injava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2021.2.1libidea_rt.jar=49765:C:Program FilesJetBrainsIntelliJ IDEA 2021.2.1in" -Dfile.encoding=UTF-8 -classpath "C:Program Files (x86)Javajdk1.8.0_291jrelibcharsets.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibdeploy.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextaccess-bridge-32.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextcldrdata.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextdnsns.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextjaccess.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextjfxrt.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextlocaledata.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibext
    ashorn.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunec.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunjce_provider.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunmscapi.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextsunpkcs11.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibextzipfs.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjavaws.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjce.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjfr.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjfxswt.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibjsse.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibmanagement-agent.jar;C:Program Files (x86)Javajdk1.8.0_291jrelibplugin.jar;C:Program Files (x86)Javajdk1.8.0_291jrelib
    esources.jar;C:Program Files (x86)Javajdk1.8.0_291jrelib
    t.jar;C:UsersAdministratorIdeaProjectsRabbitMQConsumerService	argetclasses;C:UsersAdministrator.m2
    epositorycom
    abbitmqamqp-client5.6.0amqp-client-5.6.0.jar;C:UsersAdministrator.m2
    epositoryorgslf4jslf4j-api1.7.25slf4j-api-1.7.25.jar" cn.dzz.topicQueue.TopicQueueInConsumer2
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    body(message) sending routine msg 0order.info
    - - - - - over - - - - -
    body(message) sending routine msg 1order.info
    - - - - - over - - - - -
    body(message) sending routine msg 2order.info
    - - - - - over - - - - -
    body(message) sending routine msg 3order.info
    - - - - - over - - - - -
    body(message) sending routine msg 4order.info
    - - - - - over - - - - -
    body(message) sending routine msg 5order.warning
    - - - - - over - - - - -
    body(message) sending routine msg 6order.warning
    - - - - - over - - - - -
    body(message) sending routine msg 7order.warning
    - - - - - over - - - - -
    body(message) sending routine msg 8order.warning
    - - - - - over - - - - -
    body(message) sending routine msg 9order.warning
    - - - - - over - - - - -
    body(message) sending routine msg 10goods.warning
    - - - - - over - - - - -
    body(message) sending routine msg 11goods.warning
    - - - - - over - - - - -
    body(message) sending routine msg 12goods.warning
    - - - - - over - - - - -
    body(message) sending routine msg 13goods.warning
    - - - - - over - - - - -
    body(message) sending routine msg 14order.error
    - - - - - over - - - - -
    body(message) sending routine msg 15order.error
    - - - - - over - - - - -
    body(message) sending routine msg 16order.error
    - - - - - over - - - - -
    body(message) sending routine msg 17order.error
    - - - - - over - - - - -
    body(message) sending routine msg 18order.error
    - - - - - over - - - - -
    body(message) sending routine msg 19order.error
    - - - - - over - - - - -

    Topic模式就是基于路由规则规划队列消息处理

    相比前面的Routine模式要显得更灵活一些

  • 相关阅读:
    斯坦福大学Andrew Ng
    斯坦福大学Andrew Ng
    斯坦福大学Andrew Ng
    斯坦福大学Andrew Ng
    学到即赚到
    matlab学习笔记之五种常见的图形绘制功能
    Flutter混合栈的管理
    Android调用系统拍照裁剪和选图功能
    Android DataBinding库(MVVM设计模式)
    Flutter 动画使用
  • 原文地址:https://www.cnblogs.com/mindzone/p/15374619.html
Copyright © 2011-2022 走看看