zoukankan      html  css  js  c++  java
  • Spring Boot----RabbiMQ基础

    概述

    1.大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力

    2.消息服务中两个重要概念:

      消息代理(message broker,消息中间件服务器)和目的地(destination)

      当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。

    3.消息队列主要有两种形式的目的地

      1.队列(queue):点对点消息通信(point-to-point)

      2.主题(topic):发布(publish)/订阅(subscribe)消息通信

    4.点对点式:

      -消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列

      -消息只有唯一的发送者和接受者,但并不是说只能有一个接收者

    5.发布订阅式:

      -发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息

    6.JMS(Java Message Service)JAVA消息服务:

      Java 消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于java操作消息队列(类似jdbc操作数据库)

      -基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现

    7.AMQP(Advanced Message Queuing Protocol)

      -高级消息队列协议,也是一个消息代理的规范,兼容JMS

      -RabbitMQ是AMQP的实现

    RabbitMQ简介

    RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。

    核心概念

    • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
    • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。Exchange有4种类型:direct(默认),fanout,topic,和headers,不同类型的Exchange转发消息的策略有所区别
    • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。消息一直在队列里面,等待消费者连接到这个队列将其取走。
    • Producer:消息生产者,即生产方客户端,生产方客户端将消费发送到MQ。
    • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
    • Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优凭权)、delivery-mode(指出该消息可能需要持久性存储)等。
    • Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定成的路由表。Exchange 和Queue的绑定可以是多对多的关系。
    • Connection:网络连接,比如一个TCP连接。
    • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这此新作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常的销,所以引入了信道的概念,以复用一条 TCP连接。
    • Virtual Host:虚拟主机,默认的虚拟机名字是"/",可以新虚拟多个RabbiMQ,每一个虚拟机互相不影响;

    RabbitMQ下载安装

    linux通过docker安装

    docker pull rabbitmq
    docker run-d-p 5672:5672-p 15672:15672 --name myrabbitmq ID

    window:

    1、 安装Erlang语言运行环境:https://www.erlang.org/downloads

    2、安装RabbitMQ:https://www.rabbitmq.com/download.html

    Erlang和MQ的版本需要匹配

    使用:(https://www.cnblogs.com/sunzhao/p/8336232.html

      rabbitmqctl.bat start_app  //启动服务(window系统服务中可以设置自动启动)
      rabbitmqctl status     //查看状态
      rabbitmq-plugins enable rabbitmq_management  //在web界面查看和管理RabbitMQ服务
    

      

    RabbitMQ 测试使用

    1、添加交换器

    2、添加4个队列

    3、添加绑定

    给之前添加的Exchange direct和Exchange fanout 绑定队列

    给之前添加的Exchange topic 添加绑定规则("#"表示匹配0个或者多个单词,*:表示匹配一个单词)

    4、测试

    4.1 测试 Exchange direct

    是点对点的方法发送消息到对应的队列

    只有queue1 匹配

     

    4.2 测试 Exchange fanout

    是以广播的形式发送消息,只要给某一个队列发送消息,其他的队列都能收到

    其他的每一个队列都增加了一条消息

    4.3 测试 Exchange topic

    根据路由键的匹配规则发送消息

     只有 #   * 匹配成功,queue#匹配不成功。(#,* 前面不需要加上任何的修饰,表示匹配某一个单词,但是可以写xxx.*  xxx.#)

    4.1 消息队列可以应答消息

    RabbitMq入门

    test-rabbitmq-producer

    pom

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>

    RabbitMQ工作模式

    1、work queues(工作队列模式)

                       

      work queues与入门程序相比,多了一个消费端(我们直接在启动一个消费者即可实现),两个消费端共同消费同一个队列中的消息,它的特点如下:

    • 一个生产者将消息发给一个队列
    • 多个消费者共同监听一个队列的消息
    • 消息不能被重复消费(只能由一个消费者使用)
    • rabbit采用轮询的方式将消息是平均发送给消费者

    producter

    • 1)创建连接
    • 2)创建通道
    • 3)声明队列
    • 4)发送消息

      运行后,访问http://localhost:15672,可以发现RabbitMQ中多了一个hellowword的队列,并且队列中有一条数据等待消费者来获取

      注意:生产者往消息队列中发送消息之后,完全可以关闭连接

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Producer01 {
        //队列名字
        private static final String QUEUE = "helloworld";
        public static void main(String[] args) {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            Connection connection = null;
            Channel channel = null;
            try {
                //建立新连接
                connection = connectionFactory.newConnection();
                //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
                channel = connection.createChannel();
                //声明队列,如果队列在mq 中没有则要创建
                //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                /**
                 * 参数明细
                 * 1、queue 队列名称
                 * 2、durable 是否持久化,如果持久化,mq重启后队列还在
                 * 3、exclusive 是否独占连接,RabbiMQ中新建的队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
                 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
                 * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
                 */
                channel.queueDeclare(QUEUE,true,false,false,null);
                //发送消息
                //参数:String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细:
                 * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
                 * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
                 * 3、props,消息的属性
                 * 4、body,消息内容
                 */
                //消息内容
                String message = "hello world 黑马程序员";
                channel.basicPublish("",QUEUE,null,message.getBytes());
                System.out.println("send to mq "+message);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭连接
                //先关闭通道
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }  
    Producer

    consumer

    • 1)创建连接
    • 2)创建通道
    • 3)声明队列
    • 4)监听队列
    • 5)接收消息
    • 6)ack回复

      可以创建多个消费者

      注意:消费者不需要关闭连接,他需要一直监听队列等待生产者往队列中发送消息

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 入门程序消费者
     * @author Administrator
     * @version 1.0
     * @create 2018-06-17 9:25
     **/
    public class Consumer01 {
        //队列(消费者声明队列的原因是,如果没有队列的话可以创建一个队列,不至于监听队列报错,生产者声明队列的原因同理)
        //但是需要和生产者的队列名字相同
        private static final String QUEUE = "helloworld";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
            //建立新连接
            Connection connection = connectionFactory.newConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            Channel channel = connection.createChannel();
    
            //监听队列
            //声明队列,如果队列在mq 中没有则要创建
            //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 参数明细
             * 1、queue 队列名称
             * 2、durable 是否持久化,如果持久化,mq重启后队列还在
             * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            //如果队列存在就不创建,没有就创建;
            channel.queueDeclare(QUEUE,true,false,false,null);
    
            //实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
                /**
                 * 当接收到消息后此方法将被调用
                 * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
                 * @param envelope 信封,通过envelope
                 * @param properties 消息属性
                 * @param body 消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //消息id,mq在channel中用来标识消息的id,可用于手动编程确认消息已接收(当autoAck(自动回复)设置为false)
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息内容
                    String message= new String(body,"utf-8");
                    System.out.println("receive message:"+message);
                }
            };
            //监听队列
            //参数:String queue, boolean autoAck, Consumer callback
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false,需要通过编程实现回复,如果不回复,则消息一直存在消息队列中
             * 3、callback,消费方法,当消费者接收到消息要执行的方法
             */
            channel.basicConsume(QUEUE,true,defaultConsumer);
        }
    }
    
    Consumer
    Consumer

    2、Publish/subscribe(发布订阅模式)

                     

    • 1、一个生产者将消息发给交换机
    • 2、与交换机绑定的有多个队列,每个消费者监听自己的队列。
    • 3、生产者将消息发给交换机,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
    public class Producer02_publish {
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
    
        public static void main(String[] args) {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            Connection connection = null;
            Channel channel = null;
            try {
                //建立新连接
                connection = connectionFactory.newConnection();
                //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
                channel = connection.createChannel();
                //声明队列,如果队列在mq 中没有则要创建
                //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                /**
                 * 参数明细
                 * 1、queue 队列名称
                 * 2、durable 是否持久化,如果持久化,mq重启后队列还在
                 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
                 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
                 * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
                 */
                channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
                channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
                //声明一个交换机
                //参数:String exchange, String type
                /**
                 * 参数明细:
                 * 1、交换机的名称
                 * 2、交换机的类型
                 * fanout:对应的rabbitmq的工作模式是 publish/subscribe
                 * direct:对应的Routing    工作模式
                 * topic:对应的Topics工作模式
                 * headers: 对应的headers工作模式
                 */
                channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
                //进行交换机和队列绑定
                //参数:String queue, String exchange, String routingKey
                /**
                 * 参数明细:
                 * 1、queue 队列名称
                 * 2、exchange 交换机名称
                 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
                 */
                channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
                channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
                //发送消息
                //参数:String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细:
                 * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
                 * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
                 * 3、props,消息的属性
                 * 4、body,消息内容
                 */
                for(int i=0;i<5;i++){
                    //消息内容
                    String message = "send inform message to user";
                    channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());
                    System.out.println("send to mq "+message);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭连接
                //先关闭通道
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    Producer02_publish
    public class Consumer02_subscribe_email {
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
    
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            //建立新连接
            Connection connection = connectionFactory.newConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            Channel channel = connection.createChannel();
    
            /**
             * 参数明细
             * 1、queue 队列名称
             * 2、durable 是否持久化,如果持久化,mq重启后队列还在
             * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            //声明一个交换机
            //参数:String exchange, String type
            /**
             * 参数明细:
             * 1、交换机的名称
             * 2、交换机的类型
             * fanout:对应的rabbitmq的工作模式是 publish/subscribe
             * direct:对应的Routing    工作模式
             * topic:对应的Topics工作模式
             * headers: 对应的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            //进行交换机和队列绑定
            //参数:String queue, String exchange, String routingKey
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、exchange 交换机名称
             * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
    
            //实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    
                /**
                 * 当接收到消息后此方法将被调用
                 * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
                 * @param envelope 信封,通过envelope
                 * @param properties 消息属性
                 * @param body 消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息内容
                    String message= new String(body,"utf-8");
                    System.out.println("receive message:"+message);
                }
            };
    
            //监听队列
            //参数:String queue, boolean autoAck, Consumer callback
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
             * 3、callback,消费方法,当消费者接收到消息要执行的方法
             */
            channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
    
        }
    }
    Consumer02_subscribe_email
    public class Consumer02_subscribe_sms {
        //队列名称
        private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
    
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            //建立新连接
            Connection connection = connectionFactory.newConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            Channel channel = connection.createChannel();
    
            /**
             * 参数明细
             * 1、queue 队列名称
             * 2、durable 是否持久化,如果持久化,mq重启后队列还在
             * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
            //声明一个交换机
            //参数:String exchange, String type
            /**
             * 参数明细:
             * 1、交换机的名称
             * 2、交换机的类型
             * fanout:对应的rabbitmq的工作模式是 publish/subscribe
             * direct:对应的Routing    工作模式
             * topic:对应的Topics工作模式
             * headers: 对应的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            //进行交换机和队列绑定
            //参数:String queue, String exchange, String routingKey
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、exchange 交换机名称
             * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
             */
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");
    
            //实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    
                /**
                 * 当接收到消息后此方法将被调用
                 * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
                 * @param envelope 信封,通过envelope
                 * @param properties 消息属性
                 * @param body 消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息内容
                    String message= new String(body,"utf-8");
                    System.out.println("receive message:"+message);
                }
            };
    
            //监听队列
            //参数:String queue, boolean autoAck, Consumer callback
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
             * 3、callback,消费方法,当消费者接收到消息要执行的方法
             */
            channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
    
        }
    }
    Consumer02_subscribe_sms

    3、Routing(路由模式)

                  

    • 1、每个消费者监听自己的队列,并且设置routingkey,可以设置多个routingKey.
    • 2、生产者将消息发给交换机,发送消息时需要指定routingkey,由交换机根据routingkey来转发消息到指定的队列。
    public class Producer03_routing {
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
        private static final String ROUTINGKEY_EMAIL="inform_email";
        private static final String ROUTINGKEY_SMS="inform_sms";
        public static void main(String[] args) {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            Connection connection = null;
            Channel channel = null;
            try {
                //建立新连接
                connection = connectionFactory.newConnection();
                //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
                channel = connection.createChannel();
                //声明队列,如果队列在mq 中没有则要创建
                //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                /**
                 * 参数明细
                 * 1、queue 队列名称
                 * 2、durable 是否持久化,如果持久化,mq重启后队列还在
                 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
                 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
                 * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
                 */
                channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
                channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
                //声明一个交换机
                //参数:String exchange, String type
                /**
                 * 参数明细:
                 * 1、交换机的名称
                 * 2、交换机的类型
                 * fanout:对应的rabbitmq的工作模式是 publish/subscribe
                 * direct:对应的Routing    工作模式
                 * topic:对应的Topics工作模式
                 * headers: 对应的headers工作模式
                 */
                channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
                //进行交换机和队列绑定
                //参数:String queue, String exchange, String routingKey
                /**
                 * 参数明细:
                 * 1、queue 队列名称
                 * 2、exchange 交换机名称
                 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
                 */
                channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
                channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform");
                channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
                channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform");
                //发送消息
                //参数:String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细:
                 * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
                 * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
                 * 3、props,消息的属性
                 * 4、body,消息内容
                 */
               /* for(int i=0;i<5;i++){
                    //发送消息的时候指定routingKey
                    String message = "send email inform message to user";
                    channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());
                    System.out.println("send to mq "+message);
                }
                for(int i=0;i<5;i++){
                    //发送消息的时候指定routingKey
                    String message = "send sms inform message to user";
                    channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());
                    System.out.println("send to mq "+message);
                }*/
                for(int i=0;i<5;i++){
                    //发送消息的时候指定routingKey
                    String message = "send inform message to user";
                    channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes());
                    System.out.println("send to mq "+message);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭连接
                //先关闭通道
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    Producer03_routing
    public class Consumer03_routing_email {
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
        private static final String ROUTINGKEY_EMAIL="inform_email";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            //建立新连接
            Connection connection = connectionFactory.newConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            Channel channel = connection.createChannel();
    
            /**
             * 参数明细
             * 1、queue 队列名称
             * 2、durable 是否持久化,如果持久化,mq重启后队列还在
             * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            //声明一个交换机
            //参数:String exchange, String type
            /**
             * 参数明细:
             * 1、交换机的名称
             * 2、交换机的类型
             * fanout:对应的rabbitmq的工作模式是 publish/subscribe
             * direct:对应的Routing    工作模式
             * topic:对应的Topics工作模式
             * headers: 对应的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
            //进行交换机和队列绑定
            //参数:String queue, String exchange, String routingKey
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、exchange 交换机名称
             * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);
    
            //实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    
                /**
                 * 当接收到消息后此方法将被调用
                 * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
                 * @param envelope 信封,通过envelope
                 * @param properties 消息属性
                 * @param body 消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息内容
                    String message= new String(body,"utf-8");
                    System.out.println("receive message:"+message);
                }
            };
    
            //监听队列
            //参数:String queue, boolean autoAck, Consumer callback
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
             * 3、callback,消费方法,当消费者接收到消息要执行的方法
             */
            channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
    
        }
    }
    Consumer03_routing_email
    public class Consumer03_routing_sms {
        //队列名称
        private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
        private static final String ROUTINGKEY_SMS="inform_sms";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            //建立新连接
            Connection connection = connectionFactory.newConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            Channel channel = connection.createChannel();
    
            /**
             * 参数明细
             * 1、queue 队列名称
             * 2、durable 是否持久化,如果持久化,mq重启后队列还在
             * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
            //声明一个交换机
            //参数:String exchange, String type
            /**
             * 参数明细:
             * 1、交换机的名称
             * 2、交换机的类型
             * fanout:对应的rabbitmq的工作模式是 publish/subscribe
             * direct:对应的Routing    工作模式
             * topic:对应的Topics工作模式
             * headers: 对应的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
            //进行交换机和队列绑定
            //参数:String queue, String exchange, String routingKey
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、exchange 交换机名称
             * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
             */
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);
    
            //实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    
                /**
                 * 当接收到消息后此方法将被调用
                 * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
                 * @param envelope 信封,通过envelope
                 * @param properties 消息属性
                 * @param body 消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息内容
                    String message= new String(body,"utf-8");
                    System.out.println("receive message:"+message);
                }
            };
    
            //监听队列
            //参数:String queue, boolean autoAck, Consumer callback
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
             * 3、callback,消费方法,当消费者接收到消息要执行的方法
             */
            channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
    
        }
    }
    Consumer03_routing_sms

    4、Topics(通配符模式)

                           

      1、每个消费者监听自己的队列,并且设置带统配符的routingkey。

    符号#:匹配一个或者多个词,比如 inform.# 可以匹配 inform.sms、inform.emai1、inform.email.sms
    符号*:只能匹配一个词,比如 inform.* 可以匹配 inform.sms、inform.email
    
    public class Producer04_topics {
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
        private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
        private static final String ROUTINGKEY_SMS="inform.#.sms.#";
        public static void main(String[] args) {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            Connection connection = null;
            Channel channel = null;
            try {
                //建立新连接
                connection = connectionFactory.newConnection();
                //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
                channel = connection.createChannel();
                //声明队列,如果队列在mq 中没有则要创建
                //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
                /**
                 * 参数明细
                 * 1、queue 队列名称
                 * 2、durable 是否持久化,如果持久化,mq重启后队列还在
                 * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
                 * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
                 * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
                 */
                channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
                channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
                //声明一个交换机
                //参数:String exchange, String type
                /**
                 * 参数明细:
                 * 1、交换机的名称
                 * 2、交换机的类型
                 * fanout:对应的rabbitmq的工作模式是 publish/subscribe
                 * direct:对应的Routing    工作模式
                 * topic:对应的Topics工作模式
                 * headers: 对应的headers工作模式
                 */
                channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
                //进行交换机和队列绑定
                //参数:String queue, String exchange, String routingKey
                /**
                 * 参数明细:
                 * 1、queue 队列名称
                 * 2、exchange 交换机名称
                 * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
                 */
                channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
                channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
                //发送消息
                //参数:String exchange, String routingKey, BasicProperties props, byte[] body
                /**
                 * 参数明细:
                 * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
                 * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
                 * 3、props,消息的属性
                 * 4、body,消息内容
                 */
                for(int i=0;i<5;i++){
                    //发送消息的时候指定routingKey
                    String message = "send email inform message to user";
                    channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());
                    System.out.println("send to mq "+message);
                }
                for(int i=0;i<5;i++){
                    //发送消息的时候指定routingKey
                    String message = "send sms inform message to user";
                    channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());
                    System.out.println("send to mq "+message);
                }
                for(int i=0;i<5;i++){
                    //发送消息的时候指定routingKey
                    String message = "send sms and email inform message to user";
                    channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());
                    System.out.println("send to mq "+message);
                }
    
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭连接
                //先关闭通道
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
    
        }
    }
    Producer04_topics
    public class Consumer04_topics_email {
        //队列名称
        private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
        private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            //建立新连接
            Connection connection = connectionFactory.newConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            Channel channel = connection.createChannel();
    
            /**
             * 参数明细
             * 1、queue 队列名称
             * 2、durable 是否持久化,如果持久化,mq重启后队列还在
             * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            //声明一个交换机
            //参数:String exchange, String type
            /**
             * 参数明细:
             * 1、交换机的名称
             * 2、交换机的类型
             * fanout:对应的rabbitmq的工作模式是 publish/subscribe
             * direct:对应的Routing    工作模式
             * topic:对应的Topics工作模式
             * headers: 对应的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
            //进行交换机和队列绑定
            //参数:String queue, String exchange, String routingKey
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、exchange 交换机名称
             * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
    
            //实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    
                /**
                 * 当接收到消息后此方法将被调用
                 * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
                 * @param envelope 信封,通过envelope
                 * @param properties 消息属性
                 * @param body 消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息内容
                    String message= new String(body,"utf-8");
                    System.out.println("receive message:"+message);
                }
            };
    
            //监听队列
            //参数:String queue, boolean autoAck, Consumer callback
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
             * 3、callback,消费方法,当消费者接收到消息要执行的方法
             */
            channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
    
        }
    }
    Consumer04_topics_email
    public class Consumer04_topics_sms {
        //队列名称
        private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
        private static final String ROUTINGKEY_SMS="inform.#.sms.#";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //通过连接工厂创建新的连接和mq建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);//端口
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
            connectionFactory.setVirtualHost("/");
    
            //建立新连接
            Connection connection = connectionFactory.newConnection();
            //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
            Channel channel = connection.createChannel();
    
            /**
             * 参数明细
             * 1、queue 队列名称
             * 2、durable 是否持久化,如果持久化,mq重启后队列还在
             * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
             * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
             * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
             */
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
            //声明一个交换机
            //参数:String exchange, String type
            /**
             * 参数明细:
             * 1、交换机的名称
             * 2、交换机的类型
             * fanout:对应的rabbitmq的工作模式是 publish/subscribe
             * direct:对应的Routing    工作模式
             * topic:对应的Topics工作模式
             * headers: 对应的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
            //进行交换机和队列绑定
            //参数:String queue, String exchange, String routingKey
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、exchange 交换机名称
             * 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串
             */
            channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
    
            //实现消费方法
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
    
                /**
                 * 当接收到消息后此方法将被调用
                 * @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
                 * @param envelope 信封,通过envelope
                 * @param properties 消息属性
                 * @param body 消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机
                    String exchange = envelope.getExchange();
                    //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息内容
                    String message= new String(body,"utf-8");
                    System.out.println("receive message:"+message);
                }
            };
    
            //监听队列
            //参数:String queue, boolean autoAck, Consumer callback
            /**
             * 参数明细:
             * 1、queue 队列名称
             * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
             * 3、callback,消费方法,当消费者接收到消息要执行的方法
             */
            channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
    
        }
    }
    Consumer04_topics_sms

    5、Header模式

      header模式与routing不同的地方在于,header模式取消routingkey,使用header中的key/value(键值对)匹配队列。

    生产者

    Map<String, Object> headers_email = new Hashtable<String, Object>(); 
    headers_email.put("inform_type", "email");
    Map<String, Object> headers_sms = new Hashtable<String, Object>(); 
    headers_sms.put("inform_type", "sms"); 
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 
    channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
    String message = "email inform to user"+i;
    Map<String,Object> headers = new Hashtable<String, Object>(); 
    headers.put("inform_type", "email");//匹配email通知消费者绑定的header 
    //headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header 
    AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); 
    properties.headers(headers); 
    //Email通知 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());

    消费者

    channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM,BuiltinExchangeType.HEADERS); 
    Map<String, Object> headers_email = new Hashtable<String, Object>(); 
    headers_email.put("inform_email", "email"); 
    //交换机和队列绑定 
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 
    //指定消费队列 
    channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);

    6、RPC

           

    RPC即客户端远程调用服务端的方法,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

    • 1、客户端既是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
    • 2、服务端(消费者)监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
    • 3、服务端将RPC方法的结果发送到RPC响应队列
  • 相关阅读:
    通用权限管理设计 之 数据库结构设计
    jQuery LigerUI 插件介绍及使用之ligerDateEditor
    jQuery LigerUI 插件介绍及使用之ligerTree
    jQuery LigerUI V1.01(包括API和全部源码) 发布
    jQuery liger ui ligerGrid 打造通用的分页排序查询表格(提供下载)
    jQuery LigerUI V1.1.5 (包括API和全部源码) 发布
    jQuery LigerUI 使用教程表格篇(1)
    jQuery LigerUI V1.0(包括API和全部源码) 发布
    jQuery LigerUI V1.1.0 (包括API和全部源码) 发布
    nginx keepalived
  • 原文地址:https://www.cnblogs.com/yanxiaoge/p/11379715.html
Copyright © 2011-2022 走看看