Publish --> delaysync.exchange --> delay.5m.queue(延迟队列) --> delay.exchange --> test.queue(正常队列) --> Consumer
Producer (class `Procedure`):
data:image/s3,"s3://crabby-images/6da44/6da44a3c422e49abcf1dae786223d28e774e2de6" alt=""
public class Procedure { private static final String queue_name = "test.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = Common.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * RabbitMQ实现延迟队列一:在队列上设置TTL * Publish --> delaysync.exchange --> delay.5m.queue(延迟队列) --> delay.exchange --> test.queue(正常队列) --> Consumer */ channel.exchangeDeclare("delay.exchange", "topic", true, false, false, null); //延迟队列start Map<String,Object> map = new HashMap<String,Object>(); map.put("x-message-ttl", 10000);//消息过期时间 map.put("x-max-length", 500000);//最大积压的消息个数 map.put("x-dead-letter-exchange", "delay.exchange");//消息过期后会投递到delay.exchange channel.queueDeclare("delay.5m.queue", true, false, false, map); channel.exchangeDeclare("delaysync.exchange", "topic", true, false, false, null); channel.queueBind("delay.5m.queue", "delaysync.exchange", "deal.message"); //正常队列 channel.queueDeclare(queue_name, true, false, false, null); channel.queueBind(queue_name, "delay.exchange", "deal.message"); System.out.println("send start"); String msg = "hello!+"+new Date().toString(); channel.basicPublish("delaysync.exchange", "deal.message", false, false, null, msg.getBytes()); System.out.println("send msg:"+msg); channel.close(); connection.close(); } }
Consumers:
data:image/s3,"s3://crabby-images/6da44/6da44a3c422e49abcf1dae786223d28e774e2de6" alt=""
public class Consumers { private static String queue_name = "test.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = Common.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * RabbitMQ实现延迟队列一:在队列上设置TTL * Publish --> delaysync.exchange --> delay.5m.queue(延迟队列) --> delay.exchange --> test.queue(正常队列) --> Consumer */ channel.exchangeDeclare("delay.exchange", "topic", true, false, false, null); //延迟队列start Map<String,Object> map = new HashMap<String,Object>(); map.put("x-message-ttl", 10000);//消息过期时间 map.put("x-max-length", 500000);//最大积压的消息个数 map.put("x-dead-letter-exchange", "delay.exchange");//消息过期后会投递到delay.exchange channel.queueDeclare("delay.5m.queue", true, false, false, map); channel.exchangeDeclare("delaysync.exchange", "topic", true, false, false, null); channel.queueBind("delay.5m.queue", "delaysync.exchange", "deal.message"); //正常队列 channel.queueDeclare(queue_name, true, false, false, null); channel.queueBind(queue_name, "delay.exchange", "deal.message"); System.out.println("receive start"); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("receive msg:"+new String(body)); try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queue_name, false, consumer); } }