zoukankan      html  css  js  c++  java
  • 一小时彻底搞懂RabbitMQ

     

     

     

     windows上面安装rabbitmq-server-3.7.4.exe

    首先需要安装otp_win64_20.3.exe

    步骤1:安装Erlang

    RabbitMQ 它依赖于Erlang,需要先安装Erlang。首先确定你的window电脑是32位还是64位,以下的安装以window 64位电脑举例。

    Erlang官网:http://www.erlang.org/

    rabbit安装注意事项:需要安装erlang20.3以上的版本

    erlang21_win64下载地址:https://download.csdn.net/download/qq_38862234/10965869

    环境配置:修改系统属性path下添加erlang的路径

     安装的是不能存在中文目录,对于erl配置系统环境变量的时候,计算名称也不能存在中文

    这样就表示erl环境变量安装成功了

     Rabbitmq安装的时候不能存在中文,Rabbitmq安装成功之后,在windows的程序窗口会看到下面的菜单选项

     1、windows安装成功之后,我们要使用Rabbitmq提供的管理控制台,我们需要开启管理控制台的插件命令,我们进入到D:Program FilesRabbitMQ Server abbitmq_server-3.7.4sbin目目录下,使用命令rabbitmq-plugins.bat安装下,插件安装完成之后,需要对rabbitmq服务进行重启,浏览器输入http://localhost:15672/,登录的用户名和密码都是guest,这样登录成功之后,说明应该就安装安装

    d:Program FilesRabbitMQ Server
    abbitmq_server-3.7.4sbin>rabbitmq-plugins.bat  enable rabbitmq_management

     接下来我们要创建用户admin 密码和12345,已经对应的vhost

     

     

     rabbitmq支持6种消息模式

     每一种模式我们都需要进行详细的讲解和分析

    简单模式Hello World

    使用ConnectionFactory创建连接,实质上,此连接就是一个Socket连接,设置host为localhost来连接本地的RabbitMQ Server,设置Port为5672(默认队列连接端口),设置用户名和密码,如果未设置用户名和密码将默认使用guest/guest口令(当然,此口令仅能在localhost本地使用)。

    使用channel.queueDeclare()来定义队列,在RabbitMQ中,队列仅能够创建一次,如果发现已经存在此队列,将会忽略此方法。

    功能:一个生产者P发送消息到队列Q,一个消费者C接收

    生产者实现思路:

    创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,关闭通道和连接。

    channel.QueueDeclare(queue: "hello",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    queue: 队列名称

    durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库

    exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景

    autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除

    arguments:
    队列中的消息什么时候会自动被删除?

    Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
    channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));

    Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp

    Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim

    Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B

    Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX

    Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK

    Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,

    Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中

    Master locator(x-queue-master-locator)

    http://www.voidcn.com/article/p-dcbgmfmb-bpw.html

    参加博客:https://www.cnblogs.com/leocook/p/mq_rabbitmq_3.html

     第一种

    void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
                throws IOException;
    • exchange 交换器名称

    • routingKey 路由键

    • props 有14个成员

    我们来看下代码

     项目的pom.xml的依赖如下

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com</groupId>
      <artifactId>com.rabbitmq.test</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      
         <dependencies>
          <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.0.0</version>
    </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

    生产者的代码如下

    package com.rabbitmq.test1;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class MessageProvider {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
    
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
    
            //获得连接:Rabbitmq中对于的connection
            Connection connection = factory.newConnection();
            //从connection中获得对应的channel
            Channel channel = connection.createChannel();
            //
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "hello world";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" 生产消息:‘" + message + "");
    
        }
    
    }

    这里最关键的就是两个地方,第一个就是channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    queue: 队列名称

    durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库

    exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景

    autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除

    arguments:
    队列中的消息什么时候会自动被删除?

    Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
    channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));

    Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp

    Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim

    Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B

    Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX

    Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK

    Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,

    第二个就是:channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

    因为我们使用一个默认exchange,我们使用(””)来标识的。QUEUE_NAME队列的名称

    package com.rabbitmq.test1;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端接收消息:" + message);
                }
    
            };
            //true  异步接收消息
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    两个消费者,消费者1的代码如下

    package com.rabbitmq.test2;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "worker";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    //模拟业务处理需要200毫秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端1接收消息:" + message);
                }
    
            };
            //true  异步接收消息
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    package com.rabbitmq.test1;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class MessageProvider {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
    
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
    
            //获得连接:Rabbitmq中对于的connection
            Connection connection = factory.newConnection();
            //从connection中获得对应的channel
            Channel channel = connection.createChannel();
            //
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "hello world";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" 生产消息:‘" + message + "");
    
        }
    
    }

    我们来看下web管理控制台的展示

    队列中可以看到我们创建的hello,点击hello可以看到队列中的具体指标

     

     在创建队列的时候,可以指定队列中的消息是否持久化、队列中消息的具体参数TTL、是否过期、最大长度等

    在Exchange交换页面,可以看到系统在对于的虚拟机下面会给我们创建6中默认的交换机类型

     我们也可以自己创建一个Exchange

     

     

     

     在概览页面可以查看当前消息实例的具体的信息有多少connection、channel、交换机、队列、消费这等信息

     connection下面可以查看当前的connection的具体信息,以及当前connection下面存在多少个channel

     在channel下面可以查看当前channel的具体流量信息

     RabbitMQ学习总结 第三篇:工作队列Work Queue

     生产者可以产生多条消息,但是每一条消息默认只能被一个消费者消费,c1和c2默认采用轮询的策略消费

    我们模拟生产者产生20条消息

    package com.rabbitmq.test2;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class MessageProvider {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "worker";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
    
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
    
            //获得连接:Rabbitmq中对于的connection
            Connection connection = factory.newConnection();
            //从connection中获得对应的channel
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //
            for(int i =0 ;i < 20;i++){
                String message = "hello world"+i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" 生产消息:‘" + message + "");
                Thread.sleep(i);
                
            }
            
          channel.close();
          connection.close();
        
        }
    
    }

     消费者2的代码如下

    package com.rabbitmq.test2;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer2 {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "worker";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    //模拟业务处理需要200毫秒
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端2接收消息:" + message);
                }
    
            };
            //true  异步接收消息
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    我们来看下日志的打印

    生产者产生的20条消息,消费者1消费了10条,消费者2默认消费了10条,使用的是轮询的策略,默认情况下,RabbitMQ会把每个消息以此轮询发到各个消费者那,把消息平均的发到各个消费者那。这种分配管理的方式叫轮询,还可以测试多个worker的情形。

    但是上面的这种work模式存在一定的问题,

    你可能会注意到有的时候RabbitMQ不能像你预想中的那样分发消息。例如有两个worker,第奇数个消息对应的任务都很耗时,第偶数个消息对应的任务都很快就能执行完。这样的话其中有个worker就会一直都很繁忙,另外一个worker几乎不做任务。RabbitMQ不会去对这种现象做任何处理,依然均匀的去推送消息。

    这是因为RabbitMQ在消息被生产者推送过来后就被推送到消费者端,它不会去查看未接收到消费者确认的消息数量。它只会把N个消息均与的分发到N个消费者那。

    为了能解决这个问题,我们可以使用basicQos放来来设置消费者最多会同时接收多少个消息。这里设置为1,表示RabbitMQ同一时间发给消费者的消息不超过一条。这样就能保证消费者在处理完某个任务,并发送确认信息后,RabbitMQ才会向它推送新的消息,在此之间若是有新的消息话,将会被推送到其它消费者,若所有的消费者都在处理任务,那么就会等待。,这里我们来看下消费者代码的修改

    我们来看下代码,注意修改消费者的代码

    package com.rabbitmq.test2;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "worker";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    //模拟业务处理需要200毫秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端1接收消息:" + message);
                    //envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
                    //false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            };
            //true  异步接收消息
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

    消费者的代码需要修改下面的三个地方

    1、将消费的自动ack修改为需要人为的手动ack

       channel.basicConsume(QUEUE_NAME, false, consumer);

    2、增加手动ack消息的代码,ack不需要批量回复

    //envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
    //false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
    channel.basicAck(envelope.getDeliveryTag(), false);

     3、增加代码 channel.basicQos(1);

    修改之后我们来看下日志的打印

     

    RabbitMQ学习总结 第四篇:发布/订阅 Publish/Subscribe

    上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者。本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅)。

    接下来我们实现发布一条消息,多个消费者都能够接受到

    生产者者可以把消息发送到exchange(消息交换机)上。exchange是一个很简单的事情,它一边接收生产者的消息,另一边再把消息推送到消息队列中。Exchange必须知道在它接收到一条消息时应该怎么去处理。应该把这条消息推送到指定的消息队列中?还是把消息推送到所有的队列中?或是把消息丢掉?这些规则都可以用exchange类型来定义。

    有一些可用的exchange类型:direct, topic, headers和fanout。这里我们主要看最后一个:fanout,这里我们创建一个名字为logs、类型为fanout的exchange:

    channel.exchangeDeclare("logs", "fanout");

    fanout类型的exchange是很简单的。就是它把它能接收到的所有消息广播到它知道的所有队列中

    我们来看下代码

    发送日志消息的生产者程序和之前的程序没有太多的差别。最大的区别就是我们把消息推送到一个命名的exchange上,而不是之前未命名的默认exchange。在我们发送消息时需要提供一个routingKey,但对于fanout类型的exchange可以忽略。下边是生产者的代码EmitLog.java:

    package com.rabbitmq.test3;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class MessageProvider {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "worker";
        private static final String EXCHANGE_NAME = "public-exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
    
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
    
            //获得连接:Rabbitmq中对于的connection
            Connection connection = factory.newConnection();
            //从connection中获得对应的channel
            Channel channel = connection.createChannel();
            //channel发送消息到交换机,采用fanout类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //
            for(int i =0 ;i < 5;i++){
                String message = "hello world"+i;
                channel.basicPublish(EXCHANGE_NAME,"", null, message.getBytes());
                System.out.println(" 生产消息:‘" + message + "");
                Thread.sleep(i);
                
            }
            
          channel.close();
          connection.close();
        
        }
    
    }

    改动点有下面的三处:

    1、channel与交换机进行绑定,数据channel发送到Exchange中,交换机采用fanout类型,把消息发送到全部和改Exchange绑定的队列中

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    2、发送消息的时候,是发送给交换机,不再是发送给消息队列

     channel.basicPublish(EXCHANGE_NAME,"", null, message.getBytes());

    我们再来看消费者的代码

    1、消费者1的队列是fanout1,需要将fanout1队列和生产者的Exchange进行绑定

    2、消费者2的队列是fanout2,需要将fanout2队列和生产者的Exchange进行绑定

    2、生产者发布了5条消息,消费者1和消费者2都能够收到这5条消息,我们来看下代码

    package com.rabbitmq.test3;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "fanout1";
        private static final String EXCHANGE_NAME = "public-exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //把队列绑定到交换机中
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "");
            
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    //模拟业务处理需要200毫秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端1接收消息:" + message);
                    //envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
                    //false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            };
            //true  异步接收消息
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
    package com.rabbitmq.test3;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer2 {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "fanout2";
        private static final String EXCHANGE_NAME = "public-exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            //设置消费者预取得消费的数量
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //把队列绑定到交换机中
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "");
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    //模拟业务处理需要200毫秒
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端2接收消息:" + message);
                    //envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
                    //false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            };
            //这里第二个参数要设置为false,表示取消自动ack,需要手动设置ack
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

    我们来看下日志的打印信息如下

     

    RabbitMQ学习总结 第五篇:路由Routing

    上面的publish模式中,Exchange会把全部的消息发送给与之绑定的队列中,下面我们可以采用路由的模式,Exchange只把消息发送到指定的队列中,使用direct模式

    这里生产者需要做下面的修改,就是需要发送消息的时候,需要指定交换机依据那种bingkey进行发送,这里制定bingkey为error

    package com.rabbitmq.test4;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class MessageProvider {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "worker";
        private static final String EXCHANGE_NAME = "public-exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
    
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
    
            //获得连接:Rabbitmq中对于的connection
            Connection connection = factory.newConnection();
            //从connection中获得对应的channel
            Channel channel = connection.createChannel();
            //channel发送消息到交换机,采用fanout类型
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //
            for(int i =0 ;i < 5;i++){
                String message = "hello world"+i;
                channel.basicPublish(EXCHANGE_NAME,"error", null, message.getBytes());
                System.out.println(" 生产消息:‘" + message + "");
                Thread.sleep(i);
                
            }
            
          channel.close();
          connection.close();
        
        }
    
    }

    生产者修改的代码如下:

    1、指定交换机类型为 channel.exchangeDeclare(EXCHANGE_NAME, "direct");

    2、指定交换机和队列绑定的key为error

     channel.basicPublish(EXCHANGE_NAME,"error", null, message.getBytes());

    消费者的代码需要修改为,当消费者队列与交换机绑定的时候,需要指定对于的bingkey

    消费者1的代码为,bingkey为info,  channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "info");

    package com.rabbitmq.test4;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "fanout1";
        private static final String EXCHANGE_NAME = "public-exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //把队列绑定到交换机中
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "info");
            
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    //模拟业务处理需要200毫秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端1接收消息:" + message);
                    //envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
                    //false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            };
            //true  异步接收消息
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

    消费者2的代码为,bingkey为error,  channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "error");

    package com.rabbitmq.test3;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer2 {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "fanout2";
        private static final String EXCHANGE_NAME = "public-exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            //设置消费者预取得消费的数量
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //把队列绑定到交换机中
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "info");
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    //模拟业务处理需要200毫秒
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端2接收消息:" + message);
                    //envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
                    //false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            };
            //这里第二个参数要设置为false,表示取消自动ack,需要手动设置ack
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

    因为消费者2指定的bingkey和生产者中的bingkey一样,所以消费者2能够收到生产者发送的数据,消费者1收不到,消费者1没有任何日志的打印

    我们来看下日志

     RabbitMQ学习总结 第六篇:Topic类型的exchange

    上面的交换机中的direct类型中,bingkey都必须是很明确的error或者info,如果bingkey要想使用通配符的模式,能否实现了,可以使用交换机的Topic模式

    这些binding可以总结为:

    • Q1对所有橘色的(orange)的动物感兴趣;
    • Q2希望能拿到所有兔子的(rabbit)信息,还有比较懒惰的(lazy.#)动物信息。

    一条以” quick.orange.rabbit”为routing key的消息将会推送到Q1和Q2两个queue上,routing key为“lazy.orange.elephant”的消息同样会被推送到Q1和Q2上。但如果routing key为”quick.orange.fox”的话,消息只会被推送到Q1上;routing key为”lazy.brown.fox”的消息会被推送到Q2上,routing key为"lazy.pink.rabbit”的消息也会被推送到Q2上,但同一条消息只会被推送到Q2上一次。

    如果在发送消息时所指定的exchange和routing key在消费者端没有对应的exchange和binding key与之绑定的话,那么这条消息将会被丢弃掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing为”lazy.orange.male.rabbit”的消息,将会被推到Q2上。

    我们来看下代码

    生产者的代码

    package com.rabbitmq.test5;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class MessageProvider {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "worker";
        private static final String EXCHANGE_NAME = "topic-exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
    
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
    
            //获得连接:Rabbitmq中对于的connection
            Connection connection = factory.newConnection();
            //从connection中获得对应的channel
            Channel channel = connection.createChannel();
            //channel发送消息到交换机,采用fanout类型
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //
            for(int i =0 ;i < 5;i++){
                String message = "hello world"+i;
                channel.basicPublish(EXCHANGE_NAME,"lazy.1", null, message.getBytes());
                System.out.println(" 生产消息:‘" + message + "");
                Thread.sleep(i);
                
            }
            
          channel.close();
          connection.close();
        
        }
    
    }

    生产者使用topic模式,bingkey为"lazy.1",那么消费者的bingkey为lazy.1和lazy.*都可以收到生产者发送的消息,*表示通配符全部所有的意思

    我们来看消费者的全部代码为

    package com.rabbitmq.test5;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "fanout1";
        private static final String EXCHANGE_NAME = "topic-exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //把队列绑定到交换机中
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "lazy.1");
            
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    //模拟业务处理需要200毫秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端1接收消息:" + message);
                    //envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
                    //false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            };
            //true  异步接收消息
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
    package com.rabbitmq.test5;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    public class MessageConsumer2 {
    
        private static final String HOST = "127.0.0.1";
    
        private static final int PORT = 5672; // rabbitmq端口是5672
    
        private static final String USERNAME = "admin";
    
        private static final String PASSWORD = "123456";
    
        private static final String QUEUE_NAME = "fanout2";
        private static final String EXCHANGE_NAME = "topic-exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST);
            factory.setPort(PORT);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            factory.setVirtualHost("vhost01");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            //设置消费者预取得消费的数量
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //把队列绑定到交换机中
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME, "lazy.*");
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    //模拟业务处理需要200毫秒
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    String message = new String(body, "UTF-8");
                    System.out.println("消费端2接收消息:" + message);
                    //envelope.getDeliveryTag()获得当前需要收到ack的消息的编号
                    //false表示仅仅ack当前的消费,如果内存中给还存在多条消费没有ack,是否批量回复。false表示不批量回复
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            };
            //这里第二个参数要设置为false,表示取消自动ack,需要手动设置ack
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
  • 相关阅读:
    JAVA合并两个有序的单链表,合并之后的链表依然有序
    excel如何将一个单元格内容拆分成多个单元格?(用到了数据->分列)
    Navicat导入excel的xlsx文件提示无法打开文件
    Request对象实现请求转发
    MessageFormat.format()和String.format()
    使用Servlet动态生成验证码
    Http协议
    使用freemarker导出word
    java注解学习(1)注解的作用和三个常用java内置注解
    SSM_CRUD新手练习(6)分页后台控制器编写
  • 原文地址:https://www.cnblogs.com/kebibuluan/p/13061319.html
Copyright © 2011-2022 走看看