zoukankan      html  css  js  c++  java
  • Rabbitmq高级特性-TTL消息与死信队列详解

    生成端发消息

    package com.bfxy.rabbitmq.api.delay;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ReturnListener;
    
    public class Producer4Delay {  
          
        public static void main(String[] args) throws Exception {  
              
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            //RabbitMQ-Server安装在本机,所以直接用127.0.0.1
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            Connection connection = connectionFactory.newConnection();
            //创建一个渠道  
            Channel channel = connection.createChannel() ;  
            
            //5 监听
            channel.addReturnListener(new ReturnListener() {
                public void handleReturn(int replyCode,
                                        String replyText,
                                        String exchange,
                                        String routingKey,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                        throws IOException {
                    System.out.println("**************handleReturn**********");
                    System.out.println("replyCode: " + replyCode);
                    System.out.println("replyText: " + replyText);
                    System.out.println("exchange: " + exchange);
                    System.out.println("routingKey: " + routingKey);
                    System.err.println("properties:x-delay: " + properties.getHeaders().get("x-delay"));
                    System.out.println("body: " + new String(body));
                }
            });
            Map<String, Object> headers2 = new HashMap<String, Object>();
            headers2.put("x-delay", 5000L);
            AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);
            channel.basicPublish("delay.exchange", "delay.1111", true, props2.build(), "hello two".getBytes());
    //        channel.close();   
    //        connection.close();   
               
        }  
      
    }  
    View Code

    消费端

    package com.bfxy.rabbitmq.api.delay;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    /**
     * <B>中文类名:</B>Consumer4Delay<BR>
     * <B>概要说明:</B><BR>
     * @author bhz(Alienware)
     * @since 2016年7月6日
     */
    public class Consumer4Delay {  
      
        public static void main(String[] args) throws Exception {  
              
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
              
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setAutomaticRecoveryEnabled(true);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel() ;  
            
            //延迟队列
            Map<String, Object> map = new HashMap<String, Object>();
            map.put("x-delayed-type", "topic");
            channel.exchangeDeclare("delay.exchange", "x-delayed-message", true, false, map);
            channel.queueDeclare("delay.queue", true, false, false, null);
            channel.queueBind("delay.queue", "delay.exchange", "delay.#");
            QueueingConsumer consumer = new QueueingConsumer(channel) ;
            channel.basicConsume("delay.queue", false, consumer) ; 
            System.out.println("---------consume queue---------"); 
            while(true){  
                Delivery delivery = consumer.nextDelivery() ; 
                String msg = new String(delivery.getBody()) ;  
                try {
                    System.out.println("received: " + msg); 
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                
            }  
              
        }  
          
    }  
    View Code
  • 相关阅读:
    BZOJ2140: 稳定婚姻(tarjan解决稳定婚姻问题)
    BZOJ2124: 等差子序列(树状数组&hash -> bitset 求是否存在长度为3的等差数列)
    HDU 1217 Arbitrage(Bellman-Ford判断负环+Floyd)
    HDU 2112 Today(Dijkstra+map)
    HDU 2066 一个人的旅行(dijkstra水题+判重边)
    POJ 1511 Invitation Cards(Dijkstra(优先队列)+SPFA(邻接表优化))
    HDU 2544 最短路(floyd+bellman-ford+spfa+dijkstra队列优化)
    POJ 2431 Expedition (贪心 + 优先队列)
    POJ 3253 Fence Repair(哈夫曼编码)
    优先队列的使用(转)
  • 原文地址:https://www.cnblogs.com/callbin/p/14543796.html
Copyright © 2011-2022 走看看