zoukankan      html  css  js  c++  java
  • 6、RabbitMQ-路由模式

    Exchange(交换机 转换器)

    Exchange分发消息时根据类型的不同分发策略有区别,
    目前共四种类型:direct、fanout、topic、headers 。
     
    一方面是接受生产者的消息,一方面是向队列推送消息
     
    basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException
    queueBind(String queue, String exchange, String routingKey) throws IOException
    “”匿名转发
     
    fanout:不处理路由键
    每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,
    只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有
    队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消
    息是最快的。

    direct
    消息中的路由键(routing key)如果和 Binding 中的 binding key 一致,
     交换器就将消息发到对应的队列中。路由键与队列名完全匹配,
    如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key
     标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。
    它是完全匹配、单播的模式

     

    topic
    topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑
    定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识
    别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一个单词

     

     路由模式

     http://www.rabbitmq.com/tutorials/tutorial-four-java.html

     1、模型

     

    2、代码实践

     生产者

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    public class Send {
         
         private static final String EXCHANGE_NAME =  "exchange_routing_direct";
         
         public static void main(String[] args) throws IOException,  TimeoutException {
               
               Connection conn = ConnectionUtils.getConnection();
               Channel channel = conn.createChannel();
               
               //exchange
               channel.exchangeDeclare(EXCHANGE_NAME, "direct");
               
               String msg = "hello direct";
               
               //绑定路由
               String routingKey = "error";
               channel.basicPublish(EXCHANGE_NAME, routingKey, null,  msg.getBytes());
               
               channel.close();
               conn.close();
         }
    }

    消费者1:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.util.ConnectionUtils;
    
    public class Receive {
        
        private static final String QUEUE_NAME="test_route";
        private static final String EXCHANGE_NAME = "exchange_routing_direct";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            
            Connection conn = ConnectionUtils.getConnection();
            
            Channel channel = conn.createChannel();
            
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            channel.basicQos(1);
            
            //绑定队列到交换机转发器
            String routingKey = "error";
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
    
                    //定义一个消费者
                    Consumer consumer = new DefaultConsumer(channel){
                        //收到消息就会触发这个方法
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                                throws IOException {
                            String msg = new String(body,"utf-8");
                            System.out.println("消费者1接收到的消息" + msg);
                            
                            try {
                                Thread.sleep(1500);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }finally{
                                System.out.println("消费者1处理完成!");
                                //手动回执
                                channel.basicAck(envelope.getDeliveryTag(), false);
                            }
                        }
                    };
                    //监听队列
                    //自动应答false
                    boolean autoAck = false;
                    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }

    消费者2:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.util.ConnectionUtils;
    
    public class Receive2 {
        
        private static final String QUEUE_NAME="test_queue";
        private static final String EXCHANGE_NAME = "exchange_routing_direct";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            
            Connection conn = ConnectionUtils.getConnection();
            
            Channel channel = conn.createChannel();
            
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            channel.basicQos(1);
            
            //绑定队列到交换机转发器
            //可以同时绑定多个
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
            
                    //定义一个消费者
                    Consumer consumer = new DefaultConsumer(channel){
                        //收到消息就会触发这个方法
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                                throws IOException {
                            String msg = new String(body,"utf-8");
                            System.out.println("消费者2接收到的消息" + msg);
                            
                            try {
                                Thread.sleep(1500);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }finally{
                                System.out.println("消费者2处理完成!");
                                //手动回执
                                channel.basicAck(envelope.getDeliveryTag(), false);
                            }
                        }
                    };
                    //监听队列
                    //自动应答false
                    boolean autoAck = false;
                    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    此时的测试只有在error的情况下两者均可收到信息
    在其他的模式下只有消费者2可以获取消息

     

  • 相关阅读:
    上百个Linux、BSD、Unix学习电子书+视频下载汇总
    测试管理杂谈之理论篇
    浅谈软件测试思维
    细节决定完美——色彩在测试工作中的应用
    图像视频测试基础
    [转]什么是三码、五码机?
    一组经典测试思想观点
    关于LCD分辨率、色素相关知识介绍
    QTP 学习视频汇总
    【诗词涂鸦】满江红 别情
  • 原文地址:https://www.cnblogs.com/Mrchengs/p/10531318.html
Copyright © 2011-2022 走看看