zoukankan      html  css  js  c++  java
  • RabbitMQ小记(三)

    1、RabbitMQ中mandatory和immediate以及备份交换机

    (1)mandatory为true时,若交换机无法根据自身类型和路由键找到符合条件的对列,那么RabbitMQ会回调Basic.Return将消息返回生产者。

      生产者可以调用channel.addReturnListener来添加ReturnListener实现获取被返回的消息。  

      channel.basicPublish(EXCHANGE NAME , "", true ,MessageProperties . PERSISTENT TEXT PLAIN ,"mandatory test" . getBytes());

      channel.addReturnListener(new ReturnListener() (

        public void handleReturn(int replyCode , String replyText ,String exchange , String routingKey ,AMQP.BasicProperties basicProperties ,byte[] body) throws IOException{

          String message = new String(body);

          System.out.print("返回结果是:"+message);

        }

      });

    (2)mandatory为false时,若交换机无法根据自身类型和路由键找到符合条件的对列,那么RabbitMQ会丢弃消息。

    (3)immediate为true时,若交换机发现队列上不存在消费者时,那么消息不会存入队列,,那么RabbitMQ会回调Basic.Return将消息返回生产者。Rabbit3.0开始不使用immediate。

    (4)immediate和mandatory的差别

      mandatory告诉服务器至少将消息存储到一个队列中,否则将消息返回生产者。

      immediate告诉服务器若队列上有消费者则存储,否则将消息返回生产者。

    (5)备份交换机,简称AE,生产者在发送消息时,未规定mandatory参数或mandatory参数为false时,消息服务器未找到对应的对列,会将消息丢弃,而生产者不想消息被丢弃,这时可以将消息存储到备份交换机(AE)中。

      实现备份交换机有两种方式:

      1)声明交换器(channel.exchangeDeclare)时添加alternate-exchange参数来实现。 

        Map<String , Object> args = new HashMap <String , Object>();
        args.put("alternate-exchange" , "myAe");
        channe1.exchangeDeclare( "ExchangeName" , " direct" , true , false , args);
        channe1.exchangeDeclare( "myAe " , "fanout" , true , false , null) ;

      2)通过策略(Policy)的方式来实现。

      rabbitmqctl set_policy AE “^normalExchange” '{“alternate-exchange” :"myAE"," " }'

    2、过期时间(TTL)

    设置消息的过期时间,有两种方式:

    1)通过对列属性来设置,这样队列中所有消息的过期时间是一样的。

      Map<String , Object> args = new HashMqp<String , Object>{};

      args . put( " x-expires" , 1800000);

      channel . queueDeclare("myqueue " , false , false , false , args) ;

    2)通过对每条消息来设置过期时间。

      AMQP . BasicProperties.Builder builder = new AMQP.BasicProperties . Builder();

      builder.deliveryMode(2);//持久化消息

      builder.expiration(“6000”);//设置过期时间

      AMQP.BasicProperties properties = builder . build() ;channel.basicPublish(exchangeName , routingKey , mandatory , properties ," ttlTestMessage".getBytes());

    两种都设置过期时间时,取最短时间,超过过期时间,消息就会变成死信。

    3、死信队列、延迟队列、优先级队列

    (1)死信队列,简称DLX,当消息被拒绝、过期、队列达到最大长度时,消息就会变成死信,而死信就会被存储在死信队列里。

      死信队列可以在队列属性中设置:

      channel . exchangeDeclare("dlx_exchange " ,“direct”);//创建direct类型交换机

      Map<String , Object> args = new HashMap<String , Object>();
      args . put("x-dead-letter-exchange" , " dlx_exchange ");//设置交换机属性
      channel . queueDeclare("myqueue" , false , false , false , args);//将myqueue设置为死信对列

    (2)延迟队列,生产者发出消息后想在特定的时间让消费者获取消息,可以在发消息时设置消息延迟时间。

      可以通过死信得方式来实现消息延迟,例如当生产者通过A1交换机把消息发送到B1队列上,设置过期时间为20秒,当消息过期后,会存储到死信队列B2上,而消费者去订阅B2对列,从而实现消息延迟。

    (3)优先级队列,优先级高的对列中存储的消息会被最先消费。

      Map<String , Object> args= new HashMap<String , Object>() ;

      args.put( " x-rnax-priority " , 10) ;//设置优先级为10

      channel.queueDeclare( " Queue_priority " , true , false , false , args) ;

    4、远程过程调用(RPC)

      通过远程计算机从网络发起服务请求,有助于降低分布式的构建难度。

      RabbitMQ实现RPC,客户端发送请求消息,为每一条消息指定唯一的ID,服务端回复响应消息,为了接收相应的消息,需要在请求消息中发送一个回调对列,回调对列收到服务端返回的消息后根据ID来将返回消息和请求消息进行匹配,如果匹配失败,回复消息会被丢弃,RPC的流程图如下:

    5、消息持久化

      消息持久化可以提高RabbitMQ的可靠性,以防在重启、宕机时发生的数据丢失,RabbitMQ中持久化可分为交换器持久化、对列持久化、消息持久化。

      交换器持久化:声明交换器时把durable设置为true,交换器持久化会保证消息不会丢失,但自身的数据会丢失,无法再将消息发送到交换器上。

      对列持久化:声明对列时把durable设置为true,对列持久化可以保证自身的数据不会丢失,但存储的消息会丢失。

      消息持久化:将消息发送模式BasicProperties中的deliveryMode设置为2,重启、宕机消息不会丢失。

      对列和消息、交换器都设置持久化,会很大概率保证数据不会丢失,但是会影响效率,因为持久化会将数据写到磁盘里。

     

  • 相关阅读:
    40个极简WordPress主题
    一些上流的CSS3图片样式
    15个最好的 HTML5 视频播放器
    优秀的IOS界面
    25 个超棒的 WordPress 主题(2012)
    Codekit 为Web前端打造的全能型神器(附推荐各种工具)
    10 个精彩的、激发灵感的 HTML5 游戏
    20个非常绚丽的 CSS3 特性应用演示
    25+ 个免费精美的商业风格的 WordPress 主题
    10 款非常棒的CSS代码格式化工具推荐
  • 原文地址:https://www.cnblogs.com/carblack/p/12653815.html
Copyright © 2011-2022 走看看