zoukankan      html  css  js  c++  java
  • RabbitMQ基础篇

      RabbitMQ是基于AMQP(Advanced Message Queue)标准协议规范的实现,由Erlang语言开发。

      RabbitMQ结构图:

        


    一、名词概念

      Broker:消息队列服务器实体。

      Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

      Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

      Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

      Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

      vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

      producer:消息生产者,就是投递消息的程序。

      consumer:消息消费者,就是接受消息的程序,消费者分为持续订阅(basicConsumer)和单条订阅(basicGet)。

      channel:消息通道,建立在TCP连接之上。在客户端的每个连接里,可建立多个channel【实际上信道的创建是没有限制的】,每个channel代表一个会话任务。


    二、使用流程

          

      生产者(客户端):

        (1)客户端连接到消息队列服务器,打开一个channel。

        (2)客户端声明一个exchange,并设置相关属性。

        (3)客户端声明一个queue,并设置相关属性。【注:若不声明队列,则rabbitmq会默认生成个随机队列】

        (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。

        (5)客户端投递消息到exchange。   

     1 package producer.normal;
     2 
     3 import com.rabbitmq.client.BuiltinExchangeType;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.ConnectionFactory;
     7 
     8 import java.io.IOException;
     9 import java.util.concurrent.TimeoutException;
    10 
    11 public class DirectProducer {
    12 
    13     private final static String EXCHANGE_NAME = "direct_log";
    14     public static void main(String[] args) throws IOException, TimeoutException {
    15         // 创建连接工厂
    16         ConnectionFactory cf = new ConnectionFactory();
    17         // 工厂属性配置,列举如下几个,其他属性自行点击ConnectionFactory进去查看
    18         cf.setHost("127.0.0.1");// ip
    19         cf.setPort(5672);// 端口
    20         cf.setUsername("guest");// 用户名
    21         cf.setPassword("guest");// 密码
    22         // 创建连接
    23         Connection con = cf.newConnection();
    24         // 创建信道
    25         Channel channel = con.createChannel();
    26         // 声明交换器
    27         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    28         String[] logLevels = {"info", "error", "warning"};
    29         for (int i = 0; i < logLevels.length; i++) {
    30             String logLevel = logLevels[i];
    31             String message = logLevel + ":" + "hello rabbitmq";
    32             channel.basicPublish(EXCHANGE_NAME, logLevel, null, message.getBytes());
    33             System.out.println("send message:" + message);
    34         }
    35         channel.close();
    36         con.close();
    37     }
    38 }
    生产者

      消费者:

        (1)连接到消息队列服务器,打开一个channel。

        (2)声明一个exchange,并设置相关属性。

        (3)声明一个queue,并设置相关属性。【注:若不声明队列,则rabbitmq会默认生成个随机队列】

        (4)使用routing key,在exchange和queue之间建立好绑定关系。

        (5)接收exchange中的消息进行消费处理。

     1 package consumer.normal;
     2 
     3 import com.rabbitmq.client.*;
     4 
     5 import java.io.IOException;
     6 import java.nio.charset.Charset;
     7 import java.util.concurrent.TimeoutException;
     8 
     9 public class DirectConsumerAll {
    10     private final static String EXCHANGE_NAME = "direct_log";
    11 
    12     public static void main(String[] args) throws IOException, TimeoutException {
    13         ConnectionFactory cf = new ConnectionFactory();
    14         Connection con = cf.newConnection();
    15         Channel channel = con.createChannel();
    16         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    17         String queueName = channel.queueDeclare().getQueue();//声明随机队列
    18         String[] logLevels = {"info", "error", "warning"};
    19         for (String logLevel : logLevels) {
    20             channel.queueBind(queueName, EXCHANGE_NAME, logLevel);
    21         }
    22         System.out.println("waiting message...");
    23         Consumer consumer = new DefaultConsumer(channel) {
    24             @Override
    25             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    26                                        byte[] body) {
    27                 String message = new String(body, Charset.defaultCharset());
    28                 System.out.println("accept " + envelope.getRoutingKey() + ":" + message);
    29             }
    30         };
    31         channel.basicConsume(queueName, true, consumer);
    32 
    33     }
    34 }
    消费者

    三、交换器类型

      DIRECT:完全路由键匹配,例如,绑定时设置了routing key为"abc",那么客户端提交的消息,只有设置了key为"abc"的才会投递到队列。

      FANOUT:不需要路由键,采取广播模式。例如,一条消息进来时,投递到与该交换机绑定的所有队列。

      TOPIC:主题,根据路由键进行模式匹配投递消息,符号"#"匹配一个或多个词,符号"*"匹配正好一个词。例如"abc.#"匹配"abc.def.ghi","abc.*"只匹配"abc.def"。

      HEADERS:消息头,不作过多讲解,使用场景很少。


     四、消息持久化

      RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑。但相应会带来性能的下降,大致10倍。消息队列持久化包括以下3个部分:

        (1)exchange持久化,在声明时指定durable => 1

        (2)queue持久化,在声明时指定durable => 1

        (3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

      如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。


    五、消息确认和发送方确认

      1、消息确认:

        消费者收到的每一条消息都必须进行确认,分为自动确认和消费者自行确认。

        消费者在声明队列时,指定autoAck参数,true自动确认,false时rabbitmq会等到消费者显示的发回一个ack信号才会删除消息。autoAck=false,有足够时间让消费者处理消息,直到消费者显示调用basicAck为止。

        Rabbitmq中消息分为了两部分:1、等待投递的消息;2、已经投递,但是还没有收到ack信号的。如果消费者断连了,服务器会把消息重新入队,投递给下一个消费者。未ack的消息是没有超时时间的。

      2、发送方确认

        生产者不知道消息是否真正到达RabbitMq,也就是说发布操作不返回任何消息给生产者。

        AMQP协议层面为我们提供的事务机制解决了这个问题,但是事务机制本身也会带来问题:

           1、严重的性能问题

           2、使生产者应用程序产生同步

         RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性 能影响几乎可以忽略不计—— 发送方确认模式的机制。

      注意:发送方确认模式和消费者对消息的确认是不同的。

     1 package producer.confirm;
     2 
     3 import com.rabbitmq.client.*;
     4 import java.io.IOException;
     5 import java.nio.charset.Charset;
     6 import java.util.concurrent.TimeoutException;
     7 
     8 /**
     9  * 发送方确认模式——异步模式
    10  */
    11 public class ProducerConfirmAsync {
    12     private final static String EXCHANGE_NAME = "confirm_log";
    13 
    14     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    15         ConnectionFactory cf = new ConnectionFactory();
    16         Connection con = cf.newConnection();
    17         // 监听连接关闭事件,一般用于重连机制
    18         con.addShutdownListener(new ShutdownListener() {
    19             @Override
    20             public void shutdownCompleted(ShutdownSignalException cause) {
    21 
    22             }
    23         });
    24         Channel channel = con.createChannel();
    25         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    26         // 将信道设置为发送方确认
    27         channel.confirmSelect();
    28         // 监听信道事件
    29         channel.addShutdownListener(new ShutdownListener() {
    30             @Override
    31             public void shutdownCompleted(ShutdownSignalException cause) {
    32 
    33             }
    34         });
    35         // deliveryTag:代表消息在信道中的唯一一次投递,单调递增
    36         // multiple:是否批处理,默认false
    37         // 监听已被投递的消息
    38         channel.addConfirmListener(new ConfirmListener() {
    39             @Override
    40             public void handleAck(long deliveryTag, boolean multiple) {
    41                 System.out.println("Ack deliveryTag=" + deliveryTag + ",multiple=" + multiple);
    42             }
    43 
    44             @Override
    45             public void handleNack(long deliveryTag, boolean multiple) {
    46                 System.out.println("Ack deliveryTag=" + deliveryTag + ",multiple=" + multiple);
    47             }
    48         });
    49 
    50         // 监听未被投递到队列的消息
    51         channel.addReturnListener(new ReturnListener() {
    52             @Override
    53             public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
    54                                      AMQP.BasicProperties properties, byte[] body) {
    55                 System.out.println("replyCode:" + replyCode);
    56                 System.out.println("replyText:" + replyText);
    57                 System.out.println("exchange:" + exchange);
    58                 System.out.println("routingKey:" + routingKey);
    59                 System.out.println("message:" + new String(body));
    60             }
    61         });
    62         String[] logLevels = {"error", "info", "warning"};
    63         System.out.println("waiting message sent...");
    64         // mandatory参数为true,当投递消息无法找到合适的消息队列,则返回生成者;false为缺省,丢弃消息
    65         for (int i = 0; i < 3; i++) {
    66             String message =  " hello rabbitMq" + (i + 1);
    67             channel.basicPublish(EXCHANGE_NAME, logLevels[i], true, null,
    68                     message.getBytes(Charset.defaultCharset()));
    69             System.out.println("---------------------------------------------------");
    70             System.out.println("sent message: [" +logLevels[i] +"]" + message);
    71             Thread.sleep(200);
    72         }
    73     }
    74 }
    发送方确认——异步模式

     

    面试环节可能会问到几个基础问题:

      1.  如果消息达到无人订阅的队列会怎么办

       消息会一直在队列中等待,rabbitmq会默认队列是无限长度的。

      2.  多个消费者订阅到同一队列怎么办

      消息会轮询的方式发送给消费者,每个消息只会发送给一个消费者

      3.  消息路由到了不存在的队列怎么办?

      会忽略,当消息不存在,消息丢失了。

      4.  如何明确拒绝消息?

       1、消费者断连,2、消费者使用reject命令(requeue=true,重新分发消息,false移除消息),3、nack命令(批量的拒绝)

          

     

     

     

  • 相关阅读:
    【转】C#进阶系列——WebApi 接口参数不再困惑:传参详解
    微信内测小程序,苹果你怎么看?
    给你一个团队,你应该怎么管?
    ios修改产品名
    【原创】windows下搭建vue开发环境+IIS部署
    【原】“系统”重新启动
    Ubuntu root密码修改
    【转】网络编程常见问题总结
    Python + Selenium -Python 3.6 3.7 安装 PyKeyboard PyMouse
    python3 获取当前路径及os.path.dirname sys.path.dirname的使用
  • 原文地址:https://www.cnblogs.com/light-sunset/p/12702317.html
Copyright © 2011-2022 走看看