zoukankan      html  css  js  c++  java
  • RabbitDemo —— 延迟队列(在队列上设置TTL)

    Publish --> delaysync.exchange --> delay.5m.queue(延迟队列) --> delay.exchange --> test.queue(正常队列) --> Consumer

    Procedure:

    /**
     * Publishes a single test message into a TTL-based delay pipeline.
     *
     * <p>Message flow: publisher --> delaysync.exchange --> delay.5m.queue (delay queue)
     * --> delay.exchange --> test.queue (normal queue) --> consumer.
     */
    public class Procedure {
        private static final String queue_name = "test.queue";

        /**
         * Declares the exchanges/queues, publishes one message to the delay exchange and exits.
         *
         * @param args unused
         * @throws Exception if connecting, declaring the topology or publishing fails
         */
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = Common.getFactory();
            // try-with-resources closes the channel first, then the connection, even when a
            // declare/publish call throws; the original leaked both on any failure.
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                declareTopology(channel);

                System.out.println("send start");

                String msg = "hello!+" + new Date().toString();
                // Encode explicitly as UTF-8 rather than relying on the platform default charset.
                byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                channel.basicPublish("delaysync.exchange", "deal.message", false, false, null, payload);
                System.out.println("send msg:" + msg);
            }
        }

        /**
         * Declares the delay queue (TTL + dead-letter exchange), the normal queue and
         * the two topic exchanges, and binds them with routing key "deal.message".
         *
         * @param channel open channel used for all declarations
         * @throws java.io.IOException if any declare/bind call fails
         */
        private static void declareTopology(Channel channel) throws java.io.IOException {
            // Exchange that receives messages once they expire in the delay queue.
            channel.exchangeDeclare("delay.exchange", "topic", true, false, false, null);

            // Delay queue: nothing consumes from it, messages just wait for their TTL.
            Map<String, Object> delayArgs = new HashMap<String, Object>();
            // Message expiry in milliseconds.
            // NOTE(review): 10000 ms is 10 seconds although the queue is named "5m" - confirm
            // the intended delay. Redeclaring an existing queue with different arguments is
            // expected to be rejected by the broker, so the value is left untouched here.
            delayArgs.put("x-message-ttl", 10000);
            // Maximum number of messages allowed to pile up in the queue.
            delayArgs.put("x-max-length", 500000);
            // Expired messages are re-published to delay.exchange.
            delayArgs.put("x-dead-letter-exchange", "delay.exchange");
            channel.queueDeclare("delay.5m.queue", true, false, false, delayArgs);
            channel.exchangeDeclare("delaysync.exchange", "topic", true, false, false, null);
            channel.queueBind("delay.5m.queue", "delaysync.exchange", "deal.message");

            // Normal queue that the consumer reads from.
            channel.queueDeclare(queue_name, true, false, false, null);
            channel.queueBind(queue_name, "delay.exchange", "deal.message");
        }
    }
    View Code

    Consumers:

    public class Consumers {
        private static String queue_name = "test.queue";  
        
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = Common.getFactory();
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            /**
             * RabbitMQ实现延迟队列一:在队列上设置TTL
             * Publish --> delaysync.exchange --> delay.5m.queue(延迟队列) --> delay.exchange --> test.queue(正常队列) --> Consumer
             */
            channel.exchangeDeclare("delay.exchange", "topic", true, false, false, null);
            //延迟队列start
            Map<String,Object> map = new HashMap<String,Object>();
            map.put("x-message-ttl", 10000);//消息过期时间
            map.put("x-max-length", 500000);//最大积压的消息个数
            map.put("x-dead-letter-exchange", "delay.exchange");//消息过期后会投递到delay.exchange
            channel.queueDeclare("delay.5m.queue", true, false, false, map);
            channel.exchangeDeclare("delaysync.exchange", "topic", true, false, false, null);
            channel.queueBind("delay.5m.queue", "delaysync.exchange", "deal.message");
            //正常队列
            channel.queueDeclare(queue_name, true, false, false, null);
            channel.queueBind(queue_name, "delay.exchange", "deal.message");
            
            System.out.println("receive start");
            
            Consumer consumer = new DefaultConsumer(channel) {
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    System.out.println("receive msg:"+new String(body));
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(queue_name, false, consumer);
        }
    }
    View Code
  • 相关阅读:
    无法在WEB服务器上启动调试
    Zedgraph悬停时显示内容闪烁的解决
    用ZedGraph控件作图圆
    34.node.js之Url & QueryString模块
    33.Node.js 文件系统fs
    32.Node.js中的常用工具类util
    31.Node.js 常用工具 util
    30.Node.js 全局对象
    28.Node.js 函数和匿名函数
    27.Node.js模块系统
  • 原文地址:https://www.cnblogs.com/yifanSJ/p/9022422.html
Copyright © 2011-2022 走看看