Publish --> default exchange --> delay_queue(延迟队列) --> amq.direct --> message_ttl_queue(正常队列) --> Consumer
Procedure:

public class Procedure { private static String queue_name = "message_ttl_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = Common.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * RabbitMQ实现延迟队列二:在消息上设置TTL * Publish --> default exchange --> delay_queue(延迟队列) --> amq.direct --> message_ttl_queue(正常队列) --> Consumer */ //延迟队列 Map<String,Object> arguments = new HashMap<String,Object>(); arguments.put("x-dead-letter-exchange", "amq.direct");//消息过期后会投递到amq.direct arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");//出现dead letter之后将重新按照指定的routing-key发送 channel.queueDeclare("delay_queue", true, false, false, arguments); //正常队列 channel.queueDeclare(queue_name, true, false, false, null); //绑定路由 channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey"); String msg = "hello:"+System.currentTimeMillis(); //设置延迟属性 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); //deliveryMode:将消息标记为持久(值为2)或瞬态(任何其他值) AMQP.BasicProperties properties = builder.expiration("10000").deliveryMode(2).build(); channel.basicPublish("", "delay_queue", properties, msg.getBytes()); System.out.println("send msg:"+msg); channel.close(); connection.close(); } }
Consumers:

public class Consumers { private static String queue_name = "message_ttl_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = Common.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * RabbitMQ实现延迟队列二:在消息上设置TTL * Publish --> default exchange --> delay_queue(延迟队列) --> amq.direct --> message_ttl_queue(正常队列) --> Consumer */ //延迟队列 Map<String,Object> arguments = new HashMap<String,Object>(); arguments.put("x-dead-letter-exchange", "amq.direct");//消息过期后会投递到amq.direct arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");//出现dead letter之后将重新按照指定的routing-key发送 channel.queueDeclare("delay_queue", true, false, false, arguments); //正常队列 channel.queueDeclare(queue_name, true, false, false, null); //绑定路由 channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey"); System.out.println("receive start"); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("receive msg:"+new String(body)); try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queue_name, false, consumer); } }