zoukankan      html  css  js  c++  java
  • 主题模式

    • Topic(主题模式)

      • Topic exchange

        direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但匹配规则有些不同
        routing_key-它必须是单词列表,以点分隔。这些词可以是任何东西,但是通常它们指定与消息相关的某些功能。一些有效的rounting key 如:"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"。rounting key中可以包含任意多个单词,最多255个字节。
        binding key可以存在如下两种特殊的字符 即:
        1、*(星号)可以代替一个单词。
        2、#(哈希)可以替代零个或多个单词

        ​ 在上面图片中,Routing key 设置为"quick.orange.rabbit"的消息将传递到两个队列。消息"lazy.orange.elephant"也将发送给他们两个。但,"quick.orange.fox"只会进入第一个队列,而"lazy.brown.fox"只会进入第二个队列。"lazy.pink.rabbit"将被传递到第二队只有一次,即使两个绑定匹配。"quick.brown.fox"与任何绑定都不匹配,因此将被丢弃。
        如果我们发送一个或四个单词的消息,例如"orange"或"quick.orange.male.rabbit",这些消息将不匹配任何绑定,并且将会丢失。但"lazy.orange.male.rabbit"即使有四个单词,也将匹配最后一个绑定,并将其传送到第二个队列。

      • 生产者消费者代码:

        ​ 生产者

        public class TopicEmitLog {
            private static final String EXCHANGE_NAME = "topic_logs";
        
            public static void main(String[] args) throws Exception {
                //获取连接
                Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            
                Channel channel = connection.createChannel();
            
                //创建队列
                //channel.queueDeclare("direct_loge",true,false,false,null);
                //声明交换机,
                channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            
                String message="hello";
                //发送消息
            
                //发送消息
                channel.basicPublish(EXCHANGE_NAME, "topics.log", null, message.getBytes("utf-8"));
            
                channel.close();
                connection.close();
            
            }
        
        }
        

        消费者1可以收到消息

        public class TopicRecv {
        
            public static final String QUEUE_NAME = "topic_queues";
            
            public static final String EXCHANGE_NAME = "topic_logs";
            
            public static void main(String[] args) throws Exception{
                //获取连接
                Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            
                //声明通道
                Channel channel = connection.createChannel();
            
                channel.exchangeDeclare(EXCHANGE_NAME, "topic");
                //声明队列队列
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                //4.绑定队列到交换器,指定路由key为topics
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topics.#");
            
                //
                //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.*");
            
                DeliverCallback deliverCallback = new DeliverCallback(){
                    @Override
                    public void handle(String consumerTag, Delivery delivery) throws IOException {
                        String message = new String(delivery.getBody(), "UTF-8");
                        System.out.println(" [x] Received '" + message + "'");
                    }
                };
            
                channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){
                    @Override
                    public void handle(String consumerTag) throws IOException {
            
                    }
                });
            
            }
        
        }
        

        消费者2收不到消息

        public class TopicRecv2 {
        
            public static final String QUEUE_NAME = "topic_queues2";
            
            public static final String EXCHANGE_NAME = "topic_logs";
            
            public static void main(String[] args) throws Exception{
                //获取连接
                Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
            
                //声明通道
                Channel channel = connection.createChannel();
            
                channel.exchangeDeclare(EXCHANGE_NAME, "topic");
                //声明队列队列
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                //4.绑定队列到交换器,指定路由key为topics
                //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topics.#");
            
                //
                channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.*");
            
                DeliverCallback deliverCallback = new DeliverCallback(){
                    @Override
                    public void handle(String consumerTag, Delivery delivery) throws IOException {
                        String message = new String(delivery.getBody(), "UTF-8");
                        System.out.println(" [x] Received '" + message + "'");
                    }
                };
            
                channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){
                    @Override
                    public void handle(String consumerTag) throws IOException {
            
                    }
                });
            
            }
        
        }
        

    相关代码链接: https://github.com/albert-liu435/springmq

  • 相关阅读:
    我想操作的是利用SqlDataAdapter的几个Command属性(InsertCommand,UpdateCommand,DeleteCommand)来更新数据库
    有两个数据库A和B,数据库A中有表a,如何把表a映射到数据库B中,sql 2005
    代码生成器
    IWorkSpace接口介绍
    空间数据库介绍
    IGeoFeatureLayer
    IFeatureLayer
    Python ML环境搭建与学习资料推荐
    Python ML环境搭建与学习资料推荐
    TypeError: Can not convert a float32 into a Tensor or Operation.
  • 原文地址:https://www.cnblogs.com/haizhilangzi/p/12301736.html
Copyright © 2011-2022 走看看