zoukankan      html  css  js  c++  java
  • RabbitMQ 第三课 Exchange交换机详解

    交换机属性

    1. Name:交换机名称

    2. Type:交换机类型 Direct 、 Topic 、 fanout 、headers

    3. Durability:是否需要持久化,true为持久化

    4. Auto Delete 当最后一个绑定到Exchange上的队列被删除后,自动删除该Exchange

         (相关概念:代码里的队列的Auto Delete : 当此队列没有任何Exchange与其相关联的时候,自动删除此队列)

    5. Internal:当前Exchange是否用于RabbitMQ内部使用,默认是false

    6. Arguments:扩展参数,用于扩展AMQP协议自制定化使用

     

    Binding - 绑定

    1. Exchange和Exchange、Queue之间的连接关系

    2. Bingding中可以包含RoutingKey或者参数

    Queue - 消息队列

    1. 消息队列,实际存储消息数据

    2. Durability:是否持久化,Durable:是,Transient:否

    3. Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除

    Message - 消息

    1. 服务器和应用程序之间传送的数据

    2. 本质就是一段数据,由Properties和Payload(body)组成

    3. 常用属性:delivery mode、headers(自定义属性)

    4. content_type、content_encoding、priority

    5. correlation_id(一般作为消息唯一id,eg:业务+时间字符串的拼接)、可以做ack,消息的路由,幂等时可以用到

    6. reply_to:做重会队列的时候,可以指定消息失败了可以返回哪个队列。

    7. expiration:消息的过期时间

    8. message_id:消息id

    9. timestamp、type、user_id、app_id、cluster_id(可以自定义取指定的属性)


     设置了属性的生产者

    /**
     * 生产者端只发送消息给Exchange还有指定路由key,并不再指定具体队列名称了
     * 具体队列名称有消费者端创建,随便创建,只要其队列绑定到了本Exchange和路由即可
     */
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    import com.everjiankang.dependency.ConnectionUtil;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Producer {
        
         private static final String EXCHANGE_NAME = "test_exchange_topic";
         private final static String ROUTING_KEY_NAME = "order.update";
         
         public static void main(String[] args) throws IOException, TimeoutException {
             Connection connection = ConnectionUtil.getConnection();
             Channel channel = connection.createChannel();
             //声明交换机
             channel.exchangeDeclare(EXCHANGE_NAME,"topic");
             
             //【设置属性参数】:AMQP.BasicProperties() start
             Map<String,Object> headers = new HashMap();
             headers.put("id", 1);
             headers.put("name", "xiaochao");
             AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                     .contentEncoding("UTF-8")
                     .deliveryMode(2)
                     .expiration("10000")
                     .headers(headers)
                     .build();
             //【设置属性参数】:AMQP.BasicProperties() end
             
             //发送五条消息,属性参数properties
             for (int i = 0; i < 5; i++) {
                 String message = "匹配insert" + i;
                 channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY_NAME,false,false,properties,message.getBytes());
             }
             channel.close();
             connection.close();
             System.out.println("game over");
         }
     }
    Producer.java

    队列里有5条消息(此时未启动消费者端进行消费)

      过了10秒钟后,消息失效了

     

    启动解析了属性的消费者

    import java.io.IOException;
    
    import com.everjiankang.dependency.ConnectionUtil;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
     
     public class Consumer1 {
         
         private static final String EXCHANGE_NAME = "test_exchange_topic";
         private  static final String QUEUE_NAME = "test_queue_topic_1";
         private final static String ROUTING_KEY_NAME = "order.*";//order.#
     
         public static void main(String[] args) throws IOException {
             Connection connection = ConnectionUtil.getConnection();
             Channel channel = connection.createChannel();
             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
             channel.exchangeDeclare(EXCHANGE_NAME,"topic");
             channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_NAME);
             channel.basicQos(1);
             Consumer consumer = new DefaultConsumer(channel) {
                 @Override
                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                     throws IOException {
                     System.out.println("消息的header是:" + properties.getHeaders());
                     System.out.println("消息的过期时间是:" + properties.getExpiration());
                     super.handleDelivery(consumerTag, envelope, properties, body);
                     System.out.println(new String(body,"UTF-8"));
                 }
             };
             channel.basicConsume(QUEUE_NAME,true,consumer);
         }
     }
    Consumer1.java

     如何查看消费者启动了?

    1. 消费者个数不为零

     2. 有Connections

     

    3. 有channel

     

    Virtual host - 虚拟主机

    1. 虚拟地址,用于进行逻辑隔离,最上层的消息路由,类似于Redis的db0 ~ db15

    2. 一个Virtual Host里面可以有若干个Exchange和Queue

    3. 同一个Virtual Host里面不能有相同名称的Exchange和Queue

    本章小结:

    RabbitMQ的概念、安装与使用、管控台操作、结合RabbitMQ特性、Exchange、Queue、Binding、RoutingKey、Message进行核心API讲解。通过本章的学习,希望大家对RabbitMQ有个初步的了解。

  • 相关阅读:
    JSON 操作
    生成下面的模块时,启用了优化或没有调试信息
    Emacs
    Integration rules
    Testing tools
    软件架构(读书笔记1)
    依赖于自己做计算
    POJO
    软件架构(读书笔记2)
    设计得不好
  • 原文地址:https://www.cnblogs.com/guchunchao/p/13097586.html
Copyright © 2011-2022 走看看