zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记3 RabbitMQ的六种工作模式

    1、简单队列

      其实上篇文章末尾给出的代码就是简单队列。

      

      一个生产者对应一个消费者!!!

    生产者将消息发送到“hello”队列。消费者从该队列接收消息。

      ①、pom文件

      必须导入rabbitmq 依赖包

    1     <dependency>
    2       <groupId>com.rabbitmq</groupId>
    3       <artifactId>amqp-client</artifactId>
    4       <version>3.4.1</version>
    5     </dependency>

      ②、工具类

    复制代码
     1 package com.ys.utils;
     2 
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5 
     6 /**
     7  * Create by hadoop
     8  */
     9 public class ConnectionUtil {
    10 
    11     public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{
    12         //1、定义连接工厂
    13         ConnectionFactory factory = new ConnectionFactory();
    14         //2、设置服务器地址
    15         factory.setHost(host);
    16         //3、设置端口
    17         factory.setPort(port);
    18         //4、设置虚拟主机、用户名、密码
    19         factory.setVirtualHost(vHost);
    20         factory.setUsername(userName);
    21         factory.setPassword(passWord);
    22         //5、通过连接工厂获取连接
    23         Connection connection = factory.newConnection();
    24         return connection;
    25     }
    26 }
    复制代码

      ③、生产者 Producer

    复制代码
     1 package com.ys.simple;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import com.ys.utils.ConnectionUtil;
     6 
     7 /**
     8  * Create by YSOcean
     9  */
    10 public class Producer {
    11     private final static String QUEUE_NAME = "hello";
    12 
    13     public static void main(String[] args) throws Exception{
    14         //1、获取连接
    15         Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
    16         //2、声明信道
    17         Channel channel = connection.createChannel();
    18         //3、声明(创建)队列
    19         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    20         //4、定义消息内容
    21         String message = "hello rabbitmq ";
    22         //5、发布消息
    23         channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
    24         System.out.println("[x] Sent'"+message+"'");
    25         //6、关闭通道
    26         channel.close();
    27         //7、关闭连接
    28         connection.close();
    29     }
    30 }
    复制代码

      ④、消费者Consumer

    复制代码
     1 package com.ys.simple;
     2 
     3 import com.rabbitmq.client.Channel;
     4 import com.rabbitmq.client.Connection;
     5 import com.rabbitmq.client.QueueingConsumer;
     6 import com.ys.utils.ConnectionUtil;
     7 
     8 
     9 /**
    10  * Create by YSOcean
    11  */
    12 public class Consumer {
    13 
    14     private final static String QUEUE_NAME = "hello";
    15 
    16     public static void main(String[] args) throws Exception{
    17         //1、获取连接
    18         Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest");
    19         //2、声明通道
    20         Channel channel = connection.createChannel();
    21         //3、声明队列
    22         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    23         //4、定义队列的消费者
    24         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
    25         //5、监听队列
    26         /*
    27             true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息已经成功消费
    28             false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
    29                    如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其
    30                    发送消息,直到该消费者反馈。
    31          */
    32 
    33         channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
    34         //6、获取消息
    35         while (true){
    36             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
    37             String message = new String(delivery.getBody());
    38             System.out.println(" [x] Received '" + message + "'");
    39         }
    40     }
    41 
    42 }
    复制代码

      注意这里消费者有自动确认消息和手动确认消息两种模式。

    2、work 模式

      

      一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!

      竞争消费者模式。

      ①、生产者

     View Code

      ②、消费者

      这里创建两个消费者

      消费者1:每接收一条消息后休眠10毫秒

     View Code

      消费者2:每接收一条消息后休眠1000毫秒

     View Code

      ③、测试结果

      首先生产者一次打印从0-9条消息

      

      接着我们看消费者1:结果为打印偶数条消息

      

      消费者2:结果为打印奇数条消息

      

      ④、分析结果

      消费者1和消费者2获取到的消息内容是不同的,也就是说同一个消息只能被一个消费者获取。

      消费者1和消费者2分别获取奇数条消息和偶数条消息,两种获取消息的条数是一样的。

      前面我们说这种模式是竞争消费者模式,一条队列被多个消费者监听,这里两个消费者,其中消费者1和消费者2在获取消息后分别休眠了10毫秒和1000毫秒,也就是说两个消费者获取消息的效率是不一样的,但是结果却是两者获得的消息条数是一样的,这根本就不构成竞争关系,那么我们应该怎么办才能让工作效率高的消费者获取消息更多,也就是消费者1获取消息更多呢?

      PS:在增加一个消费者其实获取消息条数也是一样的,消费者1获取0,3,6,9,消费者2获取1,4,7,消费者3获取2,5,8

      ⑤、能者多劳

    channel.basicQos(1);

      增加如上代码,表示同一时刻服务器只会发送一条消息给消费者。消费者1和消费者2获取消息结果如下:

      

      

      ⑥、应用场景

      效率高的消费者消费消息多。可以用来进行负载均衡。

    3、发布/订阅模式

        

      一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。

      ps:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers,这里的交换器是 fanout。下面我们会详细介绍这几种交换器。

      ①、生产者

     View Code

      ②、消费者

      消费者1:

     View Code

      消费者2:

     View Code

      注意:消费者1和消费者2两者监听的队列名称是不一样的,我们可以通过前台管理系统看到:

      

      ③、测试结果

      

      

      

      消费1和消费者2都消费了该消息。

      ps:这是因为消费者1和消费者2都监听了被同一个交换器绑定的队列。如果消息发送到没有队列绑定的交换器时,消息将丢失,因为交换器没有存储消息的能力,消息只能存储在队列中。

      ④、应用场景

      比如一个商城系统需要在管理员上传商品新的图片时,前台系统必须更新图片,日志系统必须记录相应的日志,那么就可以将两个队列绑定到图片上传交换器上,一个用于前台系统更新图片,另一个用于日志系统记录日志。

    4、路由模式

      

      生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。

      也就是让消费者有选择性的接收消息。

      ①、生产者

     View Code

      ②、消费者

      消费者1:

     View Code

      消费者2:

     View Code

      ③、测试结果

      我们首先看代码,生产者发布消息,指定的路由key为update。消费者1绑定队列和交换机时key分别是update/delete/add;消费者2绑定队列和交换器时key是select。

      所以我们可以猜测生产者发送的消息,只有消费者1能够接收并消费,而消费者2是不能接收的。

      

      

      

      ④、应用场景

      利用消费者能够有选择性的接收消息的特性,比如我们商城系统的后台管理系统对于商品进行修改、删除、新增操作都需要更新前台系统的界面展示,而查询操作确不需要,那么这两个队列分开接收消息就比较好。

    5、主题模式

       

      上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。

      符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。

      ①、生产者

     View Code

      ②、消费者

      消费者1:

     View Code

      消费2:

     View Code

      ③、分析结果

      生产者发送消息绑定的路由key为update.Name;消费者1监听的队列和交换器绑定路由key为update.#;消费者2监听的队列和交换器绑定路由key为select.#。

      很显然,消费者1会接收到消息,而消费者2接收不到。

     

    6、RPC

    ·rabbitmq实现rpc的原理是:客户端向一个队列中发送消息,并注册一个回调的队列用于接收服务端返回的消息,该消息需要声明一个叫做correaltionId的属性,该属性将是该次请求的唯一标识。服务端在接受到消息(在需要时可以验证correaltionId)后,处理消息,并将消息发送到客户端注册的回调队列中。原理图如下:  

      

    2、代码实现

      下面我们将模拟实现一个rpc客户端和rpc服务端。客户端给服务端发送message,服务端收到后处理message,再将处理后的消息返给客户端

      rpc客户端

      

    复制代码
    /**
     * rpc客户端
     */
    public class RpcClient {
        //发送消息的队列名称
        private static final String RPC_QUEUE_NAME = "rpc_queue";
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = null;
            Channel channel = null;
            try {
               connection = connectionFactory.newConnection();
               channel = connection.createChannel();
               //创建回调队列
               String callbackQueue = channel.queueDeclare().getQueue();
               //创建回调队列,消费者从回调队列中接收服务端传送的消息
                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(callbackQueue,true,consumer);
    
                //创建消息带有correlationId的消息属性
                String correlationId = UUID.randomUUID().toString();
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(callbackQueue).build();
                String message = "hello rabbitmq";
                channel.basicPublish("",RPC_QUEUE_NAME,basicProperties,message.getBytes());
                System.out.println("RpcClient send message " + message + ", correaltionId = " + correlationId);
    
                //接收回调消息
                while (true){
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String receivedCorrelationId = delivery.getProperties().getCorrelationId();
                    if(correlationId.equals(receivedCorrelationId)){
                        System.out.println("RpcClient receive format message " + new String(delivery.getBody(), "UTF-8") + ", correaltionId = " + correlationId);
                        break;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    channel.close();
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
        }
    }
    复制代码

       rpc服务端

      

    复制代码
    /**
     * rpc服务器
     */
    public class RpcServer {
        private static final String RPC_QUEUE_NAME = "rpc_queue";
    
        private static String format(String message){
            return "......" + message + "......";
        }
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = null;
            try {
                connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
                QueueingConsumer consumer = new QueueingConsumer(channel);
                //声明消费者预取的消息数量
                channel.basicQos(1);
                channel.basicConsume(RPC_QUEUE_NAME,false,consumer);//采用手动回复消息
                System.out.println("RpcServer waitting for receive message");
    
                while (true){
                    //接收并处理消息
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("RpcServer receive message " + message);
                    String response = format(message);
                    //确认收到消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
    
                    //取出消息的correlationId
                    AMQP.BasicProperties properties = delivery.getProperties();
                    String correlationId = properties.getCorrelationId();
    
                    //创建具有与接收消息相同的correlationId的消息属性
                    AMQP.BasicProperties replyProperties = new AMQP.BasicProperties().builder().correlationId(correlationId).build();
                    channel.basicPublish("",properties.getReplyTo(),replyProperties,response.getBytes());
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    复制代码

       先运行服务端,再运行客户端,结果如下:

      RpcClient

      

      RpcServer

      

    7、四种交换器

      前面五种队列模式介绍完了,但是实际上只有三种,第一种简单队列,第二种工作模式,剩下的三种都是和交换器绑定的合起来称为一种,这小节我们就来详细介绍交换器。

      交换器分为四种,分别是:direct、fanout、topic和 headers。

      前面三种分别对应路由模式、发布订阅模式和通配符模式,headers 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器,这里也不详细介绍。

      ①、direct

      如果路由键完全匹配的话,消息才会被投放到相应的队列。

       

      ②、fanout

      当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。

       

      ③、topic

      设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符。

      

    8、总结

      关于 RabbitMQ 的五种队列,其实实际使用最多的是最后一种主题模式,通过模糊匹配,使得操作更加自如。那么我们总结一下有交换器参与的队列(最后三种队列)工作方式如下:

      

  • 相关阅读:
    A real ROCA using Bootstrap, jQuery, Thymeleaf, Spring HATEOAS and Spring MVC
    CTP交易接口
    PHP版实现友好的时间显示方式(例如:2小时前)
    php实现文件上传的源码
    获取 Windows Phone 的 User-Agent 字符串
    实现弹出收回菜单效果ios源码
    孤岛能源安卓游戏安卓源码
    火影快打游戏安卓源码
    java编程的78条黄金法则
    php中实现17种正则表达式
  • 原文地址:https://www.cnblogs.com/cnndevelop/p/12192926.html
Copyright © 2011-2022 走看看