zoukankan      html  css  js  c++  java
  • Rabbitmq高级特性-TTL消息与死信队列详解

    生成端发消息

    package com.bfxy.rabbitmq.api.delay;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ReturnListener;
    
    public class Producer4Delay {  
          
        public static void main(String[] args) throws Exception {  
              
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            //RabbitMQ-Server安装在本机,所以直接用127.0.0.1
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            Connection connection = connectionFactory.newConnection();
            //创建一个渠道  
            Channel channel = connection.createChannel() ;  
            
            //5 监听
            channel.addReturnListener(new ReturnListener() {
                public void handleReturn(int replyCode,
                                        String replyText,
                                        String exchange,
                                        String routingKey,
                                        AMQP.BasicProperties properties,
                                        byte[] body)
                        throws IOException {
                    System.out.println("**************handleReturn**********");
                    System.out.println("replyCode: " + replyCode);
                    System.out.println("replyText: " + replyText);
                    System.out.println("exchange: " + exchange);
                    System.out.println("routingKey: " + routingKey);
                    System.err.println("properties:x-delay: " + properties.getHeaders().get("x-delay"));
                    System.out.println("body: " + new String(body));
                }
            });
            Map<String, Object> headers2 = new HashMap<String, Object>();
            headers2.put("x-delay", 5000L);
            AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);
            channel.basicPublish("delay.exchange", "delay.1111", true, props2.build(), "hello two".getBytes());
    //        channel.close();   
    //        connection.close();   
               
        }  
      
    }  
    View Code

    消费端

    package com.bfxy.rabbitmq.api.delay;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    /**
     * <B>中文类名:</B>Consumer4Delay<BR>
     * <B>概要说明:</B><BR>
     * @author bhz(Alienware)
     * @since 2016年7月6日
     */
    public class Consumer4Delay {  
      
        public static void main(String[] args) throws Exception {  
              
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
              
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setAutomaticRecoveryEnabled(true);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel() ;  
            
            //延迟队列
            Map<String, Object> map = new HashMap<String, Object>();
            map.put("x-delayed-type", "topic");
            channel.exchangeDeclare("delay.exchange", "x-delayed-message", true, false, map);
            channel.queueDeclare("delay.queue", true, false, false, null);
            channel.queueBind("delay.queue", "delay.exchange", "delay.#");
            QueueingConsumer consumer = new QueueingConsumer(channel) ;
            channel.basicConsume("delay.queue", false, consumer) ; 
            System.out.println("---------consume queue---------"); 
            while(true){  
                Delivery delivery = consumer.nextDelivery() ; 
                String msg = new String(delivery.getBody()) ;  
                try {
                    System.out.println("received: " + msg); 
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                
            }  
              
        }  
          
    }  
    View Code
  • 相关阅读:
    左右切换+焦点图
    php中的preg系列函数
    php中的修饰符
    换行符‘ ’和回车符‘ ’
    已经安装php后,再增加扩展模块(不重新编辑php)
    什么是php?以及mysqlnd与libmysqlclient
    cli下的php(并传递参数)
    lnmp安装--php与nginx结合
    FastCgi与PHP-fpm关系[转] 读完本文瞬间明朗了很多
    epoll和select区别
  • 原文地址:https://www.cnblogs.com/callbin/p/14543796.html
Copyright © 2011-2022 走看看