zoukankan      html  css  js  c++  java
  • Rabbit mq订阅方式获取消息并可设置持久化

    Rabbit 通过方式获取消息:订阅方式事实上是向queue注冊consumer,通过rpc向queue server发送注冊consumer的消息。rabbitMQ Server在收到消息后,依据消息的内容类型推断这是一个订阅消息。这样当MQ 中queue有消息时,会自己主动把消息通过该socket(长连接)通道发送出去。

    能够通过

    channel.basicQos(1); 
    设置RabbitMQ调度分发消息的方式。也就是告诉RabbitMQ每次仅仅给消费者处理一条消息。也就是等待消费者处理完而且已经对刚才处理的消息进行确认之后, 才发送下一条消息。防止消费者太过于忙碌。例如以下图所看到的:

     

    整理代码例如以下:


    Produce

    public class RabbitMQProduce {
    	public static void main(String[] args) throws IOException, InterruptedException {
    		ConnectionFactory factory =new ConnectionFactory();
    		String routingKey="test";
    		String exchange="test";
    	    factory.setHost("localhost");
    	    Connection conn = factory.newConnection();
    	    Channel channel =conn.createChannel();
    	    
    	    //发送消息
    	    for(int i=0;i<8000;i++){
    	    	if(i%5==0){
    	    		Thread.sleep(200);
    	    	}
    	    	byte[] messageBodyBytes =(i+"").getBytes();
    	    	//假设将队列设置为持久化之后,还须要将消息也设为可持久化的。MessageProperties.PERSISTENT_TEXT_PLAIN
    	    	//也就是将队列设置为持久化之后。还须要将发送的消息也要设置为持久化才干保证队列和消息一直存在
    	    	//消费者在声明时也要做持久化声明
    		    channel.basicPublish(exchange, routingKey, null, messageBodyBytes);
    		    System.out.println("发送.."+i);
    	    }
    	    channel.close();  
    	    conn.close();
    	}
    }


    Customer

    public class RabbitMqCustomer {
    	private static ConnectionFactory factory;
    	private static String QueryName="test";
    	private static Connection conn;
    	private static Channel channel;
    	private static String exchange="test";
    	private static String routingKey="test";
    	public static void main(String[] args) throws Exception {
    		start();
    		/**
    		 * 採用订阅的方式获取消息
    		 */
            channel.basicConsume(QueryName, false, new DefaultConsumer(channel){
            	@Override
            	public void handleShutdownSignal(String consumerTag,
            			ShutdownSignalException sig) {
            	System.out.println("==="+consumerTag+"====="+sig.getMessage());
            	boolean isOpenConnect = conn!=null&&conn.isOpen();
            	boolean isOpenChannel = channel != null && channel.isOpen();
            	while(!isOpenChannel||!isOpenConnect){
            		try {
            			System.out.println("连接失败重连接....");
    					start();
    					Thread.sleep(3000);
    				} catch (Exception e) {
    					e.printStackTrace();
    				}
            	  }
            	}
            	
            	@Override
            	public void handleDelivery(String consumerTag, Envelope envelope,
            			BasicProperties properties, byte[] body) throws IOException {
            		//消息序号
            		long deliveryTag = envelope.getDeliveryTag();
            		String mes = new String(body,"UTF-8");
            		System.out.println("接受到消息:"+mes);
            		//确认收到,消息回执
            		channel.basicAck(deliveryTag, true);
            	}
            });
    	}
    
    	public static  void start() throws IOException {
    		factory = new ConnectionFactory();
    		factory.setHost("localhost");
    		factory.setUsername("test");
    		factory.setPassword("test");
    		conn = factory.newConnection();
    		channel = conn.createChannel();
    		channel.exchangeDeclare(exchange, "topic");
    		channel.queueDeclare(QueryName, false, false, false, null);//声明消息队列,且为可持久化的
    		channel.queueBind(QueryName, exchange, routingKey);
    		channel.basicQos(1); //消息分发处理
    	}
    }
    


  • 相关阅读:
    Android直方图递增View
    分析实现Android自定义View之扇形图
    可折叠的ToolBar+抽屉菜单NavigationView+浮动按钮FloatButton
    走着官方的教程入门Material Design(一)
    AndroidStudio — Error:Failed to resolve: junit:junit:4.12错误解决
    Win10提示没有权限使用网络资源问题解决
    Android Socket连接PC出错问题及解决
    Android Studio —— 创建Menu菜单项
    Eclipse出现"Running Android Lint has encountered a problem"解决方案
    关于leal和mov
  • 原文地址:https://www.cnblogs.com/brucemengbm/p/7380173.html
Copyright © 2011-2022 走看看