zoukankan      html  css  js  c++  java
  • 86--spring cloud (RabbitMQ/工作模式)

    RabbitMQ

    消息队列服务,消息中间件(Broker),消息服务

    几种常用消息服务器:

    • Rabbitmq
    • Activemq
    • Rocketmq
    • Tubemq

    mq服务器的地位,和redis相当,现在分布式项目中,redis和mq服务应该是必备的

    Rabbitmq

    RabbitMQ 使用场景

    服务解耦

    假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可

    但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难

    这是由于服务之间耦合度过于紧密

    耦合

    再来考虑用RabbitMQ解耦的情况

    A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可

    解耦

    流量削峰

    假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对

    低流量

    而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力

    但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了

    流量峰值

    这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力

    这是消息队列服务器非常典型的应用场景

    流量销峰

    异步调用

    考虑定外卖支付成功的情况

    支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长

    这样就造成整条调用链路响应非常缓慢

    阻塞

    而如果我们引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右

    寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作

    异步调用

    rabbitmq 基本概念

    RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

    rabbitmq

    Exchange

    接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。

    exchange

    Message Queue

    消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取。

    Binding Key

    它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。

    Routing Key

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与Exchange Type及binding key联合使用才能生,我们的生产者只需要通过指定routing key来决定消息流向哪里。

    主要端口介绍

    • 4369 – erlang发现口
    • 5672 – client端通信口
    • 15672 – 管理界面ui端口
    • 25672 – server间内部通信口

    rabbitmq六种工作模式

    简单模式

    简单

    RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

    • 发送消息的程序是生产者
    • 队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
    • 消费者等待从队列接收消息

    简单模式

    测试项目创建

    添加依赖

    创建一个maven项目

    添加以下依赖于pom.xml文件中

    添加 slf4j 依赖, 和 rabbitmq amqp 依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.tedu</groupId>
        <artifactId>rabbitmq</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.4.3</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.8.0-alpha2</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.8.0-alpha2</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
    

    测试生产者发送消息

    package rabbitmq.simple;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		//创建连接工厂,并设置连接信息
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("www.gavin6.xyz");
    		f.setPort(5672);//可选,5672是默认端口
    		f.setUsername("admin");
    		f.setPassword("admin");
    
    		/*
    		 * 与rabbitmq服务器建立连接,
    		 * rabbitmq服务器端使用的是nio,会复用tcp连接,
    		 * 并开辟多个信道与客户端通信
    		 * 以减轻服务器端建立连接的开销
    		 */
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    
    		/*
    		 * 声明队列,会在rabbitmq中创建一个队列
    		 * 如果已经创建过该队列,就不能再使用其他参数来创建
    		 * 
    		 * 参数含义:
    		 *   -queue: 队列名称
    		 *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
    		 *   -exclusive: 排他,true表示限制仅当前连接可用
    		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
    		 *   -arguments: 其他参数
    		 */
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		/*
    		 * 发布消息
    		 * 这里把消息向默认交换机发送.
    		 * 默认交换机隐含与所有队列绑定,routing key即为队列名称
    		 * 
    		 * 参数含义:
    		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
    		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
    		 * 	-props: 其他参数,例如头信息
    		 * 	-body: 消息内容byte[]数组
    		 */
    		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
    
    		System.out.println("消息已发送");
    		c.close();
    	}
    }
    
    

    测试结果:

    消息已发送
    

    测试消费者接收消息

    package rabbitmq.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("www.gavin6.xyz");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列,如果该队列已经创建过,则不会重复创建
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    

    测试结果:

    收到:helloworld
    

    工作模式

    工作

    工作模式

    工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。

    我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。

    使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。

    生产者发送消息

    package m2_work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //工作模式下的生产者
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
    
            //定义队列
            c.queueDeclare("helloworld",false,false,false,null);
            //发送消息
            while (true){
                System.out.println("输入消息");
                String string =new Scanner(System.in).nextLine();
                //如果输入的是"exit"则结束生产者进程
                if ("exit".equals(string)) {
                    break;
                }
                c.basicPublish("","helloworld",null,string.getBytes());
            }
            c.close();
    
        }
    }
    
    

    消费者接收消息

    package m2_work;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义队列
    
            c.queueDeclare("helloworld",false,false,false,null);
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    System.out.println("收到:"+msg);
                    for (int i = 0; i <msg.length() ; i++) {
                        //遍历到 . 就休息一秒
                        if ('.' == msg.charAt(i)){
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    System.out.println("处理完毕结束-----------");
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
            //消费数据
            c.basicConsume("helloworld",true,deliverCallback,cancelCallback);
        }
    }
    
    

    运行测试

    运行:

    • 一个生产者
    • 两个消费者

    生产者发送多条消息,
    如: 1,2,3,4,5. 两个消费者分别收到:

    • 消费者一: 1,3,5
    • 消费者二: 2,4

    rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者

    消息确认

    一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?

    就现在的代码来说,rabbitmq把消息发送给消费者后,会立即删除消息,那么消费者挂掉后,它没来得及处理的消息就会丢失

    如果生产者发送以下消息:

    1…

    2

    3

    4

    5

    两个消费者分别收到:

    • 消费者一: 1…, 3, 5
    • 消费者二: 2, 4

    当消费者一收到所有消息后,要话费7秒时间来处理第一条消息,这期间如果关闭该消费者,那么1未处理完成,3,5则没有被处理

    我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者

    为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

    如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

    这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以

    手动消息确认默认是开启的,前面的例子我们通过autoAck=ture把它关闭了。我们现在要把它设置为false,然后工作进程处理完意向任务时,发送一个消息确认(回执)。

    ACK消息回执

    如果设置成手动 ACK,服务器会等待收到回执后才删除消息,如果收到回执之前消费者离线,消息会被重新投递

    手动ACK设置

    1. 消费数据时,设置成手动ACK模式

      c.basicConsume("helloworld", false, ...)

    2. 处理完消息后,要执行手动发送回执操作

      c.basicAck(回执, 是否同时确认多条消息)

    package m2_work;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义队列
    
            c.queueDeclare("helloworld",false,false,false,null);
    
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    System.out.println("收到:"+msg);
                    for (int i = 0; i <msg.length() ; i++) {
                        //遍历到 . 就休息一秒
                        if ('.' == msg.charAt(i)){
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    //发送回执
                    //参数解释 参数1 回执 参数2 是否确认多条消息回执
                    c.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    System.out.println("处理完毕结束-----------");
    
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //消费数据
            //手动ack设置 将true改为false
            c.basicConsume("helloworld",false,deliverCallback,cancelCallback);
    
    
        }
    }
    
    

    使用以上代码,就算杀掉一个正在处理消息的工作进程也不会丢失任何消息,工作进程挂掉之后,没有确认的消息就会被自动重新传递。

    忘记确认(ack)是一个常见的错误, 这样后果是很严重的, 由于未确认的消息不会被释放, rabbitmq会吃掉越来越多的内存

    可以使用下面命令打印工作队列中未确认消息的数量

    rabbitmqctl list_queues name messages_ready messages_unacknowledged
    1
    

    当处理消息时异常中断, 可以选择让消息重回队列重新发送.
    nack 操作可以是消息重回队列, 可以使用 basicNack() 方法:

    // requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列
    c.basicNack(tag, multiple, requeue)
    

    Qos

    Qualtity of Service

    每次抓取的消息数量,默认没有数量限制

    设置 qos=1

    每次只抓取一条数据,处理完之前,不抓取下一条,就可以做到合理分发数据

    qos必须在手动ACK模式下才有效

    合理地分发

    rabbitmq会一次把多个消息分发给消费者, 这样可能造成有的消费者非常繁忙, 而其它消费者空闲. 而rabbitmq对此一无所知, 仍然会均匀的分发消息

    我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者

    合理分发

    合理分发数据的前提是:

    要在手动ACK的前提下,才可以对qos=1进行设置,完成数据的合理分发

    //设置每次抓取的数量 --每次只抓取一条数据
    c.basicQos(1);
    
    package m2_work;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义队列
    
            c.queueDeclare("helloworld",false,false,false,null);
    
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    System.out.println("收到:"+msg);
                    for (int i = 0; i <msg.length() ; i++) {
                        //遍历到 . 就休息一秒
                        if ('.' == msg.charAt(i)){
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    //发送回执
                    //参数解释 参数1 回执 参数2 是否确认多条消息回执
                    c.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    System.out.println("处理完毕结束-----------");
    
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //设置每次抓取的数量 --每次只抓取一条数据
            c.basicQos(1);
    
            //消费数据
            //手动ack设置 将true改为false
            c.basicConsume("helloworld",false,deliverCallback,cancelCallback);
    
    
        }
    }
    
    

    Rabbitmq消息持久化

    介绍

    当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据

    要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)

    队列持久化

    队列设置为可持久化, 可以在定义队列时指定参数durable为true

    //第二个参数是持久化参数durable
    c.queueDeclare("helloworld", true, false, false, null);
    

    由于之前我们已经定义过队列"hello"是不可持久化的, 对已存在的队列, rabbitmq不允许对其定义不同的参数, 否则会出错, 所以这里我们定义一个不同名字的队列"task_queue"

    //定义一个新的队列,名为 task_queue
    //第二个参数是持久化参数 durable
    c.queueDeclare("task_queue", true, false, false, null);
    

    消息持久化

    生产者和消费者代码都要修改

    这样即使rabbitmq重新启动, 队列也不会丢失. 现在我们再设置队列中消息的持久化, 使用MessageProperties.PERSISTENT_TEXT_PLAIN参数

    //第三个参数设置消息持久化
    ch.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                msg.getBytes());
    

    最终完成的工作模式下的生产者和消费者代码

    生产者代码

    package m2_work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //工作模式下的生产者
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
    
            //定义队列
            c.queueDeclare("task_queue",
                    true,
                    false,
                    false,
                    null);
            //发送消息
            while (true){
                System.out.println("输入消息");
                String string =new Scanner(System.in).nextLine();
                //如果输入的是"exit"则结束生产者进程
                if ("exit".equals(string)) {
                    break;
                }
                c.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,string.getBytes());
                System.out.println("消息已经发送");
            }
            c.close();
        }
    
    }
    
    

    消费者代码

    package m2_work;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义队列
    
            c.queueDeclare("task_queue",
                    true, //持久化队列
                    false,
                    false,
                    null);
    
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    System.out.println("收到:"+msg);
                    for (int i = 0; i <msg.length() ; i++) {
                        //遍历到 . 就休息一秒
                        if ('.' == msg.charAt(i)){
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    //发送回执
                    //参数解释 参数1 回执 参数2 是否确认多条消息回执
                    c.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    System.out.println("处理完毕结束-----------");
    
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //设置每次抓取的数量 --每次只抓取一条数据
            c.basicQos(1);
    
            //消费数据
            //手动ack设置 将true改为false
            c.basicConsume("task_queue",false,deliverCallback,cancelCallback);
    
    
        }
    }
    
    

    发布订阅模式

    发布订阅

    模型图

    发布订阅

    Exchanges 交换机

    RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生产者甚至不知道消息是否会被传递到任何队列。

    相反,生产者只能向交换机(Exchange)发送消息。交换机是一个非常简单的东西。一边接收来自生产者的消息,另一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。它应该被添加到一个特定的队列中吗?它应该添加到多个队列中吗?或者它应该被丢弃。这些规则由exchange的类型定义。

    有几种可用的交换类型:direct、topic、header和fanout。我们将关注最后一个——fanout。让我们创建一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout");

    fanout交换机非常简单。它只是将接收到的所有消息广播给它所知道的所有队列。这正是我们的日志系统所需要的。

    我们前面使用的队列具有特定的名称(还记得hello和task_queue吗?)能够为队列命名对我们来说至关重要——我们需要将工作进程指向同一个队列,在生产者和消费者之间共享队列。

    但日志记录案例不是这种情况。我们想要接收所有的日志消息,而不仅仅是其中的一部分。我们还只对当前的最新消息感兴趣,而不是旧消息。

    要解决这个问题,我们需要两件事。首先,每当我们连接到Rabbitmq时,我们需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者,更好的方法是让服务器为我们选择一个随机队列名称。其次,一旦断开与使用者的连接,队列就会自动删除。在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有生成名称的、非持久的、独占的、自动删除队列

    //自动生成队列名
    //非持久,独占,自动删除
    String queue = ch.queueDeclare().getQueue();
    

    或者采用UUID的方式生成

    String queue = UUID.randomUUID().toString();
    c.queueDeclare(queue,false,true,true,null);
    

    绑定 Bindings

    绑定

    我们已经创建了一个fanout交换机和一个队列。现在我们需要告诉exchange向指定队列发送消息。exchange和队列之间的关系称为绑定。

    //指定的队列,与指定的交换机关联起来
    //成为绑定 -- binding
    //第三个参数时 routingKey, 由于是fanout交换机, 这里忽略 routingKey
    c.queueBind(queue, "logs", "");
    

    列出绑定关系:

    rabbitmqctl list_bindings

    代码实现

    完成代码

    生产者代码

    生产者发出日志消息,看起来与前一教程没有太大不同。最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。路由模式下需要配置

    package m3_publishsubstribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //订阅者发布模式
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
    
    
            //定义交换机
    //        c.exchangeDeclare("logs","fanout",true,false,null);
            c.exchangeDeclare("logs","fanout");
            //发送消息
            while (true){
                System.out.println("输入消息");
                String msg = new Scanner(System.in).nextLine();
                //第二个参数对fanout交换机无效 给予空串
                c.basicPublish("logs","",null,msg.getBytes());
            }
    
    
        }
    
    }
    
    

    消费者代码

    如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。

    package m3_publishsubstribe;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    
    //消费者
    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义交换机
            c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
    
            //定义随机队列
            //方式一 自己给出所有参数
    //        String queue = UUID.randomUUID().toString();
    //        c.queueDeclare(queue,false,true,true,null);
            //方式二 服务器自动命名,并获取队列名字
            String queue = c.queueDeclare().getQueue();
            //绑定
            c.queueBind(queue,"logs","");
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    System.out.println("收到消息"+msg);
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //消费数据
            c.basicConsume(queue,true, deliverCallback, cancelCallback);
    
        }
    }
    
    

    路由模式

    路由

    路由模式

    通过路由模式,我们将向其添加一个特性—我们将只订阅所有消息中的一部分

    绑定 Bindings

    在上一节,我们已经创建了队列与交换机的绑定。使用下面这样的代码:

    c.queueBind(queue, "direct_logs", "");
    

    绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。

    绑定可以使用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为bindingKey。这是我们如何创建一个键绑定:

    c.queueBind(queue,"direct_logs",key);
    

    bindingKey的含义取决于交换机类型。我们前面使用的fanout交换机完全忽略它。

    直连交换机 Direct exchange

    上一节中的日志系统向所有消费者广播所有消息。我们希望扩展它,允许根据消息的严重性过滤消息。例如,我们希望将日志消息写入磁盘的程序只接收关键error,而不是在warning或info日志消息上浪费磁盘空间。

    前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。

    我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。为了说明这一点,请考虑以下设置

    路由模式

    其中我们可以看到直连交换机X,它绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定black,另一个绑定键green

    这样设置,使用路由键orange发布到交换器的消息将被路由到队列Q1。带有blackgreen路由键的消息将转到Q2。而所有其他消息都将被丢弃。

    多重绑定 Multiple bindings

    多重绑定

    ​ 使用相同的bindingKey绑定多个队列是完全允许的。如图所示,可以使用binding key black将X与Q1和Q2绑定。在这种情况下,直连交换机的行为类似于fanout,并将消息广播给所有匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。

    发送消息

    我们把消息发送到一个Direct交换机,而不是fanout。我们将提供日志级别作为routingKey。这样,接收程序将能够选择它希望接收的级别。让我们首先来看发出日志。

    和前面一样,我们首先需要创建一个exchange:

    //参数1: 交换机名
    //参数2: 交换机类型
    c.exchangeDeclare("direct_logs", "direct");
    

    接着来看发送消息的代码

    //参数1: 交换机名
    //参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    //参数3: 其他配置属性
    //参数4: 发布的消息数据 
    c.basicPublish("direct_logs", "error", null, message.getBytes());
    

    订阅(绑定)

    接收消息的工作原理与前面章节一样,但有一个例外——我们将为感兴趣的每个日志级别创建一个新的绑定, 示例代码如下:

    ///用绑定键用来绑定
    c.queueBind(queue, "logs", "info");
    c.queueBind(queue, "logs", "warning");
    

    完整的代码

    完整代码

    生产者代码

    package m4_routing;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //路由模式下的生产者
    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
    
    
            //定义直连交换机
            c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
    
            //向交换机发送消息,在消息上携带路由键
            while(true){
                System.out.println("输入消息");
                String msg = new Scanner(System.in).nextLine();
                System.out.println("输入路由键");
                String key = new Scanner(System.in).nextLine();
                c.basicPublish("direct_logs",key,null,msg.getBytes());
    
            }
    
        }
    }
    
    

    消费者代码

    package m4_routing;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //路由模式下的消费者
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义交换机
            c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
            //定义随机队列
            String queue = c.queueDeclare().getQueue();
    
            System.out.println("输入绑定键,用空格隔开");
            String keys = new Scanner(System.in).nextLine();
            String[] a = keys.split("\s+");
            //用绑定键用来绑定
            for (String key : a) {
                c.queueBind(queue,"direct_logs",key);
            }
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    Envelope envelope = message.getEnvelope();
                    String key = envelope.getRoutingKey();
                    System.out.println("收到消息"+msg+",key="+key);
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //消费数据
            c.basicConsume(queue,true, deliverCallback, cancelCallback);
    
    
        }
    }
    
    

    测试

    测试步骤:

    首先启动一个生产者,两个消费者

    然后分别在两个消费者上绑定路由键

    最后通过生产者发送消息,发送消息以及携带路由键

    观察控制台输出:

    image-20200829201148248

    image-20200829201219409

    image-20200829201253148

    有以上的输出,表示路由模式使用正确

    主题模式

    主题

    主题交换机 Topic exchange

    发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。

    bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

    • * 可以通配单个单词。
    • # 可以通配零个或多个单词。

    用一个例子来解释这个问题是最简单的

    主题

    在本例中,我们将发送描述动物的消息。这些消息将使用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词表示速度,第二个是颜色,第三个是物种:“<速度>.<颜色>.<物种>”。

    我们创建三个绑定:Q1与bindingKey “*.orange.*” 绑定。和Q2是 “*.*.rabbit” 和 “lazy.#” 。

    这些绑定可概括为:

    • Q1对所有橙色的动物感兴趣。
    • Q2想接收关于兔子和慢速动物的所有消息。

    将routingKey设置为"quick.orange.rabbit"的消息将被发送到两个队列。消息 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因此将被丢弃。

    如果我们违反约定,发送一个或四个单词的信息,比如"orange“或”quick.orange.male.rabbit",会发生什么?这些消息将不匹配任何绑定,并将丢失。

    另外,"lazy.orange.male.rabbit",即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。

    生产者代码

    package m5_topic;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //主题模式下的生产者
    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("www.gavin6.xyz");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
    //        factory.setVirtualHost("/lq");
            Channel c = factory.newConnection().createChannel();
    
            //定义交换机
            //参数1: 交换机名
    		//参数2: 交换机类型
            c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    
    
            //发送消息,并且携带路由键
            while(true){
                System.out.println("输入消息");
                String msg = new Scanner(System.in).nextLine();
                System.out.println("输入路由键");
                String key = new Scanner(System.in).nextLine();
                //参数1: 交换机名
    			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    			//参数3: 其他配置属性
    			//参数4: 发布的消息数据 
                c.basicPublish("topic_logs",key,null,msg.getBytes());
            }
        }
    }
    
    

    消费者代码

    package m5_topic;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    
    //路由模式下的消费者
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义交换机
            c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
            //定义随机队列
    
            String queue = UUID.randomUUID().toString();
            c.queueDeclare(queue,false,true,true,null);
    
            System.out.println("输入绑定键,用空格隔开");
            String keys = new Scanner(System.in).nextLine();
            String[] a = keys.split("\s+");
            //用绑定键用来绑定
            for (String key : a) {
                c.queueBind(queue,"topic_logs",key);
            }
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    Envelope envelope = message.getEnvelope();
                    String key = envelope.getRoutingKey();
                    System.out.println("收到消息"+msg+",key="+key);
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //消费数据
            c.basicConsume(queue,true, deliverCallback, cancelCallback);
    
    
        }
    }
    
    

    测试

    开启一个生产者,开启两个消费者

    ​ 开启一个生产者

    image-20200831190126106

    ​ 开启两个生产者

    image-20200831190159845

    image-20200831190218792

  • 相关阅读:
    Android JNI的使用方法
    Android4.4中jni的native的方法无法找到的解决方案
    如何解决Your project contains C++ files but it is not using a supported native build system
    android-studio开发NDK错误记录:bash: ../../build/intermediates/classes/debug: is a directory
    Android Studio 编译: Program type already present: XXX 解决方案
    ScriptEngineManager is not available in Android and causes a NoClassDefFoundError
    Android 机器人项目Pepper_Android
    Android 优秀开源项目
    准备开发一个运行在Android上的JavaME模拟器
    上周热点回顾(2.3-2.9)团队
  • 原文地址:https://www.cnblogs.com/liqbk/p/13579521.html
Copyright © 2011-2022 走看看