zoukankan      html  css  js  c++  java
  • spring in action day07 RabbitMq

    一:安装RabbitMq

    记录下本人在win10环境下安装RabbitMQ的步骤,以作备忘。

    第一步:下载并安装erlang

     

    • 原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang
    • 下载地址:http://www.erlang.org/downloads

     

    根据本机位数选择erlang下载版本。

     

    • 下载完是这么个东西:

     

     

    • 双击,点next就可以。

     

     

    • 选择一个自己想保存的地方,然后nextfinish就可以。

     

     

    • 安装完事儿后要记得配置一下系统的环境变量。

    此电脑-->鼠标右键“属性”-->高级系统设置-->环境变量-->“新建”系统环境变量

     

    变量名:ERLANG_HOME

    变量值就是刚才erlang的安装地址,点击确定。

    然后双击系统变量path

     

    点击新建,将%ERLANG_HOME%in加入到path中。

     

    第二步:下载并安装RabbitMQ

     

     

    • 双击下载后的.exe文件,安装过程与erlang的安装过程相同。
    • RabbitMQ安装好后接下来安装RabbitMQ-Plugins。打开命令行cd,输入RabbitMQsbin目录。

    我的目录是:D:Program FilesRabbitMQ Server abbitmq_server-3.7.3sbin

    然后在后面输入rabbitmq-plugins enable rabbitmq_management命令进行安装

     

     

    打开命令行命令行,进入RabbitMQ的安装目录: sbin

    ,输入 rabbitmqctl status , 如果出现以下的图,说明安装是成功的,并且说明现在RabbitMQ Server已经启动了,运行正常。

     

     

    打开sbin目录,双击rabbitmq-server.bat

     

    等几秒钟看到这个界面后,访问http://localhost:15672

     

    然后可以看到如下界面

     

    默认用户名和密码都是guest

    登陆即可。

    二:RabbitMq介绍

    2.1运行过程介绍

      jms是生产者指定消息的地址,消费者指定接收的地址相比,两种直接对应。与jms不同,rabbitmq是通过消息的routing key-交换机-队列的binding key来进行消息的沟通。生产者把消息推到交换机,交换机根据规则把消息分配到它绑定的对应的队列。而消费者只需要关心消费哪个队列即可,而不需要去关心要消费哪个消息。

    0.消息队列运转过程

     

     

    生产者生产过程:
    (1)生产者连接到 RabbitMQ Broker 建立一个连接( Connection) ,开启 个信道 (Channel)
    (2) 生产者声明一个或多个交换器 ,并设置相关属性,比如交换机类型、是否持久化等
    (3)生产者声明队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
    (4)生产者通过路由键将交换器和队列绑定起来。
    (5)生产者发送消息至 RabbitMQ Broker ,其中包含路由键、交换器等信息。
    (6) 相应的交换器根据接收到的路由键查找相匹配的队列 如果找到 ,则将从生产者发送过来的消息存入相应的队列中。
    (7) 如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者
    (8) 关闭信道。
    (9) 关闭连接。

    消费者接收消息的过程:
    (1)消费者连接到 RabbitMQ Broker ,建立一个连接(Connection ,开启信道(Channel)
    (2) 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数以及做些准备工作。
    (3)等待 RabbitMQ Broker 回应并投递相应队列中的消息消费者接收消息。
    (4) 消费者确认接收到的消息
    (5) RabbitMQ 从队列中删除相应己经被确认的消息
    (6) 关闭信道。
    (7)关闭连接。

    2.2交换机介绍

    Default:将消息路由到队列名字和消息的routing key相同的队列。且该交换机绑定所有队列

    Direct:消息的routing key和队列的binding相同,则路由到该队列。这种交换机是一对一的直连交换机,一条消息会被一个消费者消费,不能重复消费。如果存在多个消费者消费该交换机绑定的队列,会轮循消费。

    Topic消息的routing key和队列的binding匹配。注意,这里是匹配,而不是相同。因为binding可以使用通配符*# #表示匹配一个或一个以上*表示只匹配一个

    Fanout:所有发送到该交换机的消息会被路由到该交换机绑定的所有队列。

    Header:和topic类似。不过匹配是消息的和队列的binding进行匹配而不是routing ke

    Dead letter:捕获所有无法路由到队列的消息

    2.3路由的基本机制

    1) 配置交换机(可以多个)

    2) 配置队列(可以多个)

    3) 绑定交换机和队列(一个交换机可以绑定多个队列)

    4) 生产者发送消息的时候,指定交换机(不指定的时候发到Default交换机)。消息到了交换机,根据交换机的路由规则,根据消息的rooting key(或者其他)和该交换机绑定的队列进行匹配,分配到对应的队列上。

    5) 消费者只需要指定消费那条队列的消息进行消费即可。

    2.4编码前准备工作

    1) 引入依赖

            <!--rabbitmq-->
    
            <dependency>
    
                <groupId>org.springframework.boot</groupId>
    
                <artifactId>spring-boot-starter-amqp</artifactId>
    
        </dependency>

    2) 配置

    #配置rabbitMq 服务器

    spring:
    
      rabbitmq:
    
        host: 127.0.0.1
    
        port: 5672
    
        username: guest
    
        password: guest
    
        #虚拟host 可以不设置,使用server默认host
    
        #virtual-host: JCcccHost

    2.4编码-配置交换机-队列-交换机绑定队列

     

    下面创建了一个Direct交换机,名字是TestDirectExchange,创建了一个队列,名字叫做TestDirectQueue,然后将他们绑定,并且设置了队列的routing key(binding key)TestDirectRouting

     

    package tacos.config;
    
     
    
    import org.springframework.amqp.core.Binding;
    
    import org.springframework.amqp.core.BindingBuilder;
    
    import org.springframework.amqp.core.DirectExchange;
    
    import org.springframework.amqp.core.Queue;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
     
    
    /**
    
     * @Author : JCccc
    
     * @CreateTime : 2019/9/3
    
     * @Description :
    
     **/
    
    @Configuration
    
    public class RabbitMQConfig {
    
     
    
        //1.创建队列 名字:TestDirectQueue
    
        @Bean
    
        public Queue TestDirectQueue() {
    
          //Queue的几个参数说明
    
          //name  队列名字
    
            // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    
            // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    
            // autoDelete:默认也是false,是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    
            //   return new Queue("TestDirectQueue",true,true,false);
    
     
    
            //一般设置一下队列的持久化就好,其余两个就是默认false
    
        
    
            return new Queue("TestDirectQueue",true);
    
        }
    
     
    
        //2.创建直连交换机 Direct,交换机名字:TestDirectExchange
    
        @Bean
    
        DirectExchange TestDirectExchange() {
    
          //  return new DirectExchange("TestDirectExchange",true,true);
    
            return new DirectExchange("TestDirectExchange",true,false);
    
        }
    
     
    
        //3.将队列和交换机绑定, 并设置队列的用于匹配键routing key:TestDirectRouting
    
        @Bean
    
        Binding bindingDirect() {
    
            return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    
        }
    
    }

    2.5编码-发送消息

    2.5.1发送消息的方法说明

    第一类方法:

    下面三个方法,发送的是Message对象,需要把数据对象转换成Message对象再进行发送。转换需要转换器,默认使用的转换器是SimpleMessageConverter。转换器的说见2.5.2

    第一个方法:一个参数 Message,没有指定交换机和routing key,消息会被发送到default交换机。没有routing key 账目匹配队列呢?

    第二个方法:两个参数 routing keyMessage。消息会被发送到default交换机。并且使用routing key和该交换机绑定的队列的名字去匹配。

    第三个方法:三个参数 交换机和routing keyMessage。消息会被发送到该交换机,并且使用routing key和该交换机绑定的队列的binding key值匹配

    rabbitTemplate.send(message); 
    
    rabbitTemplate.send("TestDirectQueue",message);
    
    rabbitTemplate.send("TestDirectExchange", "TestDirectRouting", message);

    第二类方法:

    同第一类方法相比,下面三个方法不是传输Message对象,而是直接参数数据对象。少了手动将数据对象转换成Message的步骤。自动转换了。

    rabbitTemplate.convertAndSend(map);
    
    rabbitTemplate.convertAndSend( "TestDirectRouting",map);
    
    rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",map);

    第三类方法:

    下面三个方法,和上面三个方法相比,多了一个参数,MessagePostProsser。它可以携带数据对象之外的参数,对数据对象的一个补充。 

    rabbitTemplate.convertAndSend(map,x ->{
    
             x.getMessageProperties().getHeaders().put("tenantId", "111");
    
             return x;
    
            });
    
     rabbitTemplate.convertAndSend( "TestDirectRouting",map,x ->{
    
             x.getMessageProperties().getHeaders().put("tenantId", "111");
    
             return x;
    
            });
    
     rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",map,x ->{
    
             x.getMessageProperties().getHeaders().put("tenantId", "111");
    
             return x;
    
            });    

    2.5.2转换器

    Spring提供了5类转换器,默认的是SimpleMessageConverter

     

    如果想要使用别的转换器替代默认的转换器,只需要配置如下代码

     @Bean
    
        public MessageConverter getConverter(){
    
         return new Jackson2JsonMessageConverter();
    
     }

    Springboot发现这个配置,会替换掉默认的转换器

    2.5.3发送消息9个方法实例

    试了一下,最好使的还是每二类的第三方法

    package tacos.web;
    
     
    
    import java.time.LocalDateTime;
    
    import java.time.format.DateTimeFormatter;
    
    import java.util.HashMap;
    
    import java.util.Map;
    
    import java.util.UUID;
    
     
    
    import org.springframework.amqp.core.Message;
    
    import org.springframework.amqp.core.MessageProperties;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
    import org.springframework.amqp.support.converter.MessageConverter;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import org.springframework.core.ParameterizedTypeReference;
    
    import org.springframework.web.bind.annotation.GetMapping;
    
    import org.springframework.web.bind.annotation.RestController;
    
     
    
    /**
    
     * @Author : JCccc
    
     * @CreateTime : 2019/9/3
    
     * @Description :
    
     **/
    
    @RestController
    
    public class RabbitMqController {
    
     
    
        @Autowired
    
        RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法
    
     
    
        
    
        
    
        @GetMapping("/sendDirectMessage0")
    
        public String sendDirectMessage0() {
    
            String messageId = String.valueOf(UUID.randomUUID());
    
            String messageData = "test message, hello!";
    
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    
            Map<String,Object> map=new HashMap<>();
    
            map.put("messageId",messageId);
    
            map.put("messageData",messageData);
    
            map.put("createTime",createTime);
    
            
    
            MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
    
            MessageProperties po = new MessageProperties();
    
            Message message = messageConverter.toMessage(map, po);
    
            
    
            //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
            System.out.println("发送消息");
    
            rabbitTemplate.send(message);
    
            return "ok";
    
        }
    
        
    
       
    
     
    
        
    
        @GetMapping("/sendDirectMessage1")
    
        public String sendDirectMessage1() {
    
            String messageId = String.valueOf(UUID.randomUUID());
    
            String messageData = "test message, hello!";
    
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    
            Map<String,Object> map=new HashMap<>();
    
            map.put("messageId",messageId);
    
            map.put("messageData",messageData);
    
            map.put("createTime",createTime);
    
            
    
            MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
    
            MessageProperties po = new MessageProperties();
    
            Message message = messageConverter.toMessage(map, po);
    
            
    
            //将消息发送到DefaultExchange,绑定到名字与消息rooting key相同的队列上,也就是TestDirectQueue队列
    
            System.out.println("发送消息");
    
            rabbitTemplate.send("TestDirectQueue",message);
    
            return "ok";
    
        }
    
        
    
       
    
        
    
        
    
        
    
        @GetMapping("/sendDirectMessage2")
    
        public String sendDirectMessage2() {
    
            String messageId = String.valueOf(UUID.randomUUID());
    
            String messageData = "test message, hello!";
    
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    
            Map<String,Object> map=new HashMap<>();
    
            map.put("messageId",messageId);
    
            map.put("messageData",messageData);
    
            map.put("createTime",createTime);
    
            
    
            MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
    
            MessageProperties po = new MessageProperties();
    
            Message message = messageConverter.toMessage(map, po);
    
            
    
            //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
            System.out.println("发送消息");
    
            rabbitTemplate.send("TestDirectExchange", "TestDirectRouting", message);
    
            return "ok";
    
        }
    
        
    
     
    
        @GetMapping("/sendDirectMessag3")
    
        public String sendDirectMessage3() {
    
            String messageId = String.valueOf(UUID.randomUUID());
    
            String messageData = "test message, hello!";
    
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    
            Map<String,Object> map=new HashMap<>();
    
            map.put("messageId",messageId);
    
            map.put("messageData",messageData);
    
            map.put("createTime",createTime);
    
            //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
            System.out.println("发送消息");
    
            rabbitTemplate.convertAndSend(map);
    
            return "ok";
    
        }
    
        
    
        @GetMapping("/sendDirectMessag4")
    
        public String sendDirectMessage4() {
    
            String messageId = String.valueOf(UUID.randomUUID());
    
            String messageData = "test message, hello!";
    
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    
            Map<String,Object> map=new HashMap<>();
    
            map.put("messageId",messageId);
    
            map.put("messageData",messageData);
    
            map.put("createTime",createTime);
    
            //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
            System.out.println("发送消息");
    
            rabbitTemplate.convertAndSend( "TestDirectRouting",map);
    
            return "ok";
    
        }
    
        
    
        @GetMapping("/sendDirectMessag5")
    
        public String sendDirectMessage5() {
    
            String messageId = String.valueOf(UUID.randomUUID());
    
            String messageData = "test message, hello!";
    
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    
            Map<String,Object> map=new HashMap<>();
    
            map.put("messageId",messageId);
    
            map.put("messageData",messageData);
    
            map.put("createTime",createTime);
    
            //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
            System.out.println("发送消息");
    
            rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",map);
    
            return "ok";
    
        }
    
     
    
        
    
        
    
        @GetMapping("/sendDirectMessag6")
    
        public String sendDirectMessage6() {
    
            String messageId = String.valueOf(UUID.randomUUID());
    
            String messageData = "test message, hello!";
    
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    
            Map<String,Object> map=new HashMap<>();
    
            map.put("messageId",messageId);
    
            map.put("messageData",messageData);
    
            map.put("createTime",createTime);
    
            //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
            System.out.println("发送消息");
    
            
    
            rabbitTemplate.convertAndSend(map,x ->{
    
             x.getMessageProperties().getHeaders().put("tenantId", "111");
    
             return x;
    
            });
    
            
    
            return "ok";
    
        }
    
        
    
        @GetMapping("/sendDirectMessag7")
    
        public String sendDirectMessage7() {
    
            String messageId = String.valueOf(UUID.randomUUID());
    
            String messageData = "test message, hello!";
    
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    
            Map<String,Object> map=new HashMap<>();
    
            map.put("messageId",messageId);
    
            map.put("messageData",messageData);
    
            map.put("createTime",createTime);
    
            //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
            System.out.println("发送消息");
    
            
    
            rabbitTemplate.convertAndSend( "TestDirectRouting",map,x ->{
    
             x.getMessageProperties().getHeaders().put("tenantId", "111");
    
             return x;
    
            });
    
            
    
            return "ok";
    
        }
    
        
    
        @GetMapping("/sendDirectMessag8")
    
        public String sendDirectMessage8() {
    
            String messageId = String.valueOf(UUID.randomUUID());
    
            String messageData = "test message, hello!";
    
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    
            Map<String,Object> map=new HashMap<>();
    
            map.put("messageId",messageId);
    
            map.put("messageData",messageData);
    
            map.put("createTime",createTime);
    
            //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
            System.out.println("发送消息");
    
            
    
            rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",map,x ->{
    
             x.getMessageProperties().getHeaders().put("tenantId", "111");
    
             return x;
    
            });
    
            
    
            return "ok";
    
        }
    
     
    
    }

     

    2.6编码-接收消息-拉取

    接收消息有12方法,也是3类,没类4

    第一类方法

    下面4个方法接收的是Message对象,接收到之后需要使用转换器转换成数据对象

    第一个方法:没有参数,接收default交换机的消息队列的消息?接收那个队列呢

    第二个方法:一个参数:队列的名称

    第三个方法:一个参数:指定接收消息的超时时间,这里就和JMS不同了。JMS是一致等着消息,而这里可以指定时间,时间过了没有消息就返回null。默认时间是0.也就是立即返回,不等待。

    第四个方法:两个参数:队列名称和超时时间

    Message receive = rabbitTemplate.receive();
    
    Message receive = rabbitTemplate.receive("TestDirectQueue");
    
    Message receive = rabbitTemplate.receive(1000);
    
    Message receive = rabbitTemplate.receive("TestDirectQueue",1000);

    第二类方法

    和第一类方法相比,接收到的直接就是数据对象

    Object fromMessage = rabbitTemplate.receiveAndConvert();
    
    Object fromMessage = rabbitTemplate.receiveAndConvert("TestDirectQueue");
    
    Object fromMessage = rabbitTemplate.receiveAndConvert(1000);
    
    Object fromMessage = rabbitTemplate.receiveAndConvert("TestDirectQueue",1000);

    第三类方法

    和第二类方法相比,多了一个参数ParameterizedTypeReference可以指定接收到的数据对象的类型,可以限制传输的数据类型。而且接收到了不用强转。缺点:必须使用Jackson2JsonMessageConverter这个转换器,默认转换器是不行的

    Map fromMessage = rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Map>() {
    
    });
    
     Map fromMessage = rabbitTemplate.receiveAndConvert("TestDirectQueue",new ParameterizedTypeReference<Map>() {
    
    });
    
    Map fromMessage = rabbitTemplate.receiveAndConvert(1000,new ParameterizedTypeReference<Map>() {
    
    });
    
     Map fromMessage = rabbitTemplate.receiveAndConvert("TestDirectQueue",1000,new ParameterizedTypeReference<Map>() {
    
    });
    
     

     

    2.6.1接收实例(只例举了几个)

    @GetMapping("/receiveDirectMessage1")
    
        public String receiveDirectMessage() {
    
        
    
            System.out.println("进来了---");
    
            Message receive = rabbitTemplate.receive("TestDirectQueue");
    
            MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
    
            if(receive != null){
    
              Object fromMessage =  messageConverter.fromMessage(receive);
    
                 
    
                 //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
                 System.out.println("接收消息1" + fromMessage);
    
            }else{
    
             System.out.println("空");
    
            }
    
           
    
            return "ok";
    
        }
    
     
    
    @GetMapping("/receiveDirectMessage2")
    
        
    
        public String receiveDirectMessage2() {
    
        
    
            System.out.println("进来了---");
    
            Message receive = rabbitTemplate.receive("TestDirectQueue",1000);
    
            MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
    
            if(receive != null){
    
              Map<String,Object> fromMessage = (Map<String, Object>) messageConverter.fromMessage(receive);
    
                 
    
                 //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
                 System.out.println("接收消息1" + fromMessage);
    
            }else{
    
             System.out.println("空");
    
            }
    
           
    
            return "ok";
    
        }
    
     
    
    @GetMapping("/receiveDirectMessage5")
    
        
    
        public String receiveDirectMessage5() {
    
        
    
            System.out.println("进来了---");
    
            Object fromMessage =  rabbitTemplate.receiveAndConvert("TestDirectQueue");
    
            if(fromMessage != null){
    
                 //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
                 System.out.println("接收消息1" + fromMessage);
    
            }else{
    
             System.out.println("空");
    
            }
    
           
    
            return "ok";
    
        }
    
     
    
    /**
    
     * 这种方式需要使用Jackson2JsonMessageConverter转换器
    
     @Bean
    
        public MessageConverter getConverter(){
    
         return new Jackson2JsonMessageConverter();
    
        }
    
     * @return
    
     */
    
    @GetMapping("/receiveDirectMessage6")
    
    public String receiveDirectMessage6() {
    
     
    
        System.out.println("进来了---");
    
        Map fromMessage =  rabbitTemplate.receiveAndConvert("TestDirectQueue",new ParameterizedTypeReference<Map>() {
    
    });
    
        if(fromMessage != null){
    
             //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
             System.out.println("接收消息1" + fromMessage);
    
        }else{
    
         System.out.println("空");
    
        }
    
       
    
        return "ok";
    
    }

    2.7编码-接收消息-推送

    通过 @RabbitListener(queues = "TestDirectQueue")指定接收的队列。下面例举了两个方法接收同一个队列,队列名是TestDirectQueue

    package tacos.web;
    
    import java.util.Map;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    
    import org.springframework.stereotype.Component;
    
    @Component
    
    public class RabbitMQReceiveController {
    
        @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
    
        public void process(Map testMessage) {
    
            System.out.println("DirectReceiver消费者1收到消息  : " + testMessage.toString());
    
        }
    
        
    
        @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
    
        public void process2(Map testMessage) {
    
            System.out.println("DirectReceiver消费者2收到消息  : " + testMessage.toString());
    
        }
    
    }

    2.8发布订阅模式

    上面我们发送消息举例采用的是Direct交换机,是点对点模式,一条消息只能被分发到一个队列上。下面我们看topic交换机,一条消息可以被分发到多个队列上。注意:一个队列上的一条消息还是只能被一个消费者消费。Topic模式是把一条消息分到多个队列,相当于复制成了多条消息,供多个消费者消费。

    例子

    下面配置了一个叫做topicExchangetopic交换机,配置了两个队列,名字是topic1topic1并且把他们绑定到了topicExchange交换机。切两个队列的binding key分别是topic.mantopic.woman

    package tacos.config;
    
     
    
    import org.springframework.amqp.core.Binding;
    
    import org.springframework.amqp.core.BindingBuilder;
    
    import org.springframework.amqp.core.DirectExchange;
    
    import org.springframework.amqp.core.Queue;
    
    import org.springframework.amqp.core.TopicExchange;
    
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    
    import org.springframework.amqp.support.converter.MessageConverter;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
     
    
    /**
    
     * @Author : JCccc
    
     * @CreateTime : 2019/9/3
    
     * @Description :
    
     **/
    
    @Configuration
    
    public class RabbitMQConfig {
    
     
    
     
    
        @Bean
    
        public Queue firstQueue() {
    
            return new Queue("topic1");
    
        }
    
     
    
        @Bean
    
        public Queue secondQueue() {
    
            return new Queue("topic2");
    
        }
    
     
    
        @Bean
    
        TopicExchange exchange() {
    
            return new TopicExchange("topicExchange");
    
        }
    
     
    
     
    
     
    
        @Bean
    
        Binding bindingExchangeMessage() {
    
            return BindingBuilder.bind(firstQueue()).to(exchange()).with("topic.man");
    
        }
    
     
    
       
    
        @Bean
    
        Binding bindingExchangeMessage2() {
    
            return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.woman");
    
        }
    
        
    
    }

     

    发送消息-发送消息到topicExchange交换器,routing keytopic.man

    @GetMapping("/sendTopicMessag1")
    
        public String sendTopicMessag1() {
    
            String messageId = String.valueOf(UUID.randomUUID());
    
            String messageData = "test message, hello!";
    
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    
            Map<String,Object> map=new HashMap<>();
    
            map.put("messageId",messageId);
    
            map.put("messageData",messageData);
    
            map.put("createTime",createTime);
    
            //将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
    
            System.out.println("发送消息");
    
            
    
            rabbitTemplate.convertAndSend("topicExchange", "topic.man",map);
    
            
    
            return "ok";
    
        }

     

    接收消息

    @RabbitListener(queues = "topic1")//监听的队列名称 topic1
    
        public void process3(Map testMessage) {
    
            System.out.println("TopicReceiver - 消费者1收到消息  : " + testMessage.toString());
    
        }
    
        
    
        @RabbitListener(queues = "topic1")//监听的队列名称 topic1
    
        public void process4(Map testMessage) {
    
            System.out.println("TopicReceiver -  消费者2收到消息  : " + testMessage.toString());
    
        }
    
     @RabbitListener(queues = "topic2")//监听的队列名称 topic2
    
        public void process5(Map testMessage) {
    
            System.out.println("TopicReceiver -  消费者3收到消息  : " + testMessage.toString());
    
        }
    
        @RabbitListener(queues = "topic2")//监听的队列名称 topic2
    
        public void process6(Map testMessage) {
    
            System.out.println("TopicReceiver - 消费者4收到消息  : " + testMessage.toString());
    
       }

     

    当发送一条消息只会有一个消费者接收到消息。消费者12轮循接收。我们来看流程。

    消息发到topicExchange交换机,进行匹配,只有一个队列匹配上,所以只有队列topic1有这条消息,它只能被一个消费者消费。所以被两个消费者轮循环消费

     

    让我们把交换机和队列的绑定做一点变化

    @Bean
    
        Binding bindingExchangeMessage() {
    
            return BindingBuilder.bind(firstQueue()).to(exchange()).with("topic.man");
    
        }
    
     
    
        @Bean
    
        Binding bindingExchangeMessage2() {
    
            return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    
        }

    第二个队列的binding key变为了topic.#

    此时,再发送消息。接收结果消费者13和消费者24轮循接收消息,每次会有两个消费者接收到消息。

    2.9其它交换机模式

    不做更多地说明

    3消息回调

    就是消息确认(生产者推送消息成功,消费者接收消息成功)

    3.1生产者消息回调

    1) 加入配置

        #确认消息已发送到交换机(Exchange)

          publisher-confirms: true

          #确认消息已发送到队列(Queue)

          publisher-returns: true

    2)配置类

    import org.springframework.amqp.core.Message;
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
     
    
     
    
    package tacos.web;
    
     
    
    import org.springframework.amqp.core.Message;
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    
    import org.springframework.amqp.rabbit.support.CorrelationData;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
     
    
     
    
    /**
    
     * @Author : JCccc
    
     * @CreateTime : 2019/9/3
    
     * @Description :
    
     **/
    
    @Configuration
    
    public class RabbitMqBack {
    
     
    
        @Bean
    
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
    
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
    
            rabbitTemplate.setConnectionFactory(connectionFactory);
    
            //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    
            rabbitTemplate.setMandatory(true);
    
     
    
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    
                @Override
    
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    
                    System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
    
                    System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
    
                    System.out.println("ConfirmCallback:     "+"原因:"+cause);
    
                }
    
            });
    
     
    
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    
                @Override
    
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    
                    System.out.println("ReturnCallback:     "+"消息:"+message);
    
                    System.out.println("ReturnCallback:     "+"回应码:"+replyCode);
    
                    System.out.println("ReturnCallback:     "+"回应信息:"+replyText);
    
                    System.out.println("ReturnCallback:     "+"交换机:"+exchange);
    
                    System.out.println("ReturnCallback:     "+"路由键:"+routingKey);
    
                }
    
            });
    
     
    
            return rabbitTemplate;
    
        }

    /*    

        可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;

        那么以上这两种回调函数都是在什么情况会触发呢?

        先从总体的情况分析,推送消息存在四种情况:

        ①消息推送到server,但是在server里找不到交换机 触发ConfirmCallback 回调函数

        ②消息推送到server,找到交换机了,但是没找到队列 触发的是 ConfirmCallback和RetrunCallback两个回调函数

        ③消息推送到sever,交换机和队列啥都没找到 触发的是 ConfirmCallback 回调函数

        ④消息推送成功 触发的是 ConfirmCallback 回调函数

        那么我先写几个接口来分别测试和认证下以上4种情况,消息确认触发回调函数的情况:

        

     */

    }

    3.2消费者消息回调

    这个我尝试了下,会消费一条消息,也就是相当于一个消费者。那么既然这个相当于一个消费者,但是这里我们可以主动返回消息是否消费成功。而如果直接写个方法来消费消息,那么消息发送到消费者就认为消息被成功消费了。而如果消息的内容有问题无法消费,比如消费者方法报错了,是无法告诉RabbitMQ的。

    所以,消费者消息回调相当于一个特殊的消费者,它可以手动确认消息是否成功消费。

     

    和生产者的消息确认机制不同,因为消息接收本来就是在监听消息,符合条件的消息就会消费下来。
    所以,消息接收的确认机制主要存在三种模式:

    自动确认 这也是默认的消息确认情况。  AcknowledgeMode.NONE
    RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
    所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
    一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

    根据情况确认, 这个不做介绍
    手动确认  这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
    消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
    basic.ack用于肯定确认 
    basic.nack用于否定确认(注意:这是AMQP 0-9-1RabbitMQ扩展) 
    basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息 

    消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理。
    basic.nack,basic.reject表示没有被正确处理:

    着重讲下reject,因为有时候一些场景是需要重新入列的。

    channel.basicReject(deliveryTag, true);  拒绝消费当前消息,如果第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。设置false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。

    使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。

    但是如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。

     

    顺便也简单讲讲 nack,这个也是相当于设置不消费某条消息。

    channel.basicNack(deliveryTag, false, true);
    第一个参数依然是当前消息到的数据的唯一id;
    第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
    第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。

    同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。

    1) 配置类

    它是绑定队列的,可以绑定多个队列。对这些队列进行回调

    package tacos.config;
    
     
    
     
    
    import org.springframework.amqp.core.AcknowledgeMode;
    
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
     
    
    /**
    
    * @Author : JCccc
    
    * @CreateTime : 2019/9/4
    
    * @Description :
    
    **/
    
    @Configuration
    
    public class MessageListenerConfig {
    
     
    
       @Autowired
    
       private CachingConnectionFactory connectionFactory;
    
       
    
       @Autowired
    
       private MyAckReceiver myAckReceiver;//消息接收处理类-自己创建实现 ChannelAwareMessageListener 接口
    
     
    
       @Bean
    
       public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    
           SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    
           container.setConcurrentConsumers(1);
    
           container.setMaxConcurrentConsumers(1);
    
           container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
    
           //设置一个队列
    
           container.setQueueNames("topic1");
    
           container.setMessageListener(myAckReceiver);
    
     
    
           return container;
    
       }
    
     
    
     
    
    }

    2) 回调处理类

    package tacos.config;
    
     
    
    import java.util.HashMap;
    
    import java.util.Map;
    
     
    
    import org.springframework.amqp.core.Message;
    
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    
    import org.springframework.stereotype.Component;
    
     
    
    import com.rabbitmq.client.Channel;
    
     
    
    @Component
    
     
    
    public class MyAckReceiver implements ChannelAwareMessageListener {
    
     
    
        @Override
    
        public void onMessage(Message message, Channel channel) throws Exception {
    
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
    
                //因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理
    
                String msg = message.toString();
    
                String[] msgArray = msg.split("'");//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据
    
                Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(),3);
    
                String messageId=msgMap.get("messageId");
    
                String messageData=msgMap.get("messageData");
    
                String createTime=msgMap.get("createTime");
    
                System.out.println("  MyAckReceiver  messageId:"+messageId+"  messageData:"+messageData+"  createTime:"+createTime);
    
                System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());
    
                channel.basicAck(deliveryTag, true);
    
    // channel.basicReject(deliveryTag, true);//为true会重新放回队列
    
            } catch (Exception e) {
    
                channel.basicReject(deliveryTag, false);
    
                e.printStackTrace();
    
            }
    
        }
    
     
    
         //{key=value,key=value,key=value} 格式转换成map
    
        private Map<String, String> mapStringToMap(String str,int entryNum ) {
    
            str = str.substring(1, str.length() - 1);
    
            String[] strs = str.split(",",entryNum);
    
            Map<String, String> map = new HashMap<String, String>();
    
            for (String string : strs) {
    
                String key = string.split("=")[0].trim();
    
                String value = string.split("=")[1];
    
                map.put(key, value);
    
            }
    
            return map;
    
        }
    
    }

    3.3更多场景-配置多个队列,且可以对不同的队列的回调做不同的处理 - 好像有问题

    但是这个场景往往不够! 因为很多伙伴之前给我评论反应,他们需要这个消费者项目里面,监听的好几个队列都想变成手动确认模式,而且处理的消息业务逻辑不一样。

    场景: 除了直连交换机的队列TestDirectQueue需要变成手动确认以外,我们还需要将一个其他的队列或者多个队列也变成手动确认,而且不同队列实现不同的业务处理。

     

     

    1)和之前一样,同样式配置类,只不过前面只配置了一个队列,在这里配置多个队列

    package tacos.config;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.context.annotation.Configuration;
    
     
    
    /**
    
    * @Author : JCccc
    
    * @CreateTime : 2019/9/4
    
    * @Description :
    
    **/
    
    @Configuration
    
    public class MessageListenerConfig {
    
     
    
       @Autowired
    
       private CachingConnectionFactory connectionFactory;
    
       
    
       @Autowired
    
       private MyAckReceiver myAckReceiver;//消息接收处理类-自己创建实现 ChannelAwareMessageListener 接口
    
     
    
       @Bean
    
       public SimpleMessageListenerContainer simpleMessageListenerContainer() {
    
           SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    
           container.setConcurrentConsumers(1);
    
           container.setMaxConcurrentConsumers(1);
    
           container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
    
           //同时设置多个如下: 前提是队列都是必须已经创建存在的
    
             container.setQueueNames("topic1","topic2");
    
     
    
     
    
           //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
    
           //container.setQueues(new Queue("topic1",true));
    
           //container.addQueues(new Queue("topic1",true));
    
        
    
           container.setMessageListener(myAckReceiver);
    
     
    
           return container;
    
       }
    
    }

     

    2)回调处理类,对不同的额队列做不同的处理

    但是我们需要做不用的业务逻辑处理,那么只需要  根据消息来自的队列名进行区分处理即可,如:

    import com.rabbitmq.client.Channel;
    
    import org.springframework.amqp.core.Message;
    
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    
    import java.util.Map;
    
     
    
    @Component
    
    public class MyAckReceiver implements ChannelAwareMessageListener {
    
     
    
        @Override
    
        public void onMessage(Message message, Channel channel) throws Exception {
    
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
    
                //因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理
    
                String msg = message.toString();
    
                String[] msgArray = msg.split("'");//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据
    
                Map<String, String> msgMap = mapStringToMap(msgArray[1].trim(),3);
    
                String messageId=msgMap.get("messageId");
    
                String messageData=msgMap.get("messageData");
    
                String createTime=msgMap.get("createTime");
    
                
    
                if ("TestDirectQueue".equals(message.getMessageProperties().getConsumerQueue())){
    
                    System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
    
                    System.out.println("消息成功消费到  messageId:"+messageId+"  messageData:"+messageData+"  createTime:"+createTime);
    
                    System.out.println("执行TestDirectQueue中的消息的业务处理流程......");
    
                    
    
                }
    
     
    
                if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue())){
    
                    System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
    
                    System.out.println("消息成功消费到  messageId:"+messageId+"  messageData:"+messageData+"  createTime:"+createTime);
    
                    System.out.println("执行fanout.A中的消息的业务处理流程......");
    
     
    
                }
    
             
    
                channel.basicAck(deliveryTag, true);
    
    // channel.basicReject(deliveryTag, true);//为true会重新放回队列
    
            } catch (Exception e) {
    
                channel.basicReject(deliveryTag, false);
    
                e.printStackTrace();
    
            }
    
        }
    
        //{key=value,key=value,key=value} 格式转换成map
    
        private Map<String, String> mapStringToMap(String str,int enNum) {
    
            str = str.substring(1, str.length() - 1);
    
            String[] strs = str.split(",",enNum);
    
            Map<String, String> map = new HashMap<String, String>();
    
            for (String string : strs) {
    
                String key = string.split("=")[0].trim();
    
                String value = string.split("=")[1];
    
                map.put(key, value);
    
            }
    
            return map;
    
        }
    
    }

    如果你还想新增其他的监听队列,也就是按照这种方式新增配置即可(或者完全可以分开多个消费者项目去监听处理)。 

     

     

  • 相关阅读:
    无线鼠标换电池了
    Jython Interactive Servlet Console YOU WILL NEVER KNOW IT EXECLLENT!!! GOOD
    Accessing Jython from Java Without Using jythonc
    jython podcast cool isnt't it?
    Python里pycurl使用记录
    Creating an Interactive JRuby Console for the Eclipse Environment
    微软为AJAX和jQuery类库提供CDN服务
    Download A File Using Cygwin and cURL
    What is JMRI?这个是做什么用的,我真没看懂但看着又很强大
    用curl 发送指定的大cookie的http/https request
  • 原文地址:https://www.cnblogs.com/jthr/p/13786746.html
Copyright © 2011-2022 走看看