zoukankan      html  css  js  c++  java
  • RabbitMQ消息

    Message (消息) 是指服务器和应用程序之间传输的数据,它由 PropertiesPayload (Body) 组成。

    一、消息属性

    1.1 消息常用属性

    属性名称 属性含义
    Delivery mode 是否持久化,1:Non-persistent,2:Persistent
    headers 自定义属性
    content_type 消息内容的类型
    content_encoding 消息内容的编码格式
    priority 消息的优先级
    correlation_id 关联id
    reply_to 用于指定回复的队列的名称
    expiration 消息的失效时间
    message_id 消息id
    timestamp 消息的时间戳
    type 类型
    user_id 用户id
    app_id 应用程序id
    cluster_id 集群id

    1.2 实例

    消费者

    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
            String exchangeType = "topic";
    
            // 4.声明交换机
            String exchangeName = "messageExchange";
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
    
            // 5.申明队列
            String queueName = "messageQueue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 6.将交换机和队列进行绑定关系
            String routingKey1 = "#";
            channel.queueBind(queueName, exchangeName, routingKey1);
    
            // 7.循环消费
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(queueName, true, queueingConsumer);
            System.err.println("消费端启动");
            while (true) {
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.err.println("消费端消费: " + msg);
    
                System.out.println("消息持久化属性:" + delivery.getProperties().getDeliveryMode());
                System.out.println("消息编码格式:" + delivery.getProperties().getContentEncoding());
                System.out.println("消息过期时间:" + delivery.getProperties().getExpiration());
                System.out.println("==========自定义属性值==========");
                Map<String, Object> headers = delivery.getProperties().getHeaders();
                for (String key : headers.keySet()) {
                    System.out.println("key: " + key + " ,value: " + headers.get(key));
                }
    
    
            }
        }
    }
    

    生产者:

    public class Producer {
    
        public static void main(String[] args) throws Exception {
            // 1.创建连接工厂对象
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置主机
            connectionFactory.setHost("111.231.83.100");
            // 设置端口
            connectionFactory.setPort(5672);
            // 设置虚拟主机
            connectionFactory.setVirtualHost("/");
    
            // 2.获取一个连接对象
            final Connection connection = connectionFactory.newConnection();
    
            // 3.创建 Channel
            final Channel channel = connection.createChannel();
    
    
            // 4.声明交换机
            String exchangeName = "messageExchange";
            // 5.设置消息属性
            Map<String, Object> headers = new HashMap<>();
            headers.put("key1", 1);
            headers.put("key2", 2);
            headers.put("key3", 3);
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    // 设置消息是否持久化
                    .deliveryMode(2)
                    // 设置消息编码格式
                    .contentEncoding("UTF-8")
                    // 设置消息过期时间
                    .expiration("10000")
                    // 设置自定义属性
                    .headers(headers)
                    .build();
            // 6.发送消息
            channel.basicPublish(exchangeName, "message", properties, "拥有有属性的消息".getBytes());
    
            // 7.关闭资源
            channel.close();
            connection.close();
            connectionFactory.clone();
        }
    }
    

    先启动消费者,然后启动生产者,查看控制台输出:

    消费端启动
    消费端消费: 拥有有属性的消息
    消息持久化属性:2
    消息编码格式:UTF-8
    消息过期时间:10000
    ==========自定义属性值==========
    key: key1 ,value: 1
    key: key2 ,value: 2
    key: key3 ,value: 3
    

    二、消息应用

    2.1 TTL 消息

    对于消息来说可以为它设置一个过期时间,如果超过这个时间还没有被消费的话就自动删除。

    在代码中有两种方法设置某个队列的消息过期时间:

    • 针对队列来说,可以使用 x-message-ttl 参数设置当前队列中所有消息的过期时间,即当前队列中所有的消息过期时间都一样;
    • 针对单个消息来说,在发布消息时,可以使用 Expiration 参数来设置单个消息的过期时间。

    以上两个参数的单位都是毫秒,即1000毫秒为1秒。如果以上两个都设置,则以当前消息最短的那个过期时间为准。

    2.2 延时消息

    RabbitMQ 不支持延时消息的使用,可以采用以下方式实现:

    • 先存储到数据库,用定时任务扫描,登记时刻+延时时间,就是需要投递的时刻;

    • 利用 RabbitMQ 的死信队列实现;

    • 利用 rabbitmq-delayed-message-exchange 插件;

  • 相关阅读:
    3、二进制的秘闻和不同进制间的转换
    Hello World!
    HDU5883 The Best Path(欧拉回路 | 通路下求XOR的最大值)
    Codeforces 722C(并查集 + 思维)
    Floyd 算法求多源最短路径
    vim 配置
    STL容器 -- Vector
    STL容器 -- Bitset
    HDU 5707 Combine String(动态规划)
    HDU 5876 Sparse Graph(补图上BFS)
  • 原文地址:https://www.cnblogs.com/markLogZhu/p/13265123.html
Copyright © 2011-2022 走看看