zoukankan      html  css  js  c++  java
  • RabbitMQ六种工作模式的对比与实践

    最近学习RabbitMQ的使用方式,记录下来,方便以后使用,也方便和大家共享,相互交流。

    RabbitMQ的六种工作模式:

    1、Work queues
    2、Publish/subscribe
    3、Routing
    4、Topics
    5、Header 模式
    6、RPC

    一、Work queues

    多个消费端消费同一个队列中的消息,队列采用轮询的方式将消息是平均发送给消费者;

     特点:

    1、一条消息只会被一个消费端接收;

    2、队列采用轮询的方式将消息是平均发送给消费者的;

    3、消费者在处理完某条消息后,才会收到下一条消息

    生产端:

    1、声明队列

    2、创建连接

    3、创建通道

    4、通道声明队列

    5、制定消息

    6、发送消息,使用默认交换机

    消费端:

    1、声明队列

    2、创建连接

    3、创建通道

    4、通道声明队列

    5、重写消息消费方法

    6、执行消息方法

    新建两个maven工程,生产消息的生产端,消费消息的消费端;

    pom.xml文件中依赖坐标如下:

    <dependencies>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
                <version>2.1.0.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.7.0</version>
            </dependency>
    </dependencies>

     生产端的代码如下:

    package com.xyfer;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /*
    1、声明队列
    2、创建连接
    3、创建通道
    4、通道声明队列
    5、制定消息
    6、发送消息,使用默认交换机
    */
    public class Producer02 {
        //声明队列
        private static final String QUEUE ="queue";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");//mq服务ip地址
                connectionFactory.setPort(5672);//mq client连接端口
                connectionFactory.setUsername("guest");//mq登录用户名
                connectionFactory.setPassword("guest");//mq登录密码
                connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
                //创建与RabbitMQ服务的TCP连接
                connection = connectionFactory.newConnection();
                //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
                channel = connection.createChannel();
    
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE,true,false,false,null);//通道绑定邮件队列
    
                for(int i = 0;i<10;i++){
                    String message = new String("mq 发送消息。。。");
                    /**
                      * 消息发布方法
                      * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                      * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                      * param3:消息包含的属性
                      * param4:消息体
                      * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                      * 默认的交换机,routingKey等于队列名称
                     */
                    //String exchange, String routingKey, BasicProperties props, byte[] body
                    channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
                    System.out.println("mq消息发送成功!");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    消费端的代码如下:

    package com.xyfer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /*
    1、声明队列
    2、创建连接
    3、创建通道
    4、通道声明队列
    5、重写消息消费方法
    6、执行消息方法
    */
    public class Consumer02 {
        private static final String QUEUE ="queue";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE,true,false,false,null);//通道绑定邮件队列
    
                //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    /**
                      * 消费者接收消息调用此方法
                      * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                      * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                        (收到消息失败后是否需要重新发送)
                      * @param properties
                      * @param body
                      * @throws IOException
                     * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //交换机
                        String exchange = envelope.getExchange();
                        //路由key
                        String routingKey = envelope.getRoutingKey();
                        envelope.getDeliveryTag();
                        String msg = new String(body,"utf-8");
                        System.out.println("mq收到的消息是:"+msg );
                    }
    
                };
                System.out.println("消费者启动成功!");
                channel.basicConsume(QUEUE,true,consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    生产端启动后,控制台打印信息如下:

     RabbitMQ中的已有消息:

     queue中的消息正是生产端发送的消息:

     二、Publish/subscribe 模式

    这种模式又称为发布订阅模式,相对于Work queues模式,该模式多了一个交换机,生产端先把消息发送到交换机,再由交换机把消息发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的消息。

    发布订阅模式:

    1、每个消费者监听自己的队列;

    2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
    到消息

    应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

    生产端:

    1、声明队列,声明交换机

    2、创建连接

    3、创建通道

    4、通道声明交换机

    5、通道声明队列

    6、通过通道使队列绑定到交换机

    7、制定消息

    8、发送消息

    消费端:

    1、声明队列,声明交换机

    2、创建连接

    3、创建通道

    4、通道声明交换机

    5、通道声明队列

    6、通过通道使队列绑定到交换机

    7、重写消息消费方法

    8、执行消息方法

    Publish/subscribe 模式绑定两个消费端,因此需要有两个消费端,一个邮件消费端,一个短信消费端;

    生产端的代码如下:

    package com.xyfer;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Producer01 {
        //声明两个队列和一个交换机
        //Publish/subscribe发布订阅模式
        private static final String QUEUE_EMAIL ="queueEmail";
        private static final String QUEUE_SMS ="queueSms";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");//mq服务ip地址
                connectionFactory.setPort(5672);//mq client连接端口
                connectionFactory.setUsername("guest");//mq登录用户名
                connectionFactory.setPassword("guest");//mq登录密码
                connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
                //创建与RabbitMQ服务的TCP连接
                connection = connectionFactory.newConnection();
                //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                  * 参数明细
                  * 1、交换机名称
                  * 2、交换机类型,fanout、topic、direct、headers
                  */
                //Publish/subscribe发布订阅模式
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道绑定邮件队列
                channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道绑定短信队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 */
                //Publish/subscribe发布订阅模式
                channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
                channel.queueBind(QUEUE_SMS,EXCHANGE,"");
                for(int i = 0;i<10;i++){
                    String message = new String("mq 发送消息。。。");
                    /**
                      * 消息发布方法
                      * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                      * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                      * param3:消息包含的属性
                      * param4:消息体
                      * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                      * 默认的交换机,routingKey等于队列名称
                     */
                    //String exchange, String routingKey, BasicProperties props, byte[] body
                    //Publish/subscribe发布订阅模式
                    channel.basicPublish(EXCHANGE,"",null,message.getBytes());
                    System.out.println("mq消息发送成功!");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    邮件消费端的代码如下:

    package com.xyfer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer01 {
        //Publish/subscribe发布订阅模式
        private static final String QUEUE_EMAIL ="queueEmail";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                  * 参数明细
                  * 1、交换机名称
                  * 2、交换机类型,fanout、topic、direct、headers
                  */
                //Publish/subscribe发布订阅模式
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道绑定邮件队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 */
                //Publish/subscribe发布订阅模式
                channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
                //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消费者接收消息调用此方法
                  * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                  * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                    (收到消息失败后是否需要重新发送)
                  * @param properties
                  * @param body
                  * @throws IOException
                  * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                  */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }
                };
                System.out.println("消费者启动成功!");
                channel.basicConsume(QUEUE_EMAIL,true,consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    短信消费端的代码如下:

    package xyfer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer01 {
        //Publish/subscribe发布订阅模式
        private static final String QUEUE_SMS ="queueSms";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                  * 参数明细
                  * 1、交换机名称
                  * 2、交换机类型,fanout、topic、direct、headers
                  */
                //Publish/subscribe发布订阅模式
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道绑定短信队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 */
                //Publish/subscribe发布订阅模式
                channel.queueBind(QUEUE_SMS,EXCHANGE,"");
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消费者接收消息调用此方法
                  * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                  * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                    (收到消息失败后是否需要重新发送)
                  * @param properties
                  * @param body
                  * @throws IOException
                  * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                  */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }
    
                };
                System.out.println("消费者启动成功!");
                channel.basicConsume(QUEUE_SMS,true,consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    三、Routing 路由模式

    Routing 模式又称路由模式,该种模式除了要绑定交换机外,发消息的时候还要制定routing key,即路由key,队列通过通道绑定交换机的时候,需要指定自己的routing key,这样,生产端发送消息的时候也会指定routing key,通过routing key就可以把相应的消息发送到绑定相应routing key的队列中去。

    路由模式:

    1、每个消费者监听自己的队列,并且设置routingkey;
    2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列;

    应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

    生产端:

    1、声明队列,声明交换机

    2、创建连接

    3、创建通道

    4、通道声明交换机

    5、通道声明队列

    6、通过通道使队列绑定到交换机并指定该队列的routingkey

    7、制定消息

    8、发送消息并指定routingkey

    消费端:

    1、声明队列,声明交换机

    2、创建连接

    3、创建通道

    4、通道声明交换机

    5、通道声明队列

    6、通过通道使队列绑定到交换机并指定routingkey

    7、重写消息消费方法

    8、执行消息方法

    按照假设的应用场景,同样,Routing 路由模式也是一个生产端,两个消费端,所不同的是,声明交换机的类型不同,队列绑定交换机的时候需要指定Routing key,发送消息的时候也需要指定Routing key,这样根据Routing key就能把相应的消息发送到相应的队列中去。

    生产端的代码如下:

    package com.xyfer;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Producer03 {
        //声明两个队列和一个交换机
        //Routing 路由模式
        private static final String QUEUE_EMAIL ="queueEmail";
        private static final String QUEUE_SMS ="queueSms";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");//mq服务ip地址
                connectionFactory.setPort(5672);//mq client连接端口
                connectionFactory.setUsername("guest");//mq登录用户名
                connectionFactory.setPassword("guest");//mq登录密码
                connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
                //创建与RabbitMQ服务的TCP连接
                connection = connectionFactory.newConnection();
                //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                 * 参数明细
                 * 1、交换机名称
                 * 2、交换机类型,fanout、topic、direct、headers
                 */
                //Routing 路由模式
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道绑定邮件队列
                channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道绑定短信队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 */
                //Routing 路由模式
                channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
                channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
                //给email队列发消息
                for(int i = 0;i<10;i++){
                    String message = new String("mq 发送email消息。。。");
                    /**
                      * 消息发布方法
                      * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                      * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                      * param3:消息包含的属性
                      * param4:消息体
                      * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                      * 默认的交换机,routingKey等于队列名称
                     */
                    //String exchange, String routingKey, BasicProperties props, byte[] body
                    //Routing 路由模式
                    channel.basicPublish(EXCHANGE,QUEUE_EMAIL,null,message.getBytes());
                    System.out.println("mq消息发送成功!");
                }
                //给sms队列发消息
                for(int i = 0;i<10;i++){
                    String message = new String("mq 发送sms消息。。。");
                    /**
                      * 消息发布方法
                      * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                      * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                      * param3:消息包含的属性
                      * param4:消息体
                      * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                      * 默认的交换机,routingKey等于队列名称
                     */
                    //String exchange, String routingKey, BasicProperties props, byte[] body
                    //Routing 路由模式
                    channel.basicPublish(EXCHANGE,QUEUE_SMS,null,message.getBytes());
                    System.out.println("mq消息发送成功!");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    邮件消费端的代码如下:

    package com.xyfer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer03 {
        //Routing 路由模式
        private static final String QUEUE_EMAIL ="queueEmail";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                  * 参数明细
                  * 1、交换机名称
                  * 2、交换机类型,fanout、topic、direct、headers
                  */
                //Routing 路由模式
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道绑定邮件队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 */
                //Routing 路由模式
                channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
                //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    /**
                      * 消费者接收消息调用此方法
                      * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                      * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                        (收到消息失败后是否需要重新发送)
                      * @param properties
                      * @param body
                      * @throws IOException
                     * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //交换机
                        String exchange = envelope.getExchange();
                        //路由key
                        String routingKey = envelope.getRoutingKey();
                        envelope.getDeliveryTag();
                        String msg = new String(body,"utf-8");
                        System.out.println("mq收到的消息是:"+msg );
                    }
    
                };
                System.out.println("消费者启动成功!");
                channel.basicConsume(QUEUE_EMAIL,true,consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    短信消费端的代码如下:

    package xyfer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer03 {
        //Routing 路由模式
        private static final String QUEUE_SMS ="queueSms";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                  * 参数明细
                  * 1、交换机名称
                  * 2、交换机类型,fanout、topic、direct、headers
                  */
                //Routing 路由模式
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道绑定短信队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 */
                //Routing 路由模式
                channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    /**
                      * 消费者接收消息调用此方法
                      * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                      * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                        (收到消息失败后是否需要重新发送)
                      * @param properties
                      * @param body
                      * @throws IOException
                     * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //交换机
                        String exchange = envelope.getExchange();
                        //路由key
                        String routingKey = envelope.getRoutingKey();
                        envelope.getDeliveryTag();
                        String msg = new String(body,"utf-8");
                        System.out.println("mq收到的消息是:"+msg );
                    }
                };
                System.out.println("消费者启动成功!");
                channel.basicConsume(QUEUE_SMS,true,consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    四、Topics 模式

    Topics 模式和Routing 路由模式最大的区别就是,Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的。

    路由模式:

    1、每个消费者监听自己的队列,并且设置带统配符的routingkey

    2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列

    应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

    生产端:

    1、声明队列,声明交换机

    2、创建连接

    3、创建通道

    4、通道声明交换机

    5、通道声明队列

    6、通过通道使队列绑定到交换机并指定该队列的routingkey(通配符)

    7、制定消息

    8、发送消息并指定routingkey(通配符)

    消费端:

    1、声明队列,声明交换机

    2、创建连接

    3、创建通道

    4、通道声明交换机

    5、通道声明队列

    6、通过通道使队列绑定到交换机并指定routingkey(通配符)

    7、重写消息消费方法

    8、执行消息方法

    按照假设的应用场景,Topics 模式也是一个生产端,两个消费端,生产端队列绑定交换机的时候,需要指定的routingkey是通配符,发送消息的时候绑定的routingkey也是通配符,消费端队列绑定交换机的时候routingkey也是通配符,这样就能根据通配符匹配到消息了。

    生产端的代码如下:

    package com.xyfer;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Producer04 {
        //声明两个队列和一个交换机
        //Topics 模式
        private static final String QUEUE_EMAIL ="queueEmail";
        private static final String QUEUE_SMS ="queueSms";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");//mq服务ip地址
                connectionFactory.setPort(5672);//mq client连接端口
                connectionFactory.setUsername("guest");//mq登录用户名
                connectionFactory.setPassword("guest");//mq登录密码
                connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
                //创建与RabbitMQ服务的TCP连接
                connection = connectionFactory.newConnection();
                //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                  * 参数明细
                  * 1、交换机名称
                  * 2、交换机类型,fanout、topic、direct、headers
                  */
                //Topics 模式
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道绑定邮件队列
                channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道绑定短信队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 */
                channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");
                channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");
                //给email队列发消息
                for(int i = 0;i<10;i++){
                    String message = new String("mq 发送email消息。。。");
                    /**
                      * 消息发布方法
                      * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                      * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                      * param3:消息包含的属性
                      * param4:消息体
                      * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                      * 默认的交换机,routingKey等于队列名称
                     */
                    //String exchange, String routingKey, BasicProperties props, byte[] body
                    channel.basicPublish(EXCHANGE,"inform.email",null,message.getBytes());
                    System.out.println("mq email 消息发送成功!");
                }
                //给sms队列发消息
                for(int i = 0;i<10;i++){
                    String message = new String("mq 发送sms消息。。。");
                    /**
                      * 消息发布方法
                      * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                      * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                      * param3:消息包含的属性
                      * param4:消息体
                      * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                      * 默认的交换机,routingKey等于队列名称
                     */
                    //String exchange, String routingKey, BasicProperties props, byte[] body
                    channel.basicPublish(EXCHANGE,"inform.sms",null,message.getBytes());
                    System.out.println("mq sms 消息发送成功!");
                }
                //给email和sms队列发消息
                for(int i = 0;i<10;i++){
                    String message = new String("mq 发送email sms消息。。。");
                    /**
                      * 消息发布方法
                      * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                      * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                      * param3:消息包含的属性
                      * param4:消息体
                      * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                      * 默认的交换机,routingKey等于队列名称
                     */
                    //String exchange, String routingKey, BasicProperties props, byte[] body
                    channel.basicPublish(EXCHANGE,"inform.email.sms",null,message.getBytes());
                    System.out.println("mq email sms 消息发送成功!");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    邮件消费端的代码如下:

    package com.xyfer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer04 {
        private static final String QUEUE_EMAIL ="queueEmail";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                  * 参数明细
                  * 1、交换机名称
                  * 2、交换机类型,fanout、topic、direct、headers
                  */
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道绑定邮件队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 */
                channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");
                //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    /**
                      * 消费者接收消息调用此方法
                      * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                      * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                        (收到消息失败后是否需要重新发送)
                      * @param properties
                      * @param body
                      * @throws IOException
                     * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //交换机
                        String exchange = envelope.getExchange();
                        //路由key
                        String routingKey = envelope.getRoutingKey();
                        envelope.getDeliveryTag();
                        String msg = new String(body,"utf-8");
                        System.out.println("mq收到的消息是:"+msg );
                    }
    
                };
                System.out.println("消费者启动成功!");
                channel.basicConsume(QUEUE_EMAIL,true,consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    短信消费端的代码如下:

    package xyfer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer04 {
        private static final String QUEUE_SMS ="queueSms";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                  * 参数明细
                  * 1、交换机名称
                  * 2、交换机类型,fanout、topic、direct、headers
                  */
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道绑定邮件队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 */
                channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");
                //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    /**
                      * 消费者接收消息调用此方法
                      * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                      * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                        (收到消息失败后是否需要重新发送)
                      * @param properties
                      * @param body
                      * @throws IOException
                     * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //交换机
                        String exchange = envelope.getExchange();
                        //路由key
                        String routingKey = envelope.getRoutingKey();
                        envelope.getDeliveryTag();
                        String msg = new String(body,"utf-8");
                        System.out.println("mq收到的消息是:"+msg );
                    }
    
                };
                System.out.println("消费者启动成功!");
                channel.basicConsume(QUEUE_SMS,true,consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    由于生产端同时发送了email的消息(10条),sms的消息(10条),email和sms同时收到的消息(10条),所以每个消费端都应收到各自的10条消息,加上同时都能收到的10条消息,每个消费端应该收到20条消息;

    生产端控制台打印:

     邮件消费端控制台打印:

     短信消费端的控制台打印:

     生产端执行后,RabbitMQ上的消息队列情况:

     两个消费端执行完后,RabbitMQ上的消息队列情况:

     五、Header 模式

    header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

    案例:

    根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。

    根据假设使用场景,需要一个生产端,两个消费端,不同的是,生产端声明交换机时,交换机的类型不同,是headers类型,生产端队列绑定交换机时,不使用routingkey,而是使用header中的 key/value(键值对)匹配队列,发送消息时也是使用header中的 key/value(键值对)匹配队列。

    消费端同样是声明交换机时,交换机的类型不同,是headers类型,消费端队列绑定交换机时,不使用routingkey,而是使用header中的 key/value(键值对)匹配队列,消费消息时也是使用header中的 key/value(键值对)匹配队列。

    生产端的代码如下:

    package com.xyfer;
    
    import com.rabbitmq.client.*;
    
    
    import java.io.IOException;
    import java.util.Hashtable;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class Producer05 {
        //声明两个队列和一个交换机
        //Header 模式
        private static final String QUEUE_EMAIL ="queueEmail";
        private static final String QUEUE_SMS ="queueSms";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");//mq服务ip地址
                connectionFactory.setPort(5672);//mq client连接端口
                connectionFactory.setUsername("guest");//mq登录用户名
                connectionFactory.setPassword("guest");//mq登录密码
                connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
                //创建与RabbitMQ服务的TCP连接
                connection = connectionFactory.newConnection();
                //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                  * 参数明细
                  * 1、交换机名称
                  * 2、交换机类型,fanout、topic、direct、headers
                  */
                //Header 模式
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道绑定邮件队列
                channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道绑定短信队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 * 4、
                 * String queue, String exchange, String routingKey, Map<String, Object> arguments
                 */
                Map<String,Object> headers_email = new Hashtable<String,Object>();
                headers_email.put("inform_type","email");
                Map<String,Object> headers_sms = new Hashtable<String, Object>();
                headers_sms.put("inform_type","sms");
                channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);
                channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_sms);
                //给email队列发消息
                for(int i = 0;i<10;i++){
                    String message = new String("mq 发送email消息。。。");
                    Map<String,Object> headers = new Hashtable<String,Object>();
                    headers.put("inform_type","email");//匹配email通知消费者绑定的header
                    /**
                      * 消息发布方法
                      * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                      * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                      * param3:消息包含的属性
                      * param4:消息体
                      * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                      * 默认的交换机,routingKey等于队列名称
                     */
                    //String exchange, String routingKey, BasicProperties props, byte[] body
                    AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
                    properties.headers(headers);
                    //Email通知
                    channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes());
                    System.out.println("mq email 消息发送成功!");
                }
                //给sms队列发消息
                for(int i = 0;i<10;i++){
                    String message = new String("mq 发送sms消息。。。");
                    Map<String,Object> headers = new Hashtable<String,Object>();
                    headers.put("inform_type","sms");//匹配sms通知消费者绑定的header
                    /**
                      * 消息发布方法
                      * param1:Exchange的名称,如果没有指定,则使用Default Exchange
                      * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                      * param3:消息包含的属性
                      * param4:消息体
                      * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
                      * 默认的交换机,routingKey等于队列名称
                     */
                    //String exchange, String routingKey, BasicProperties props, byte[] body
                    AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
                    properties.headers(headers);
                    //sms通知
                    channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes());
                    System.out.println("mq sms 消息发送成功!");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    邮件消费端的代码如下:

    package com.xyfer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.Hashtable;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer05 {
        private static final String QUEUE_EMAIL ="queueEmail";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                              * 参数明细
                              * 1、交换机名称
                              * 2、交换机类型,fanout、topic、direct、headers
                              */
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道绑定邮件队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 * 4、
                 * String queue, String exchange, String routingKey, Map<String, Object> arguments
                 */
                Map<String,Object> headers_email = new Hashtable<String,Object>();
                headers_email.put("inform_email","email");
                channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);
                //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    /**
                      * 消费者接收消息调用此方法
                      * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                      * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                        (收到消息失败后是否需要重新发送)
                      * @param properties
                      * @param body
                      * @throws IOException
                     * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //交换机
                        String exchange = envelope.getExchange();
                        //路由key
                        String routingKey = envelope.getRoutingKey();
                        envelope.getDeliveryTag();
                        String msg = new String(body,"utf-8");
                        System.out.println("mq收到的消息是:"+msg );
                    }
    
                };
                System.out.println("消费者启动成功!");
                channel.basicConsume(QUEUE_EMAIL,true,consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    短信消费端的代码如下:

    package xyfer;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.Hashtable;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer05 {
        private static final String QUEUE_SMS ="queueSms";
        private static final String EXCHANGE = "messageChange";
        public static void main(String[] args) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("127.0.0.1");
                connectionFactory.setPort(5672);
                connection = connectionFactory.newConnection();
                channel = connection.createChannel();
                //通道绑定交换机
                /**
                  * 参数明细
                  * 1、交换机名称
                  * 2、交换机类型,fanout、topic、direct、headers
                  */
                channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
                //通道绑定队列
                /**
                 * 声明队列,如果Rabbit中没有此队列将自动创建
                 * param1:队列名称
                 * param2:是否持久化
                 * param3:队列是否独占此连接
                 * param4:队列不再使用时是否自动删除此队列
                 * param5:队列参数
                 * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                 *
                 */
                channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道绑定邮件队列
                //交换机和队列绑定
                /**
                 * 参数明细
                 * 1、队列名称
                 * 2、交换机名称
                 * 3、路由key
                 * 4、
                 * String queue, String exchange, String routingKey, Map<String, Object> arguments
                 */
                Map<String,Object> headers_email = new Hashtable<String,Object>();
                headers_email.put("inform_email","sms");
                channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_email);
                //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                DefaultConsumer consumer = new DefaultConsumer(channel) {
                    /**
                      * 消费者接收消息调用此方法
                      * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
                      * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                        (收到消息失败后是否需要重新发送)
                      * @param properties
                      * @param body
                      * @throws IOException
                     * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        //交换机
                        String exchange = envelope.getExchange();
                        //路由key
                        String routingKey = envelope.getRoutingKey();
                        envelope.getDeliveryTag();
                        String msg = new String(body,"utf-8");
                        System.out.println("mq收到的消息是:"+msg );
                    }
    
                };
                System.out.println("消费者启动成功!");
                channel.basicConsume(QUEUE_SMS,true,consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

    生产端启动后RabbitMQ上面的消息队列情况:

    六、RPC 模式

     RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

    1、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

    2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

    3、服务端将RPC方法 的结果发送到RPC响应队列。

    4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

    至此,RabbitMQ的六种工作模式已经介绍完毕,手动代码实现,实际体验六种工作模式的不同。

  • 相关阅读:
    对于大流量网站怎样解决访问量的问题
    服务器中 配置phpstudy一键安装包
    [转]MySQLHelper类
    [转]C#连接操作mysql实例
    设计模式—观察者模式
    asp.net mvc 中的自定义验证(Custom Validation Attribute)
    软件行业名称缩写
    设计模式—原型模式
    设计模式—建造者模式
    添加asp.net mvc到现有的asp.net web form 应用程序
  • 原文地址:https://www.cnblogs.com/xyfer1018/p/11581511.html
Copyright © 2011-2022 走看看