zoukankan      html  css  js  c++  java
  • Topic路由模式

    原本理解的不够彻底,程序总是不太对。所以查询了资料,关于这种模式的意思做了仔细的解读,复制的文字。

    一:介绍

    1.模式

      

    2.知识点

      其中,#可以匹配一个或者多个字符

      其中,*可以匹配一个字符

    3.仔细解读上面的图

    在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。

           我们创建3个绑定:Q1绑定键是“*.orange.*”,Q2绑定键是“*.*.rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

           路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而"lazy.brown.fox"则只发往第二个队列。“quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

           如果我们违反约定,发送了只带1个或者4个标识符的选择键,像“orange”或者“quick.orange.male.rabbit”,会发生什么呢?这些消息都不匹配任何绑定,所以将被丢弃。

           另外,“lazy.orange.male.rabbit”,尽管有4个标识符,但是仍然匹配最后一个绑定键,所以会发送到第二个队列中。

    二:程序

    1.生产者

     1 package com.mq.topic;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 public class TopicSend {
     8     private static final String EXCHANGE_NAME="test_exchange_topic";
     9     public static void main(String[] args)throws Exception{
    10         Connection connection= ConnectionUtil.getConnection();
    11         Channel channel=connection.createChannel();
    12         channel.exchangeDeclare(EXCHANGE_NAME,"topic");
    13         String msg="hello topic";
    14         //重点是第二个参数routingKey
    15         String routingKey="商品";
    16         channel.basicPublish(EXCHANGE_NAME,"good.query.pro",null,msg.getBytes());
    17         System.out.println("msg send:"+msg);
    18         channel.close();
    19         connection.close();
    20     }
    21 }

    2.消费者一

     1 package com.mq.topic;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class TopicReceive1 {
     9     private static final String EXCHANGE_NAME="test_exchange_topic";
    10     private static final String QUEUE_NAME="test_queue_topic_1";
    11     public static void main(String[] args)throws Exception{
    12         Connection connection= ConnectionUtil.getConnection();
    13         final Channel channel=connection.createChannel();
    14         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    15         //good.query.pro
    16         channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"good.#");
    17         channel.basicQos(1);
    18         Consumer consumer=new DefaultConsumer(channel){
    19             @Override
    20             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    21                 String msg = new String(body, "utf-8");
    22                 System.out.println("[1] receive:" + msg);
    23                 try {
    24                     Thread.sleep(200);
    25                 }catch (Exception e){
    26                     e.printStackTrace();
    27                 }finally {
    28                     System.out.println("[done]");
    29                     channel.basicAck(envelope.getDeliveryTag(),false);
    30                 }
    31             }
    32         };
    33         boolean autoAck=false;
    34         channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    35     }
    36 }

    3.消费者二

     1 package com.mq.topic;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 
     6 import java.io.IOException;
     7 
     8 public class TopicReceive2 {
     9     private static final String EXCHANGE_NAME="test_exchange_topic";
    10     private static final String QUEUE_NAME="test_queue_topic_2";
    11     public static void main(String[] args)throws Exception{
    12         Connection connection= ConnectionUtil.getConnection();
    13         final Channel channel=connection.createChannel();
    14         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    15         //good.query.pro
    16         channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"good.add.*");  //不可以
    17         channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"good.query.*");//可以
    18         channel.basicQos(1);
    19         Consumer consumer=new DefaultConsumer(channel){
    20             @Override
    21             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    22                 String msg = new String(body, "utf-8");
    23                 System.out.println("[2] receive:" + msg);
    24                 try {
    25                     Thread.sleep(200);
    26                 }catch (Exception e){
    27                     e.printStackTrace();
    28                 }finally {
    29                     System.out.println("[done]");
    30                     channel.basicAck(envelope.getDeliveryTag(),false);
    31                 }
    32             }
    33         };
    34         boolean autoAck=false;
    35         channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    36     }
    37 }
  • 相关阅读:
    单调队列
    2019牛客暑期多校训练营(第一场)
    没有上司的舞会
    飞碟解除器
    最小费用最大流
    prim
    merge_sort
    CCF认证之——相反数
    CCF考试认证模拟练习——数字排序
    算法之分治法
  • 原文地址:https://www.cnblogs.com/juncaoit/p/8620540.html
Copyright © 2011-2022 走看看