zoukankan      html  css  js  c++  java
  • RabbitMQ之Exchange(五)

    Exchange总共有4种类型:direct, topic, fanout and headers,这里主要介绍前三种。

    direct转发规则是根据routing key(由消费者来绑定,可以简单理解为消费者感兴趣的事件,如新闻中的:娱乐,社会,汽车,国际),下图中routing key为info,waring,error

    生产者发送消息的时候,会指定消息对应的routing key(也就是指定消息的类型,如新闻中的:娱乐,那么所有关注娱乐新闻的人都能看到)

    topic也是根据binding key,但是多了" *,# "两个通配符(*匹配单个.号分开的单词,#匹配多个)

    发送quick.orange.rabbit或者lazy.orange.elephant,C1,C2均能收到

    发送quick.orange.fox只有C1能收到,发送lazy.brown.fox只有C2能收到

    lazy.pink.rabbit只会向C2发送一次

    quick.brown.fox不被匹配,所以直接丢弃。

    fanout转发规则是广播,即转发直连到这个exchange上的queue

     

    Exchange-direct

    Producer

     1 package direct;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.Channel;
     6 import com.rabbitmq.client.Connection;
     7 import com.rabbitmq.client.ConnectionFactory;
     8 
     9 //direct为exchange的一种模式,主要是按照binding key来决定将消息放到哪些队列里
    10 //相对于广播模式fanout就是多了一层routingKey的匹配,向匹配的queue广播
    11 //大致流程时 producer -> exchange -> queue -> consumer
    12 //producer可以选择像哪个exchange中发送数据,同时可以指定routingKey,也可以显示声明queue
    13 //exchange则根据当前的模式决定将消息放到其关联的哪些queue里,广播就是全部,direct就是根据routingKey匹配,routingKey是exchange和queue之间的识别符
    14 //queue就是用来具体的存放数据的队列,其内的同一条消息只会发送给一个consumer(假设多个人绑定同一个queue),可以用来负载均衡
    15 public class Send {
    16     private static final String EXCHANGE_NAME = "direct_logs";
    17     public static void main(String[] args) {
    18         // TODO Auto-generated method stub
    19         foo();
    20     }
    21     
    22     private static void foo() {
    23         try {
    24             ConnectionFactory factory = new ConnectionFactory();
    25             factory.setHost("localhost");
    26             Connection connection = factory.newConnection();
    27             Channel channel = connection.createChannel();
    28             //声明一个直连的exchange
    29             channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    30             for (int i = 0; i < 3; i++) {
    31                 //设定了routing key为info1,info2,info3
    32                 String serverity = "info"+i;
    33                 String message = "info test"+i;
    34                 //在名为direct_logs的Exchange上绑定了三个routing key,并发送消息
    35                 channel.basicPublish(EXCHANGE_NAME, serverity, null, message.getBytes());
    36                 System.out.println("Send:"+message);
    37             }
    38             channel.close();
    39             connection.close();
    40         } catch (Exception e) {
    41             // TODO Auto-generated catch block
    42             e.printStackTrace();
    43         }
    44         
    45     }
    46 
    47 }

    Consumer

    package direct;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class Recv {
        private static final String EXCHANGE_NAME = "direct_logs";
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            args = new String[]{"info1","info2"};
            foo(args);
        }
    
        private static void foo(String[] args) {
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                //声明一个直连的exchange,direct的意思是根据routing key进行转发
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                String queueName = channel.queueDeclare().getQueue();
                
                //这里将同一个queue绑定了info1,info2两个routing key(binding key),所以可以收到info1,info2的消息
                for(String severity : args) {
                    channel.queueBind(queueName, EXCHANGE_NAME, severity);
                    System.out.println("routingKey:"+severity);
                }
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String message = new String(body,"UTF-8");
                        System.out.println(queueName+" Recv "+message);
                    }
                };
                channel.basicConsume(queueName, true, consumer);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
        }
    }

    运行结果

    Procuder

    Send:info test0
    Send:info test1
    Send:info test2

    Consumer

    routingKey:info1
    routingKey:info2
    amq.gen-tMcTNq9K_Zp3tNLCoNFqEg Recv info test1
    amq.gen-tMcTNq9K_Zp3tNLCoNFqEg Recv info test2

    备注:这里需要先运行Consumer,然后运行Producer。因为Producer没有绑定Queue,所以如果Producer先运行,那么没有queue来存放数据,导致数据直接丢失。

    如下图:

    运行结果图解:(与上述代码操作上有些许差别)

    下图构造同上述代码,图中圆代表生产者,六边形代表Exchange,长方形代表queue,锯齿形代表消费者(下图中的两个是同一个消费者,队列也是同一个队列,当然可以是不同的队列,这里是为了符合代码)

    下面的info1,info2,info3正好代表三个routing key(其中info3没有绑定queue,这里是为了画图方便)

    连续发送两条routing key为info1的消息就是下图这种效果

      

    Exchange-topic

    Producer

     1 package topic;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.Channel;
     6 import com.rabbitmq.client.Connection;
     7 import com.rabbitmq.client.ConnectionFactory;
     8 
     9 //topic与direct的区别是,topic绑定的routingKey支持通配符
    10 public class Send {
    11     private static final String EXCHANGE_NAME = "topic_logs";
    12     public static void main(String[] args) {
    13         // TODO Auto-generated method stub
    14         foo();
    15     }
    16     
    17     private static void foo() {
    18         try {
    19             ConnectionFactory factory = new ConnectionFactory();
    20             factory.setHost("localhost");
    21             Connection connection = factory.newConnection();
    22             Channel channel = connection.createChannel();
    23             channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    24             for (int i = 0; i < 3; i++) {
    25                 String serverity = "info.warn"+i;
    26                 String message = "info.warn"+i;
    27                 channel.basicPublish(EXCHANGE_NAME, serverity, null, message.getBytes());
    28                 System.out.println("Send:"+message);
    29             }
    30             channel.close();
    31             connection.close();
    32         } catch (Exception e) {
    33             // TODO Auto-generated catch block
    34             e.printStackTrace();
    35         }
    36         
    37     }
    38 
    39 }

    Consumer

     1 package topic;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.Channel;
     6 import com.rabbitmq.client.Connection;
     7 import com.rabbitmq.client.ConnectionFactory;
     8 import com.rabbitmq.client.Consumer;
     9 import com.rabbitmq.client.DefaultConsumer;
    10 import com.rabbitmq.client.Envelope;
    11 import com.rabbitmq.client.AMQP.BasicProperties;
    12 
    13 public class Recv {
    14     private static final String EXCHANGE_NAME = "topic_logs";
    15     public static void main(String[] args) {
    16         // TODO Auto-generated method stub
    17         //args = new String[]{"#"};     //可以用来匹配所有的topic,#是指匹配0个或多个关键字
    18         //args = new String[]{"*"};     //只能匹配一个一个关键字,这里的关键字指的的以.分割的,如:send端topic为"info warn1"可以匹配,为"info.warn1"无法匹配
    19         args = new String[]{"info.*"};
    20         foo(args);
    21     }
    22 
    23     private static void foo(String[] args) {
    24         try {
    25             ConnectionFactory factory = new ConnectionFactory();
    26             factory.setHost("localhost");
    27             Connection connection = factory.newConnection();
    28             Channel channel = connection.createChannel();
    29             channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    30             String queueName = channel.queueDeclare().getQueue();
    31             
    32             for(String severity : args) {
    33                 channel.queueBind(queueName, EXCHANGE_NAME, severity);
    34                 System.out.println("routingKey:"+severity);
    35             }
    36             Consumer consumer = new DefaultConsumer(channel) {
    37                 @Override
    38                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
    39                         byte[] body) throws IOException {
    40                     String message = new String(body,"UTF-8");
    41                     System.out.println(queueName+" Recv: "+message);
    42                 }
    43             };
    44             channel.basicConsume(queueName, true, consumer);
    45         } catch (Exception e) {
    46             // TODO Auto-generated catch block
    47             e.printStackTrace();
    48         }
    49         
    50     }
    51 }

    运行结果:

      Procuder

    Send:info.warn0
    Send:info.warn1
    Send:info.warn2

    Consumer

    routingKey:info.*
    amq.gen-n5I6CxBz00azD2KPnq7GcA Recv: info.warn0
    amq.gen-n5I6CxBz00azD2KPnq7GcA Recv: info.warn1
    amq.gen-n5I6CxBz00azD2KPnq7GcA Recv: info.warn2

    备注:这里需要先运行Consumer,然后运行Producer。因为Producer没有绑定Queue,所以如果Producer先运行,那么没有queue来存放数据,导致数据直接丢失。

    图解:

    这里由于工具问题,topic未能正常工作,所以使用广播模式绘制下图(导致看不到routing key,广播fanout不关心routing key),但是结果符合代码实际运行。

     

    Exchange-topic

     Producer

     1 package PubSub;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import com.rabbitmq.client.ConnectionFactory;
     6 
     7 public class Send1 {
     8     private static final String QUEUE_NAME = "workqueue";
     9     private static final String EXCHANGE_NAME = "ex_log";
    10     public static void main(String[] args) {
    11         // TODO Auto-generated method stub
    12         foo();
    13     }
    14 
    15     /**
    16      *  测试Exchange的fanout类型:即广播模式,不关心queue和exchange的binding key。
    17      *  本例测试场景是发送端发送日志消息到mq,消费者打印实时日志
    18      *  所以这里没有为exchange绑定queue,这意味着如果生产者先发送数据,那么消息会丢失,消费者只能收到其起来之后生产者发送的消息
    19      *  因为只有queue才能存数据,本例中只有消费者声明了queue(绑定的是随机queue,生命周期同消费者)
    20      *  这样做的好处是,一个消费者一个queue,消费者挂了queue也会自动删除
    21      */
    22     private static void foo() {
    23         try{
    24             ConnectionFactory factory = new ConnectionFactory();
    25             factory.setHost("localhost");
    26             Connection connection = factory.newConnection();
    27             Channel channel = connection.createChannel();
    28             
    29             channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    30             String message = "this is a log";
    31             //下面第一个参数时exchange的名字,之前为空"",表示为匿名,只要对端也为空""即可
    32             channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    33             System.out.println("Send:"+message);
    34             channel.close();
    35             connection.close();
    36         } catch(Exception e) {
    37             e.printStackTrace();
    38         }
    39     }
    40 }

    Consumer

     1 package PubSub;
     2 
     3 import java.io.IOException;
     4 
     5 import com.rabbitmq.client.Channel;
     6 import com.rabbitmq.client.Connection;
     7 import com.rabbitmq.client.ConnectionFactory;
     8 import com.rabbitmq.client.DefaultConsumer;
     9 import com.rabbitmq.client.Envelope;
    10 import com.rabbitmq.client.AMQP.BasicProperties;
    11 
    12 public class Recv1 {
    13     private static final String QUEUE_NAME = "workqueue";
    14     private static final String EXCHANGE_NAME = "ex_log";
    15     public static void main(String[] args) {
    16         foo();
    17     }
    18 
    19     private static void foo() {
    20         try {
    21             ConnectionFactory factory = new ConnectionFactory();
    22             factory.setHost("localhost");
    23             Connection connection = factory.newConnection();
    24             Channel channel = connection.createChannel();
    25             channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    26             //创建一个随机的queue(非持久化、独占的、随着client session断开而自动删除的)
    27             String queueName = channel.queueDeclare().getQueue();
    28             channel.queueBind(queueName, EXCHANGE_NAME, "");
    29             DefaultConsumer consumer = new DefaultConsumer(channel){
    30                 @Override
    31                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
    32                         byte[] body) throws IOException {
    33                     String message = new String(body,"UTF-8");
    34                     System.out.println("Recv:"+message);
    35                 }
    36             };
    37             //只能接收到绑定道queue之后producer生产的数据,如果生产者生产数据时没有queue绑定到exchange,那么消息自动丢弃
    38             channel.basicConsume(queueName, true, consumer);
    39         } catch (Exception e) {
    40             e.printStackTrace();
    41         }
    42     }
    43 }

    运行结果:

      Procuder

        Send:this is a log

      Consumer

        Recv:this is a log

       备注:先运行Consumer再运行Producer才能收到,因为这里Producer没有声明queue

    运行图解

    注意下面有个命名queue和随机queue的区别(个人理解命名,官方可能没有这两个概念)

      命名queue指的是Prodocer和Consumer都会去声明的Queue,即双方约定好的

      随机queue指Producer没有去声明这个queue,其也不知道这个queue的存在,由消费者动态声明,用以接收数据,好处是这个queue的生命周期同consumer(上述代码用的就是随机queue)

  • 相关阅读:
    gitlab: git clone/pull / push: The project you were looking for could not be found
    转载: MySQL启动出错InnoDB: Check that you do not already have another mysqld process解决方法
    root用户删除文件,提示:Operation not permitted
    使用dockerfile打包新镜像
    kubernets创建Deployment
    代理全家福
    Spring事务传播详解
    [FFmpeg]Centos7 yum安装
    [Redis]存放字典
    [Docker]开放2375端口
  • 原文地址:https://www.cnblogs.com/gc65/p/8993172.html
Copyright © 2011-2022 走看看