zoukankan      html  css  js  c++  java
  • Rabbit mq订阅方式获取消息并可设置持久化

    Rabbit 通过方式获取消息:订阅方式事实上是向queue注冊consumer,通过rpc向queue server发送注冊consumer的消息。rabbitMQ Server在收到消息后,依据消息的内容类型推断这是一个订阅消息。这样当MQ 中queue有消息时,会自己主动把消息通过该socket(长连接)通道发送出去。

    能够通过

    channel.basicQos(1); 
    设置RabbitMQ调度分发消息的方式。也就是告诉RabbitMQ每次仅仅给消费者处理一条消息。也就是等待消费者处理完而且已经对刚才处理的消息进行确认之后, 才发送下一条消息。防止消费者太过于忙碌。例如以下图所看到的:

     

    整理代码例如以下:


    Produce

    public class RabbitMQProduce {
    	public static void main(String[] args) throws IOException, InterruptedException {
    		ConnectionFactory factory =new ConnectionFactory();
    		String routingKey="test";
    		String exchange="test";
    	    factory.setHost("localhost");
    	    Connection conn = factory.newConnection();
    	    Channel channel =conn.createChannel();
    	    
    	    //发送消息
    	    for(int i=0;i<8000;i++){
    	    	if(i%5==0){
    	    		Thread.sleep(200);
    	    	}
    	    	byte[] messageBodyBytes =(i+"").getBytes();
    	    	//假设将队列设置为持久化之后,还须要将消息也设为可持久化的。MessageProperties.PERSISTENT_TEXT_PLAIN
    	    	//也就是将队列设置为持久化之后。还须要将发送的消息也要设置为持久化才干保证队列和消息一直存在
    	    	//消费者在声明时也要做持久化声明
    		    channel.basicPublish(exchange, routingKey, null, messageBodyBytes);
    		    System.out.println("发送.."+i);
    	    }
    	    channel.close();  
    	    conn.close();
    	}
    }


    Customer

    public class RabbitMqCustomer {
    	private static ConnectionFactory factory;
    	private static String QueryName="test";
    	private static Connection conn;
    	private static Channel channel;
    	private static String exchange="test";
    	private static String routingKey="test";
    	public static void main(String[] args) throws Exception {
    		start();
    		/**
    		 * 採用订阅的方式获取消息
    		 */
            channel.basicConsume(QueryName, false, new DefaultConsumer(channel){
            	@Override
            	public void handleShutdownSignal(String consumerTag,
            			ShutdownSignalException sig) {
            	System.out.println("==="+consumerTag+"====="+sig.getMessage());
            	boolean isOpenConnect = conn!=null&&conn.isOpen();
            	boolean isOpenChannel = channel != null && channel.isOpen();
            	while(!isOpenChannel||!isOpenConnect){
            		try {
            			System.out.println("连接失败重连接....");
    					start();
    					Thread.sleep(3000);
    				} catch (Exception e) {
    					e.printStackTrace();
    				}
            	  }
            	}
            	
            	@Override
            	public void handleDelivery(String consumerTag, Envelope envelope,
            			BasicProperties properties, byte[] body) throws IOException {
            		//消息序号
            		long deliveryTag = envelope.getDeliveryTag();
            		String mes = new String(body,"UTF-8");
            		System.out.println("接受到消息:"+mes);
            		//确认收到,消息回执
            		channel.basicAck(deliveryTag, true);
            	}
            });
    	}
    
    	public static  void start() throws IOException {
    		factory = new ConnectionFactory();
    		factory.setHost("localhost");
    		factory.setUsername("test");
    		factory.setPassword("test");
    		conn = factory.newConnection();
    		channel = conn.createChannel();
    		channel.exchangeDeclare(exchange, "topic");
    		channel.queueDeclare(QueryName, false, false, false, null);//声明消息队列,且为可持久化的
    		channel.queueBind(QueryName, exchange, routingKey);
    		channel.basicQos(1); //消息分发处理
    	}
    }
    


  • 相关阅读:
    微服务- 认识我们的服务2
    微服务- 用于定位日志的会话的传播简单实现4
    微服务-Feign性能调优3
    微服务-为什么要微服务1
    GateWay与熔断器
    slf4j-api、slf4j-log4j12、log4j之间关系
    关于tomcat的axConnections、maxThreads、acceptCount
    Spring注解@Resource和@Autowired区别对比详解
    关于spring MVC 关键组件 & 流程
    关于spring MVC的全局异常处理
  • 原文地址:https://www.cnblogs.com/brucemengbm/p/7380173.html
Copyright © 2011-2022 走看看