zoukankan      html  css  js  c++  java
  • Rabbitmq高级特性-消费端特性讲解_流控服务和ACK重回队列

     代码演示

        生成端

    package com.bfxy.rabbitmq.api.limit;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Sender {
    
        
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String queueName = "test001";  
            //参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除, 其他参数
            channel.queueDeclare(queueName, true, false, false, null);
            
            Map<String, Object> headers = new HashMap<String, Object>();
            
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .deliveryMode(2)
            .contentEncoding("UTF-8")
            .headers(headers).build();
            
            
            for(int i = 0; i < 5;i++) {
                String msg = "Hello World RabbitMQ " + i;
                channel.basicPublish("", queueName , props , msg.getBytes());             
            }
        }
        
    }
    View Code

      消费端

      

    package com.bfxy.rabbitmq.api.limit;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    public class Receiver {
    
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("192.168.11.76");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            
            String queueName = "test001";  
            //    durable 是否持久化消息
            channel.queueDeclare(queueName, true, false, false, null);  
            QueueingConsumer consumer = new QueueingConsumer(channel);
            
            channel.basicQos(0, 1, false);
            //    参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, false, consumer);  
            //    循环获取消息  
            while(true){  
                //    获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } 
        }
    }
    View Code

     生成端

       

    package com.bfxy.rabbitmq.api.requeue;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Sender {
    
        
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.11.71");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String queueName = "test001";  
            //参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除, 其他参数
            channel.queueDeclare(queueName, true, false, false, null);
            
            for(int i = 0; i < 5;i++) {
                String msg = "Hello World RabbitMQ " + i;
                Map<String, Object> headers = new HashMap<String, Object>();
                headers.put("flag", i);
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .headers(headers).build();
                channel.basicPublish("", queueName , props , msg.getBytes());             
            }
        }
        
    }
    View Code

    消费端

    package com.bfxy.rabbitmq.api.requeue;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    public class Receiver {
    
        public static void main(String[] args) throws Exception {
            
            
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            
            connectionFactory.setHost("192.168.11.71");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            Connection connection = connectionFactory.newConnection();
            
            Channel channel = connection.createChannel();  
            
            String queueName = "test001";  
            //durable 是否持久化消息
            channel.queueDeclare(queueName, true, false, false, null);  
            QueueingConsumer consumer = new QueueingConsumer(channel);
            
            //    参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, false, consumer);  
            //    循环获取消息  
            while(true){  
                //    获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
                Thread.sleep(1000);
                
                if((Integer)delivery.getProperties().getHeaders().get("flag") == 0) {
                    //throw new RuntimeException("异常");
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                } else {
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            } 
        }
    }
    View Code
  • 相关阅读:
    查看linux cpu和内存利用率__linux - top命令
    Maven仓库管理Nexus(转帖后加强版)
    实现系统菜单的两种方式
    使用Iterator遍历数组
    Android自定义退出弹出框
    AsyncTask的学习
    Android中常用到的权限
    Java集合
    Android中以文件的形式保存数据
    Android仿微信的开机滑动界面
  • 原文地址:https://www.cnblogs.com/callbin/p/14543513.html
Copyright © 2011-2022 走看看