zoukankan      html  css  js  c++  java
  • RabbitMQ消息的交换

    消息的交换

    目录

    RabbitMQ-从基础到实战(1)— Hello RabbitMQ

    RabbitMQ-从基础到实战(2)— 防止消息丢失

    1.简介

    在前面的例子中,每个消息都只对应一个消费者,即使有多个消费者在线,也只会有一个消费者接收并处理一条消息,这是消息中间件的一种常用方式。

    另外一种方式,生产者生产一条消息,广播给一个或多个队列,所有订阅了这个队列的消费者,都可以消费这条消息,这就是消息订阅。

    官方教程列举了这样一个场景,生产者发出一条记录日志的消息,消费者1接收到后写日志到硬盘,消费者2接收到后打印日志到屏幕。工作中还有很多这样的场景有待发掘,适当的使用消息订阅后可以成倍的增加效率。

    2.RabbitMQ的交换中心(Exchange)

    在前两章的例子中,我们涉及到了三个概念

    1. 生产者
    2. 队列
    3. 消费者

    这不禁让我们以为,生产者生产消息后直接到发送到队列,消费者从队列中获取消息,再消费掉。

    其实这是错误的,在RabbitMQ中,生产者不会直接把消息发送给队列,实际上,生产者甚至不知道一条消息会不会被发送到队列上。

    正确的概念是,生产者会把消息发送给RabbitMQ的交换中心(Exchange),Exchange的一侧是生产者,另一侧则是一个或多个队列,由Exchange决定一条消息的生命周期--发送给某些队列,或者直接丢弃掉。

    这个概念在官方文档中被称作RabbitMQ消息模型的核心思想(core idea)

    如下图,其中X代表的是Exchange。

    image

    RabbitMQ中,有4种类型的Exchange

    • direct    通过消息的routing key比较queue的key,相等则发给该queue,常用于相同应用多实例之间的任务分发
      • 默认类型   本身是一个direct类型的exchange,routing key自动设置为queue name。注意,direct不等于默认类型,默认类型是在queue没有指定exchange时的默认处理方式,发消息时,exchange字段也要相应的填成空字符串“”
    • topic    话题,通过可配置的规则分发给绑定在该exchange上的队列,通过地理位置推送等场景适用
    • headers    当分发规则很复杂,用routing key不好表达时适用,忽略routing key,用header取代之,header可以为非字符串,例如Integer或者String
    • fanout    分发给所有绑定到该exchange上的队列,忽略routing key,适用于MMO游戏、广播、群聊等场景

    更详细的介绍,请看官方文档

    3.临时队列

    可以对一个队列命名是十分重要的,在消费者消费消息时,要指明消费哪个队列的消息(下面的queue),这样就可以让多个消费者同时分享一个队列

    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

    上述记录日志的场景中,有以下几个特点

    • 所有消费者都需要监听所有的日志消息,因此每个消费者都需要一个单独的队列,不需要和别人分享
    • 消费者只关心最新的消息,连接到RabbitMQ之前的消息不需要关心,因此,每次连接时需要创建一个队列,绑定到相应的exchange上,连接断开后,删除该队列

    自己声明队列是比较麻烦的,因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁

    String queueName = channel.queueDeclare().getQueue();

    这行代码会获取一个名字类似于“amq.gen-JzTY20BRgKO-HjmUJj0wLg”的临时队列

    4.绑定

    再次注意,在RabbitMQ中,消息是发送到Exchange的,不是直接发送的Queue。因此,需要把Queue和Exchange进行绑定,告诉RabbitMQ把指定的Exchange上的消息发送的这个队列上来

    绑定队列使用此方法

    Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

    其中,queue是队列名,exchange是要绑定的交换中心,routingKey就是这个queue的routingKey

    5.实践

    下面来实现上述场景,生产者发送日志消息,消费者1记录日志,消费者2打印日志

    下面的代码中,把连接工厂等方法放到了构造函数中,也就是说,每new一个对象,都会创建一个连接,在生产环境这样做是很浪费性能的,每次创建一个connection都会建立一次TCP连接,生产环境应使用连接池。而Channel又不一样,多个Channel是共用一个TCP连接的,因此可以放心的获取Channel(本结论出自官方文档对Channel的定义)

    AMQP 0-9-1 connections are multiplexed with channels that can be thought of as "lightweight connections that share a single TCP connection".

    For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.

    日志消息发送类 LogSender

    复制代码
     1 import java.io.IOException;
     2 import java.util.concurrent.TimeoutException;
     3 
     4 import org.slf4j.Logger;
     5 import org.slf4j.LoggerFactory;
     6 
     7 import com.rabbitmq.client.Channel;
     8 import com.rabbitmq.client.Connection;
     9 import com.rabbitmq.client.ConnectionFactory;
    10 
    11 public class LogSender {
    12 
    13     private Logger logger = LoggerFactory.getLogger(LogSender.class);
    14     private  ConnectionFactory factory;
    15     private  Connection connection;
    16     private  Channel channel;
    17     
    18     /**
    19      * 在构造函数中获取连接
    20      */
    21     public LogSender(){
    22         super();
    23         try {
    24             factory = new ConnectionFactory();
    25             factory.setHost("127.0.0.1");
    26             connection = factory.newConnection();
    27             channel = connection.createChannel();
    28         } catch (Exception e) {
    29             logger.error(" [X] INIT ERROR!",e);
    30         }
    31     }
    32     /**
    33      * 提供个关闭方法,现在并没有什么卵用
    34      * @return
    35      */
    36     public boolean closeAll(){
    37         try {
    38             this.channel.close();
    39             this.connection.close();
    40         } catch (IOException | TimeoutException e) {
    41             logger.error(" [X] CLOSE ERROR!",e);
    42             return false;
    43         }
    44         return true;
    45     }
    46     
    47     /**
    48      * 我们更加关心的业务方法
    49      * @param message
    50      */
    51     public void sendMessage(String message) {
    52             try {
    53                 //声明一个exchange,命名为logs,类型为fanout
    54                 channel.exchangeDeclare("logs", "fanout");
    55                 //exchange是logs,表示发送到此Exchange上
    56                 //fanout类型的exchange,忽略routingKey,所以第二个参数为空
    57                 channel.basicPublish("logs", "", null, message.getBytes());
    58                 logger.debug(" [D] message sent:"+message);
    59             } catch (IOException e) {
    60                 e.printStackTrace();
    61             }
    62     }
    63 }
    复制代码

    在LogSender中,和之前的例子不一样的地方是,我们没有直接声明一个Queue,取而代之的是声明了一个exchange

    发布消息时,第一个参数填了我们声明的exchange名字,routingKey留空,因为fanout类型忽略它。

    在前面的例子中,我们routingKey填的是队列名,因为默认的exchange(exchange位空字符串时使用默认交换中心)会把队列的routingKey设置为queueName(声明队列的时候设置的,不是发送消息的时候),又是direct类型,所以可以通过queueName当做routingKey找到队列。

    消费类 LogConsumer

    复制代码
     1 package com.liyang.ticktock.rabbitmq;
     2 
     3 import java.io.IOException;
     4 import java.util.concurrent.TimeoutException;
     5 
     6 import org.slf4j.Logger;
     7 import org.slf4j.LoggerFactory;
     8 
     9 import com.rabbitmq.client.AMQP;
    10 import com.rabbitmq.client.Channel;
    11 import com.rabbitmq.client.Connection;
    12 import com.rabbitmq.client.ConnectionFactory;
    13 import com.rabbitmq.client.Consumer;
    14 import com.rabbitmq.client.DefaultConsumer;
    15 import com.rabbitmq.client.Envelope;
    16 
    17 public class LogConsumer {
    18 
    19     private Logger logger = LoggerFactory.getLogger(LogConsumer.class);
    20     private ConnectionFactory factory;
    21     private Connection connection;
    22     private Channel channel;
    23 
    24     /**
    25      * 在构造函数中获取连接
    26      */
    27     public LogConsumer() {
    28         super();
    29         try {
    30             factory = new ConnectionFactory();
    31             factory.setHost("127.0.0.1");
    32             connection = factory.newConnection();
    33             channel = connection.createChannel();
    34             // 声明exchange,防止生产者没启动,exchange不存在
    35             channel.exchangeDeclare("logs","fanout");
    36         } catch (Exception e) {
    37             logger.error(" [X] INIT ERROR!", e);
    38         }
    39     }
    40 
    41     /**
    42      * 提供个关闭方法,现在并没有什么卵用
    43      * 
    44      * @return
    45      */
    46     public boolean closeAll() {
    47         try {
    48             this.channel.close();
    49             this.connection.close();
    50         } catch (IOException | TimeoutException e) {
    51             logger.error(" [X] CLOSE ERROR!", e);
    52             return false;
    53         }
    54         return true;
    55     }
    56 
    57     /**
    58      * 我们更加关心的业务方法
    59      */
    60     public void consume() {
    61         try {
    62             // 获取一个临时队列
    63             String queueName = channel.queueDeclare().getQueue();
    64             // 把刚刚获取的队列绑定到logs这个交换中心上,fanout类型忽略routingKey,所以第三个参数为空
    65             channel.queueBind(queueName, "logs", "");
    66             //定义一个Consumer,消费Log消息
    67             Consumer consumer = new DefaultConsumer(channel) {
    68                 @Override
    69                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    70                         byte[] body) throws IOException {
    71                     String message = new String(body, "UTF-8");
    72                     logger.debug(" [D] 我是来打印日志的:"+message);
    73                 }
    74             };
    75             //这里自动确认为true,接收到消息后该消息就销毁了
    76             channel.basicConsume(queueName, true, consumer);
    77         } catch (IOException e) {
    78             e.printStackTrace();
    79         }
    80     }
    81 }
    复制代码

    复制一个项目,把72行改为如下代码,代表两个做不同工作的消费者

    1 logger.debug(" [D] 我已经把消息写到硬盘了:"+message);

    消费者App

    复制代码
    1 public class App 
    2 {
    3     public static void main( String[] args )
    4     {
    5         LogConsumer consumer = new LogConsumer();
    6         consumer.consume();
    7     }
    8 }
    复制代码

    生产者App

    复制代码
    1 public class App {
    2     public static void main( String[] args ) throws InterruptedException{
    3         LogSender sender = new LogSender();
    4         while(true){
    5             sender.sendMessage(System.nanoTime()+"");
    6             Thread.sleep(1000);
    7         }
    8     }
    9 }
    复制代码

    把消费者打包成两个可执行的jar包,方便观察控制台

    用java -jar 命令执行,结果如下

    6.结束语

    本章介绍了RabbitMQ中消息的交换,再次强调,RabbitMQ中,消息是通过交换中心转发到队列的,不要被默认的exchange混淆,默认的exchange会自动把queue的名字设置为它的routingKey,所以消息发布时,才能通过queueName找到该队列,其实此时queueName扮演的角色就是routingKey。

  • 相关阅读:
    《DSP using MATLAB》Problem 6.17
    一些老物件
    《DSP using MATLAB》Problem 6.16
    《DSP using MATLAB》Problem 6.15
    《DSP using MATLAB》Problem 6.14
    《DSP using MATLAB》Problem 6.13
    《DSP using MATLAB》Problem 6.12
    《DSP using MATLAB》Problem 6.11
    P1414 又是毕业季II
    Trie树
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/6581982.html
Copyright © 2011-2022 走看看