zoukankan      html  css  js  c++  java
  • 7、RabbitMQ-主题模式

    1、模式图

    发送到主题交换的消息不能具有任意的 routing_key - 它必须是由点分隔的单词列表。
    单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由键示例:“ stock.usd.nyse ”,“ nyse.vmw”,“ quick.orange.rabbit ”。路由密钥中可以包
    含任意数量的单词,最多可达255个字节。
     
    绑定密钥也必须采用相同的形式。主题交换背后的逻辑 类似于直接交换- 使用特定路
    由密钥发送的消息将被传递到与匹配绑定密钥绑定的所有队列。但是绑定键有两个重
    要的特殊情况:
    *(星号)可以替代一个单词。
    #(hash)可以替换零个或多个单词。

     类型是topic

    2、实践 

     生产者

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.util.ConnectionUtils;
    
    public class Send {
        
        private static final String EXCHANGE_NAME = "exchange_topic";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            
            Connection conn = ConnectionUtils.getConnection();
            
            Channel channel = conn.createChannel();
            
            //exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            
            String msg = "商品.....";
            
            //绑定路由
            String routingKey = "goods.add";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
            
            channel.close();
            conn.close();
            
        }
    
    }

     消费者1:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.util.ConnectionUtils;
    
    public class Receive {
        
        private static final String QUEUE_NAME="test_topic1";
        private static final String EXCHANGE_NAME = "exchange_topic";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            
            Connection conn = ConnectionUtils.getConnection();
            
            Channel channel = conn.createChannel();
            
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            channel.basicQos(1);
            
            //绑定队列到交换机转发器
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");
    
                    //定义一个消费者
                    Consumer consumer = new DefaultConsumer(channel){
                        //收到消息就会触发这个方法
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                                throws IOException {
                            String msg = new String(body,"utf-8");
                            System.out.println("消费者1接收到的消息" + msg);
                            
                            try {
                                Thread.sleep(1500);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }finally{
                                System.out.println("消费者1处理完成!");
                                //手动回执
                                channel.basicAck(envelope.getDeliveryTag(), false);
                            }
                        }
                    };
                    //监听队列
                    //自动应答false
                    boolean autoAck = false;
                    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }

     消费者2:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.util.ConnectionUtils;
    
    public class Receive2 {
        
        private static final String QUEUE_NAME="test_topic2";
        private static final String EXCHANGE_NAME = "exchange_topic";
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection conn = ConnectionUtils.getConnection();
            Channel channel = conn.createChannel();
            
            //队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            
            //绑定队列到交换机转发器
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
            
                    //定义一个消费者
                    Consumer consumer = new DefaultConsumer(channel){
                        //收到消息就会触发这个方法
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                                throws IOException {
                            String msg = new String(body,"utf-8");
                            System.out.println("消费者2接收到的消息" + msg);
                            
                            try {
                                Thread.sleep(1500);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }finally{
                                System.out.println("消费者2处理完成!");
                                //手动回执
                                channel.basicAck(envelope.getDeliveryTag(), false);
                            }
                        }
                    };
                    //监听队列
                    //自动应答false
                    boolean autoAck = false;
                    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
    此时如果生产者的String routingKey = "goods.add";
    此时2个消费者都可以收到消息
    若:String routingKey = "goods.del";
    此时只有消费者2收到消息

  • 相关阅读:
    Fetch the result from result set
    Variable shadowing
    What is Servlet Container
    What is a servletcontext
    Inner Class
    Java中HashMap和TreeMap的区别深入理解
    枚举类
    (转载) 安卓开发学习笔记
    【C和C++】停车场系统
    【算法】常用的排序方法
  • 原文地址:https://www.cnblogs.com/Mrchengs/p/10531483.html
Copyright © 2011-2022 走看看