zoukankan      html  css  js  c++  java
  • RabbitMQ:TTL队列/消息&&死信队列

    一.TTL队列/消息

    TTL:Time To Live  生存时间

    RabbitMQ支持消息的过期时间,在消息发送时可以进行指定

    new AMQP.BasicProperties().builder().expiration("").build();

    RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。在声明队列的queueDeclare方法的arguments参数中设置。

     二.死信队列

    DLX:Dead Letter Exchange 死信队列

    利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。

    消息变成死信的情况:

    1.消息被拒绝(basic.reject/basic.nack),并且requeue=false

    2.消息TTL过期。

    3.队列达到最大长度。

    DLX也是一个正常的Wxchange,和一般的Exchange没有区别,实际上就是设置队列的某一属性。

    当这个队列中有死信时,rabbitmq就会自动将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

    可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

    我们正常声明交换机、队列、绑定,只不过我们需要在队列上加一个参数即可,arguments.put("x-dead-letter-exchange",exchangeName),这样当有消息变成死信时,消息直接可以路由到该死信队列。

    生产端:

    public static void main(String[] args) throws Exception {

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.10.132");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");

    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();

    String exchange = "test_dlx_exchange";
    String routingKey = "dlx.save";

    String msg = "Hello RabbitMQ DLX Message";

    for(int i =0; i<1; i ++){

    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .deliveryMode(2)
    .contentEncoding("UTF-8")
    .expiration("10000")
    .build();
    channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
    }
    }

    消费端:

    public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.10.132");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/");

    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();

    // 这就是一个普通的交换机 和 队列 以及路由
    String exchangeName = "test_dlx_exchange";
    String routingKey = "dlx.#";
    String queueName = "test_dlx_queue";
    channel.exchangeDeclare(exchangeName, "topic", true, false, null);

    Map<String, Object> agruments = new HashMap<String, Object>();
    agruments.put("x-dead-letter-exchange", "dlx.exchange");
    //这个agruments属性,要设置到声明队列上
    channel.queueDeclare(queueName, true, false, false, agruments);
    channel.queueBind(queueName, exchangeName, routingKey);

    //要进行死信队列的声明:
    channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
    channel.queueDeclare("dlx.queue", true, false, false, null);
    channel.queueBind("dlx.queue", "dlx.exchange", "#");

    channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
  • 相关阅读:
    2.12 使用@DataProvider
    2.11 webdriver中使用 FileUtils ()
    Xcode8 添加PCH文件
    The app icon set "AppIcon" has an unassigned child告警
    Launch Image
    iOS App图标和启动画面尺寸
    iPhone屏幕尺寸、分辨率及适配
    Xcode下载失败 使用已购项目页面再试一次
    could not find developer disk image
    NSDate与 NSString 、long long类型的相互转化
  • 原文地址:https://www.cnblogs.com/wwjj4811/p/13032031.html
Copyright © 2011-2022 走看看