交换机属性
1. Name:交换机名称
2. Type:交换机类型 Direct 、 Topic 、 fanout 、headers
3. Durability:是否需要持久化,true为持久化
4. Auto Delete 当最后一个绑定到Exchange上的队列被删除后,自动删除该Exchange
(相关概念:代码里的队列的Auto Delete : 当此队列没有任何Exchange与其相关联的时候,自动删除此队列)
5. Internal:当前Exchange是否用于RabbitMQ内部使用,默认是false
6. Arguments:扩展参数,用于扩展AMQP协议自制定化使用
Binding - 绑定
1. Exchange和Exchange、Queue之间的连接关系
2. Bingding中可以包含RoutingKey或者参数
Queue - 消息队列
1. 消息队列,实际存储消息数据
2. Durability:是否持久化,Durable:是,Transient:否
3. Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除
Message - 消息
1. 服务器和应用程序之间传送的数据
2. 本质就是一段数据,由Properties和Payload(body)组成
3. 常用属性:delivery mode、headers(自定义属性)
4. content_type、content_encoding、priority
5. correlation_id(一般作为消息唯一id,eg:业务+时间字符串的拼接)、可以做ack,消息的路由,幂等时可以用到
6. reply_to:做重会队列的时候,可以指定消息失败了可以返回哪个队列。
7. expiration:消息的过期时间
8. message_id:消息id
9. timestamp、type、user_id、app_id、cluster_id(可以自定义取指定的属性)
设置了属性的生产者

/** * 生产者端只发送消息给Exchange还有指定路由key,并不再指定具体队列名称了 * 具体队列名称有消费者端创建,随便创建,只要其队列绑定到了本Exchange和路由即可 */ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import com.everjiankang.dependency.ConnectionUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String EXCHANGE_NAME = "test_exchange_topic"; private final static String ROUTING_KEY_NAME = "order.update"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //【设置属性参数】:AMQP.BasicProperties() start Map<String,Object> headers = new HashMap(); headers.put("id", 1); headers.put("name", "xiaochao"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .contentEncoding("UTF-8") .deliveryMode(2) .expiration("10000") .headers(headers) .build(); //【设置属性参数】:AMQP.BasicProperties() end //发送五条消息,属性参数properties for (int i = 0; i < 5; i++) { String message = "匹配insert" + i; channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY_NAME,false,false,properties,message.getBytes()); } channel.close(); connection.close(); System.out.println("game over"); } }
队列里有5条消息(此时未启动消费者端进行消费)
过了10秒钟后,消息失效了
启动解析了属性的消费者

import java.io.IOException; import com.everjiankang.dependency.ConnectionUtil; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer1 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_1"; private final static String ROUTING_KEY_NAME = "order.*";//order.# public static void main(String[] args) throws IOException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_NAME); channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("消息的header是:" + properties.getHeaders()); System.out.println("消息的过期时间是:" + properties.getExpiration()); super.handleDelivery(consumerTag, envelope, properties, body); System.out.println(new String(body,"UTF-8")); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
如何查看消费者启动了?
1. 消费者个数不为零
2. 有Connections
3. 有channel
Virtual host - 虚拟主机
1. 虚拟地址,用于进行逻辑隔离,最上层的消息路由,类似于Redis的db0 ~ db15
2. 一个Virtual Host里面可以有若干个Exchange和Queue
3. 同一个Virtual Host里面不能有相同名称的Exchange和Queue
本章小结:
RabbitMQ的概念、安装与使用、管控台操作、结合RabbitMQ特性、Exchange、Queue、Binding、RoutingKey、Message进行核心API讲解。通过本章的学习,希望大家对RabbitMQ有个初步的了解。