zoukankan      html  css  js  c++  java
  • 86--spring cloud (RabbitMQ/工作模式)

    RabbitMQ

    消息队列服务,消息中间件(Broker),消息服务

    几种常用消息服务器:

    • Rabbitmq
    • Activemq
    • Rocketmq
    • Tubemq

    mq服务器的地位,和redis相当,现在分布式项目中,redis和mq服务应该是必备的

    Rabbitmq

    RabbitMQ 使用场景

    服务解耦

    假设有这样一个场景, 服务A产生数据, 而服务B,C,D需要这些数据, 那么我们可以在A服务中直接调用B,C,D服务,把数据传递到下游服务即可

    但是,随着我们的应用规模不断扩大,会有更多的服务需要A的数据,如果有几十甚至几百个下游服务,而且会不断变更,再加上还要考虑下游服务出错的情况,那么A服务中调用代码的维护会极为困难

    这是由于服务之间耦合度过于紧密

    耦合

    再来考虑用RabbitMQ解耦的情况

    A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可

    解耦

    流量削峰

    假设我们有一个应用,平时访问量是每秒300请求,我们用一台服务器即可轻松应对

    低流量

    而在高峰期,访问量瞬间翻了十倍,达到每秒3000次请求,那么单台服务器肯定无法应对,这时我们可以考虑增加到10台服务器,来分散访问压力

    但如果这种瞬时高峰的情况每天只出现一次,每次只有半小时,那么我们10台服务器在多数时间都只分担每秒几十次请求,这样就有点浪费资源了

    流量峰值

    这种情况,我们就可以使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力

    这是消息队列服务器非常典型的应用场景

    流量销峰

    异步调用

    考虑定外卖支付成功的情况

    支付后要发送支付成功的通知,再寻找外卖小哥来进行配送,而寻找外卖小哥的过程非常耗时,尤其是高峰期,可能要等待几十秒甚至更长

    这样就造成整条调用链路响应非常缓慢

    阻塞

    而如果我们引入RabbitMQ消息队列,订单数据可以发送到消息队列服务器,那么调用链路也就可以到此结束,订单系统则可以立即得到响应,整条链路的响应时间只有200毫秒左右

    寻找外卖小哥的应用可以以异步的方式从消息队列接收订单消息,再执行耗时的寻找操作

    异步调用

    rabbitmq 基本概念

    RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

    rabbitmq

    Exchange

    接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的有direct、Fanout和Topic三种。

    exchange

    Message Queue

    消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到相应的queue则数据会丢失),等待消费者来取。

    Binding Key

    它表示的是Exchange与Message Queue是通过binding key进行联系的,这个关系是固定。

    Routing Key

    生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与Exchange Type及binding key联合使用才能生,我们的生产者只需要通过指定routing key来决定消息流向哪里。

    主要端口介绍

    • 4369 – erlang发现口
    • 5672 – client端通信口
    • 15672 – 管理界面ui端口
    • 25672 – server间内部通信口

    rabbitmq六种工作模式

    简单模式

    简单

    RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

    • 发送消息的程序是生产者
    • 队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
    • 消费者等待从队列接收消息

    简单模式

    测试项目创建

    添加依赖

    创建一个maven项目

    添加以下依赖于pom.xml文件中

    添加 slf4j 依赖, 和 rabbitmq amqp 依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.tedu</groupId>
        <artifactId>rabbitmq</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.4.3</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.8.0-alpha2</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.8.0-alpha2</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
    

    测试生产者发送消息

    package rabbitmq.simple;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Test1 {
    	public static void main(String[] args) throws Exception {
    		//创建连接工厂,并设置连接信息
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("www.gavin6.xyz");
    		f.setPort(5672);//可选,5672是默认端口
    		f.setUsername("admin");
    		f.setPassword("admin");
    
    		/*
    		 * 与rabbitmq服务器建立连接,
    		 * rabbitmq服务器端使用的是nio,会复用tcp连接,
    		 * 并开辟多个信道与客户端通信
    		 * 以减轻服务器端建立连接的开销
    		 */
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    
    		/*
    		 * 声明队列,会在rabbitmq中创建一个队列
    		 * 如果已经创建过该队列,就不能再使用其他参数来创建
    		 * 
    		 * 参数含义:
    		 *   -queue: 队列名称
    		 *   -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
    		 *   -exclusive: 排他,true表示限制仅当前连接可用
    		 *   -autoDelete: 当最后一个消费者断开后,是否删除队列
    		 *   -arguments: 其他参数
    		 */
    		ch.queueDeclare("helloworld", false,false,false,null);
    
    		/*
    		 * 发布消息
    		 * 这里把消息向默认交换机发送.
    		 * 默认交换机隐含与所有队列绑定,routing key即为队列名称
    		 * 
    		 * 参数含义:
    		 * 	-exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null 
    		 * 	-routingKey: 对于默认交换机,路由键就是目标队列名称
    		 * 	-props: 其他参数,例如头信息
    		 * 	-body: 消息内容byte[]数组
    		 */
    		ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
    
    		System.out.println("消息已发送");
    		c.close();
    	}
    }
    
    

    测试结果:

    消息已发送
    

    测试消费者接收消息

    package rabbitmq.simple;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    
    public class Test2 {
    	public static void main(String[] args) throws Exception {
    		//连接工厂
    		ConnectionFactory f = new ConnectionFactory();
    		f.setHost("www.gavin6.xyz");
    		f.setUsername("admin");
    		f.setPassword("admin");
    		//建立连接
    		Connection c = f.newConnection();
    		//建立信道
    		Channel ch = c.createChannel();
    		//声明队列,如果该队列已经创建过,则不会重复创建
    		ch.queueDeclare("helloworld",false,false,false,null);
    		System.out.println("等待接收数据");
    		
    		//收到消息后用来处理消息的回调对象
    		DeliverCallback callback = new DeliverCallback() {
    			@Override
    			public void handle(String consumerTag, Delivery message) throws IOException {
    				String msg = new String(message.getBody(), "UTF-8");
    				System.out.println("收到: "+msg);
    			}
    		};
    		
    		//消费者取消时的回调对象
    		CancelCallback cancel = new CancelCallback() {
    			@Override
    			public void handle(String consumerTag) throws IOException {
    			}
    		};
    		
    		ch.basicConsume("helloworld", true, callback, cancel);
    	}
    }
    

    测试结果:

    收到:helloworld
    

    工作模式

    工作

    工作模式

    工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。

    我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。

    使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。

    生产者发送消息

    package m2_work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //工作模式下的生产者
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
    
            //定义队列
            c.queueDeclare("helloworld",false,false,false,null);
            //发送消息
            while (true){
                System.out.println("输入消息");
                String string =new Scanner(System.in).nextLine();
                //如果输入的是"exit"则结束生产者进程
                if ("exit".equals(string)) {
                    break;
                }
                c.basicPublish("","helloworld",null,string.getBytes());
            }
            c.close();
    
        }
    }
    
    

    消费者接收消息

    package m2_work;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义队列
    
            c.queueDeclare("helloworld",false,false,false,null);
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    System.out.println("收到:"+msg);
                    for (int i = 0; i <msg.length() ; i++) {
                        //遍历到 . 就休息一秒
                        if ('.' == msg.charAt(i)){
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    System.out.println("处理完毕结束-----------");
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
            //消费数据
            c.basicConsume("helloworld",true,deliverCallback,cancelCallback);
        }
    }
    
    

    运行测试

    运行:

    • 一个生产者
    • 两个消费者

    生产者发送多条消息,
    如: 1,2,3,4,5. 两个消费者分别收到:

    • 消费者一: 1,3,5
    • 消费者二: 2,4

    rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者

    消息确认

    一个消费者接收消息后,在消息没有完全处理完时就挂掉了,那么这时会发生什么呢?

    就现在的代码来说,rabbitmq把消息发送给消费者后,会立即删除消息,那么消费者挂掉后,它没来得及处理的消息就会丢失

    如果生产者发送以下消息:

    1…

    2

    3

    4

    5

    两个消费者分别收到:

    • 消费者一: 1…, 3, 5
    • 消费者二: 2, 4

    当消费者一收到所有消息后,要话费7秒时间来处理第一条消息,这期间如果关闭该消费者,那么1未处理完成,3,5则没有被处理

    我们并不想丢失任何消息, 如果一个消费者挂掉,我们想把它的任务消息派发给其他消费者

    为了确保消息不会丢失,rabbitmq支持消息确认(回执)。当一个消息被消费者接收到并且执行完成后,消费者会发送一个ack (acknowledgment) 给rabbitmq服务器, 告诉他我已经执行完成了,你可以把这条消息删除了。

    如果一个消费者没有返回消息确认就挂掉了(信道关闭,连接关闭或者TCP链接丢失),rabbitmq就会明白,这个消息没有被处理完成,rebbitmq就会把这条消息重新放入队列,如果在这时有其他的消费者在线,那么rabbitmq就会迅速的把这条消息传递给其他的消费者,这样就确保了没有消息会丢失。

    这里不存在消息超时, rabbitmq只在消费者挂掉时重新分派消息, 即使消费者花非常久的时间来处理消息也可以

    手动消息确认默认是开启的,前面的例子我们通过autoAck=ture把它关闭了。我们现在要把它设置为false,然后工作进程处理完意向任务时,发送一个消息确认(回执)。

    ACK消息回执

    如果设置成手动 ACK,服务器会等待收到回执后才删除消息,如果收到回执之前消费者离线,消息会被重新投递

    手动ACK设置

    1. 消费数据时,设置成手动ACK模式

      c.basicConsume("helloworld", false, ...)

    2. 处理完消息后,要执行手动发送回执操作

      c.basicAck(回执, 是否同时确认多条消息)

    package m2_work;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义队列
    
            c.queueDeclare("helloworld",false,false,false,null);
    
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    System.out.println("收到:"+msg);
                    for (int i = 0; i <msg.length() ; i++) {
                        //遍历到 . 就休息一秒
                        if ('.' == msg.charAt(i)){
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    //发送回执
                    //参数解释 参数1 回执 参数2 是否确认多条消息回执
                    c.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    System.out.println("处理完毕结束-----------");
    
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //消费数据
            //手动ack设置 将true改为false
            c.basicConsume("helloworld",false,deliverCallback,cancelCallback);
    
    
        }
    }
    
    

    使用以上代码,就算杀掉一个正在处理消息的工作进程也不会丢失任何消息,工作进程挂掉之后,没有确认的消息就会被自动重新传递。

    忘记确认(ack)是一个常见的错误, 这样后果是很严重的, 由于未确认的消息不会被释放, rabbitmq会吃掉越来越多的内存

    可以使用下面命令打印工作队列中未确认消息的数量

    rabbitmqctl list_queues name messages_ready messages_unacknowledged
    1
    

    当处理消息时异常中断, 可以选择让消息重回队列重新发送.
    nack 操作可以是消息重回队列, 可以使用 basicNack() 方法:

    // requeue为true时重回队列, 反之消息被丢弃或被发送到死信队列
    c.basicNack(tag, multiple, requeue)
    

    Qos

    Qualtity of Service

    每次抓取的消息数量,默认没有数量限制

    设置 qos=1

    每次只抓取一条数据,处理完之前,不抓取下一条,就可以做到合理分发数据

    qos必须在手动ACK模式下才有效

    合理地分发

    rabbitmq会一次把多个消息分发给消费者, 这样可能造成有的消费者非常繁忙, 而其它消费者空闲. 而rabbitmq对此一无所知, 仍然会均匀的分发消息

    我们可以使用 basicQos(1) 方法, 这告诉rabbitmq一次只向消费者发送一条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者

    合理分发

    合理分发数据的前提是:

    要在手动ACK的前提下,才可以对qos=1进行设置,完成数据的合理分发

    //设置每次抓取的数量 --每次只抓取一条数据
    c.basicQos(1);
    
    package m2_work;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义队列
    
            c.queueDeclare("helloworld",false,false,false,null);
    
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    System.out.println("收到:"+msg);
                    for (int i = 0; i <msg.length() ; i++) {
                        //遍历到 . 就休息一秒
                        if ('.' == msg.charAt(i)){
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    //发送回执
                    //参数解释 参数1 回执 参数2 是否确认多条消息回执
                    c.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    System.out.println("处理完毕结束-----------");
    
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //设置每次抓取的数量 --每次只抓取一条数据
            c.basicQos(1);
    
            //消费数据
            //手动ack设置 将true改为false
            c.basicConsume("helloworld",false,deliverCallback,cancelCallback);
    
    
        }
    }
    
    

    Rabbitmq消息持久化

    介绍

    当rabbitmq关闭时, 我们队列中的消息仍然会丢失, 除非明确要求它不要丢失数据

    要求rabbitmq不丢失数据要做如下两点: 把队列和消息都设置为可持久化(durable)

    队列持久化

    队列设置为可持久化, 可以在定义队列时指定参数durable为true

    //第二个参数是持久化参数durable
    c.queueDeclare("helloworld", true, false, false, null);
    

    由于之前我们已经定义过队列"hello"是不可持久化的, 对已存在的队列, rabbitmq不允许对其定义不同的参数, 否则会出错, 所以这里我们定义一个不同名字的队列"task_queue"

    //定义一个新的队列,名为 task_queue
    //第二个参数是持久化参数 durable
    c.queueDeclare("task_queue", true, false, false, null);
    

    消息持久化

    生产者和消费者代码都要修改

    这样即使rabbitmq重新启动, 队列也不会丢失. 现在我们再设置队列中消息的持久化, 使用MessageProperties.PERSISTENT_TEXT_PLAIN参数

    //第三个参数设置消息持久化
    ch.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                msg.getBytes());
    

    最终完成的工作模式下的生产者和消费者代码

    生产者代码

    package m2_work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //工作模式下的生产者
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
    
            //定义队列
            c.queueDeclare("task_queue",
                    true,
                    false,
                    false,
                    null);
            //发送消息
            while (true){
                System.out.println("输入消息");
                String string =new Scanner(System.in).nextLine();
                //如果输入的是"exit"则结束生产者进程
                if ("exit".equals(string)) {
                    break;
                }
                c.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,string.getBytes());
                System.out.println("消息已经发送");
            }
            c.close();
        }
    
    }
    
    

    消费者代码

    package m2_work;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义队列
    
            c.queueDeclare("task_queue",
                    true, //持久化队列
                    false,
                    false,
                    null);
    
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    System.out.println("收到:"+msg);
                    for (int i = 0; i <msg.length() ; i++) {
                        //遍历到 . 就休息一秒
                        if ('.' == msg.charAt(i)){
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    //发送回执
                    //参数解释 参数1 回执 参数2 是否确认多条消息回执
                    c.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    System.out.println("处理完毕结束-----------");
    
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //设置每次抓取的数量 --每次只抓取一条数据
            c.basicQos(1);
    
            //消费数据
            //手动ack设置 将true改为false
            c.basicConsume("task_queue",false,deliverCallback,cancelCallback);
    
    
        }
    }
    
    

    发布订阅模式

    发布订阅

    模型图

    发布订阅

    Exchanges 交换机

    RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生产者甚至不知道消息是否会被传递到任何队列。

    相反,生产者只能向交换机(Exchange)发送消息。交换机是一个非常简单的东西。一边接收来自生产者的消息,另一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。它应该被添加到一个特定的队列中吗?它应该添加到多个队列中吗?或者它应该被丢弃。这些规则由exchange的类型定义。

    有几种可用的交换类型:direct、topic、header和fanout。我们将关注最后一个——fanout。让我们创建一个这种类型的交换机,并称之为 logs: ch.exchangeDeclare("logs", "fanout");

    fanout交换机非常简单。它只是将接收到的所有消息广播给它所知道的所有队列。这正是我们的日志系统所需要的。

    我们前面使用的队列具有特定的名称(还记得hello和task_queue吗?)能够为队列命名对我们来说至关重要——我们需要将工作进程指向同一个队列,在生产者和消费者之间共享队列。

    但日志记录案例不是这种情况。我们想要接收所有的日志消息,而不仅仅是其中的一部分。我们还只对当前的最新消息感兴趣,而不是旧消息。

    要解决这个问题,我们需要两件事。首先,每当我们连接到Rabbitmq时,我们需要一个新的空队列。为此,我们可以创建一个具有随机名称的队列,或者,更好的方法是让服务器为我们选择一个随机队列名称。其次,一旦断开与使用者的连接,队列就会自动删除。在Java客户端中,当我们不向queueDeclare()提供任何参数时,会创建一个具有生成名称的、非持久的、独占的、自动删除队列

    //自动生成队列名
    //非持久,独占,自动删除
    String queue = ch.queueDeclare().getQueue();
    

    或者采用UUID的方式生成

    String queue = UUID.randomUUID().toString();
    c.queueDeclare(queue,false,true,true,null);
    

    绑定 Bindings

    绑定

    我们已经创建了一个fanout交换机和一个队列。现在我们需要告诉exchange向指定队列发送消息。exchange和队列之间的关系称为绑定。

    //指定的队列,与指定的交换机关联起来
    //成为绑定 -- binding
    //第三个参数时 routingKey, 由于是fanout交换机, 这里忽略 routingKey
    c.queueBind(queue, "logs", "");
    

    列出绑定关系:

    rabbitmqctl list_bindings

    代码实现

    完成代码

    生产者代码

    生产者发出日志消息,看起来与前一教程没有太大不同。最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。路由模式下需要配置

    package m3_publishsubstribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //订阅者发布模式
    public class Producer {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
    
    
            //定义交换机
    //        c.exchangeDeclare("logs","fanout",true,false,null);
            c.exchangeDeclare("logs","fanout");
            //发送消息
            while (true){
                System.out.println("输入消息");
                String msg = new Scanner(System.in).nextLine();
                //第二个参数对fanout交换机无效 给予空串
                c.basicPublish("logs","",null,msg.getBytes());
            }
    
    
        }
    
    }
    
    

    消费者代码

    如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。

    package m3_publishsubstribe;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    
    //消费者
    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义交换机
            c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
    
            //定义随机队列
            //方式一 自己给出所有参数
    //        String queue = UUID.randomUUID().toString();
    //        c.queueDeclare(queue,false,true,true,null);
            //方式二 服务器自动命名,并获取队列名字
            String queue = c.queueDeclare().getQueue();
            //绑定
            c.queueBind(queue,"logs","");
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    System.out.println("收到消息"+msg);
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //消费数据
            c.basicConsume(queue,true, deliverCallback, cancelCallback);
    
        }
    }
    
    

    路由模式

    路由

    路由模式

    通过路由模式,我们将向其添加一个特性—我们将只订阅所有消息中的一部分

    绑定 Bindings

    在上一节,我们已经创建了队列与交换机的绑定。使用下面这样的代码:

    c.queueBind(queue, "direct_logs", "");
    

    绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。

    绑定可以使用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为bindingKey。这是我们如何创建一个键绑定:

    c.queueBind(queue,"direct_logs",key);
    

    bindingKey的含义取决于交换机类型。我们前面使用的fanout交换机完全忽略它。

    直连交换机 Direct exchange

    上一节中的日志系统向所有消费者广播所有消息。我们希望扩展它,允许根据消息的严重性过滤消息。例如,我们希望将日志消息写入磁盘的程序只接收关键error,而不是在warning或info日志消息上浪费磁盘空间。

    前面我们使用的是fanout交换机,这并没有给我们太多的灵活性——它只能进行简单的广播。

    我们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey完全匹配的队列。为了说明这一点,请考虑以下设置

    路由模式

    其中我们可以看到直连交换机X,它绑定了两个队列。第一个队列用绑定键orange绑定,第二个队列有两个绑定,一个绑定black,另一个绑定键green

    这样设置,使用路由键orange发布到交换器的消息将被路由到队列Q1。带有blackgreen路由键的消息将转到Q2。而所有其他消息都将被丢弃。

    多重绑定 Multiple bindings

    多重绑定

    ​ 使用相同的bindingKey绑定多个队列是完全允许的。如图所示,可以使用binding key black将X与Q1和Q2绑定。在这种情况下,直连交换机的行为类似于fanout,并将消息广播给所有匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。

    发送消息

    我们把消息发送到一个Direct交换机,而不是fanout。我们将提供日志级别作为routingKey。这样,接收程序将能够选择它希望接收的级别。让我们首先来看发出日志。

    和前面一样,我们首先需要创建一个exchange:

    //参数1: 交换机名
    //参数2: 交换机类型
    c.exchangeDeclare("direct_logs", "direct");
    

    接着来看发送消息的代码

    //参数1: 交换机名
    //参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    //参数3: 其他配置属性
    //参数4: 发布的消息数据 
    c.basicPublish("direct_logs", "error", null, message.getBytes());
    

    订阅(绑定)

    接收消息的工作原理与前面章节一样,但有一个例外——我们将为感兴趣的每个日志级别创建一个新的绑定, 示例代码如下:

    ///用绑定键用来绑定
    c.queueBind(queue, "logs", "info");
    c.queueBind(queue, "logs", "warning");
    

    完整的代码

    完整代码

    生产者代码

    package m4_routing;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //路由模式下的生产者
    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
    
    
            //定义直连交换机
            c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
    
            //向交换机发送消息,在消息上携带路由键
            while(true){
                System.out.println("输入消息");
                String msg = new Scanner(System.in).nextLine();
                System.out.println("输入路由键");
                String key = new Scanner(System.in).nextLine();
                c.basicPublish("direct_logs",key,null,msg.getBytes());
    
            }
    
        }
    }
    
    

    消费者代码

    package m4_routing;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //路由模式下的消费者
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义交换机
            c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
            //定义随机队列
            String queue = c.queueDeclare().getQueue();
    
            System.out.println("输入绑定键,用空格隔开");
            String keys = new Scanner(System.in).nextLine();
            String[] a = keys.split("\s+");
            //用绑定键用来绑定
            for (String key : a) {
                c.queueBind(queue,"direct_logs",key);
            }
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    Envelope envelope = message.getEnvelope();
                    String key = envelope.getRoutingKey();
                    System.out.println("收到消息"+msg+",key="+key);
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //消费数据
            c.basicConsume(queue,true, deliverCallback, cancelCallback);
    
    
        }
    }
    
    

    测试

    测试步骤:

    首先启动一个生产者,两个消费者

    然后分别在两个消费者上绑定路由键

    最后通过生产者发送消息,发送消息以及携带路由键

    观察控制台输出:

    image-20200829201148248

    image-20200829201219409

    image-20200829201253148

    有以上的输出,表示路由模式使用正确

    主题模式

    主题

    主题交换机 Topic exchange

    发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。

    bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:

    • * 可以通配单个单词。
    • # 可以通配零个或多个单词。

    用一个例子来解释这个问题是最简单的

    主题

    在本例中,我们将发送描述动物的消息。这些消息将使用由三个单词(两个点)组成的routingKey发送。routingKey中的第一个单词表示速度,第二个是颜色,第三个是物种:“<速度>.<颜色>.<物种>”。

    我们创建三个绑定:Q1与bindingKey “*.orange.*” 绑定。和Q2是 “*.*.rabbit” 和 “lazy.#” 。

    这些绑定可概括为:

    • Q1对所有橙色的动物感兴趣。
    • Q2想接收关于兔子和慢速动物的所有消息。

    将routingKey设置为"quick.orange.rabbit"的消息将被发送到两个队列。消息 "lazy.orange.elephant“也发送到它们两个。另外”quick.orange.fox“只会发到第一个队列,”lazy.brown.fox“只发给第二个。”lazy.pink.rabbit“将只被传递到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox"不匹配任何绑定,因此将被丢弃。

    如果我们违反约定,发送一个或四个单词的信息,比如"orange“或”quick.orange.male.rabbit",会发生什么?这些消息将不匹配任何绑定,并将丢失。

    另外,"lazy.orange.male.rabbit",即使它有四个单词,也将匹配最后一个绑定,并将被传递到第二个队列。

    生产者代码

    package m5_topic;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.concurrent.TimeoutException;
    
    //主题模式下的生产者
    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("www.gavin6.xyz");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
    //        factory.setVirtualHost("/lq");
            Channel c = factory.newConnection().createChannel();
    
            //定义交换机
            //参数1: 交换机名
    		//参数2: 交换机类型
            c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
    
    
            //发送消息,并且携带路由键
            while(true){
                System.out.println("输入消息");
                String msg = new Scanner(System.in).nextLine();
                System.out.println("输入路由键");
                String key = new Scanner(System.in).nextLine();
                //参数1: 交换机名
    			//参数2: routingKey, 路由键,这里我们用日志级别,如"error","info","warning"
    			//参数3: 其他配置属性
    			//参数4: 发布的消息数据 
                c.basicPublish("topic_logs",key,null,msg.getBytes());
            }
        }
    }
    
    

    消费者代码

    package m5_topic;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.Scanner;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    
    //路由模式下的消费者
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
    
            //连接
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("www.gavin6.xyz");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            Channel c = f.newConnection().createChannel();
            //定义交换机
            c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
            //定义随机队列
    
            String queue = UUID.randomUUID().toString();
            c.queueDeclare(queue,false,true,true,null);
    
            System.out.println("输入绑定键,用空格隔开");
            String keys = new Scanner(System.in).nextLine();
            String[] a = keys.split("\s+");
            //用绑定键用来绑定
            for (String key : a) {
                c.queueBind(queue,"topic_logs",key);
            }
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    byte[] body = message.getBody();
                    String msg = new String(body);
                    Envelope envelope = message.getEnvelope();
                    String key = envelope.getRoutingKey();
                    System.out.println("收到消息"+msg+",key="+key);
                }
            };
            CancelCallback cancelCallback = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
    
                }
            };
    
            //消费数据
            c.basicConsume(queue,true, deliverCallback, cancelCallback);
    
    
        }
    }
    
    

    测试

    开启一个生产者,开启两个消费者

    ​ 开启一个生产者

    image-20200831190126106

    ​ 开启两个生产者

    image-20200831190159845

    image-20200831190218792

  • 相关阅读:
    mysql数据库常用指令
    解决windows的mysql无法启动 服务没有报告任何错误的经验。
    “Can't open file for writing”或“operation not permitted”的解决办法
    启动Apache出现错误Port 80 in use by "Unable to open process" with PID 4!
    如何打开windows的服务services.msc
    常见的HTTP状态码 404 500 301 200
    linux系统常用的重启、关机指令
    (wifi)wifi移植之命令行调试driver和supplicant
    linux(debian)安装USB无线网卡(tp-link TL-WN725N rtl8188eu )
    alloc_chrdev_region申请一个动态主设备号,并申请一系列次设备号
  • 原文地址:https://www.cnblogs.com/liqbk/p/13579521.html
Copyright © 2011-2022 走看看