Message (消息) 是指服务器和应用程序之间传输的数据,它由 Properties 和 Payload (Body) 组成。
一、消息属性
1.1 消息常用属性
属性名称 | 属性含义 |
---|---|
Delivery mode | 是否持久化,1:Non-persistent,2:Persistent |
headers | 自定义属性 |
content_type | 消息内容的类型 |
content_encoding | 消息内容的编码格式 |
priority | 消息的优先级 |
correlation_id | 关联id |
reply_to | 用于指定回复的队列的名称 |
expiration | 消息的失效时间 |
message_id | 消息id |
timestamp | 消息的时间戳 |
type | 类型 |
user_id | 用户id |
app_id | 应用程序id |
cluster_id | 集群id |
1.2 实例
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
String exchangeType = "topic";
// 4.声明交换机
String exchangeName = "messageExchange";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
// 5.申明队列
String queueName = "messageQueue";
channel.queueDeclare(queueName, false, false, false, null);
// 6.将交换机和队列进行绑定关系
String routingKey1 = "#";
channel.queueBind(queueName, exchangeName, routingKey1);
// 7.循环消费
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
System.err.println("消费端启动");
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端消费: " + msg);
System.out.println("消息持久化属性:" + delivery.getProperties().getDeliveryMode());
System.out.println("消息编码格式:" + delivery.getProperties().getContentEncoding());
System.out.println("消息过期时间:" + delivery.getProperties().getExpiration());
System.out.println("==========自定义属性值==========");
Map<String, Object> headers = delivery.getProperties().getHeaders();
for (String key : headers.keySet()) {
System.out.println("key: " + key + " ,value: " + headers.get(key));
}
}
}
}
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机
connectionFactory.setHost("111.231.83.100");
// 设置端口
connectionFactory.setPort(5672);
// 设置虚拟主机
connectionFactory.setVirtualHost("/");
// 2.获取一个连接对象
final Connection connection = connectionFactory.newConnection();
// 3.创建 Channel
final Channel channel = connection.createChannel();
// 4.声明交换机
String exchangeName = "messageExchange";
// 5.设置消息属性
Map<String, Object> headers = new HashMap<>();
headers.put("key1", 1);
headers.put("key2", 2);
headers.put("key3", 3);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
// 设置消息是否持久化
.deliveryMode(2)
// 设置消息编码格式
.contentEncoding("UTF-8")
// 设置消息过期时间
.expiration("10000")
// 设置自定义属性
.headers(headers)
.build();
// 6.发送消息
channel.basicPublish(exchangeName, "message", properties, "拥有有属性的消息".getBytes());
// 7.关闭资源
channel.close();
connection.close();
connectionFactory.clone();
}
}
先启动消费者,然后启动生产者,查看控制台输出:
消费端启动
消费端消费: 拥有有属性的消息
消息持久化属性:2
消息编码格式:UTF-8
消息过期时间:10000
==========自定义属性值==========
key: key1 ,value: 1
key: key2 ,value: 2
key: key3 ,value: 3
二、消息应用
2.1 TTL 消息
对于消息来说可以为它设置一个过期时间,如果超过这个时间还没有被消费的话就自动删除。
在代码中有两种方法设置某个队列的消息过期时间:
- 针对队列来说,可以使用 x-message-ttl 参数设置当前队列中所有消息的过期时间,即当前队列中所有的消息过期时间都一样;
- 针对单个消息来说,在发布消息时,可以使用 Expiration 参数来设置单个消息的过期时间。
以上两个参数的单位都是毫秒,即1000毫秒为1秒。如果以上两个都设置,则以当前消息最短的那个过期时间为准。
2.2 延时消息
RabbitMQ 不支持延时消息的使用,可以采用以下方式实现:
-
先存储到数据库,用定时任务扫描,登记时刻+延时时间,就是需要投递的时刻;
-
利用 RabbitMQ 的死信队列实现;
-
利用 rabbitmq-delayed-message-exchange 插件;