zoukankan      html  css  js  c++  java
  • RabbitMQ交换机工作模式及示例

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
    它可以使对应的客户端(client)与对应的消息中间件(broker)进行交互。消息中间件从发布者(publisher)那里收到消息(发布消息的应用,也称为producer),然后将他们转发给消费者(consumers,处理消息的应用)。由于AMQP是一个网络协议,所以发布者、消费者以及消息中间件可以部署到不同的物理机器上面。

    SpringBoot整合RabbitMq

    应用依赖

    //mq必须的
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    //web应用
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    //yml格式的配置文件依赖包
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>

    配置文件

    application.yml配置文件

    //本服务启动的端口号
    server:
        port: 8090
    
    spring:
      rabbitmq:
          addresses: 192.168.100.10:5672,192.168.100.11:5672
          username: xxx
          password: xxx

    简单配置简单使用

    生产者,消费者

    先在mq的控制台创建一个queueName队列

    生产者:

    @Controller
    public class ProducerDemo {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        @RequestMapping("/send")
        @ResponseBody
        public String send() {
            String context = "hello==========" + new Date(); 
            //往"queueName"队列里面发送消息(先在mq的控制台创建一个queueName队列)
            this.rabbitTemplate.convertAndSend("queueName", context);
            return "success";
        }
    }

    消费者

    @Component
    @RabbitListener(queues = "queueName") //监听queueName这个队列
    public class ConsumerDemo {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  ===================: " + hello);
        }
    }

    springboot配置是不是很简单,整个整合都非常简单。

    用代码创建一个队列

    用代码创建一个队列,而不是在mq管理控制台手动创建。so easy.

    @Configuration
    public class RabbitMQConfig {
    
        @Bean
        public Queue queue() {
            return new Queue("queueName");
        }
    }

    spring在启动时会扫描到Configuration这个注解是一个配置文件的注解。在发送者发送消息时,发现没有这个队列,才会创建这个队列。

    高级玩法

    时候我们有多个mq,自己组的mq队列啊,交换机啊,虚拟内存之类的,但有时候会用到其他组的mq,他们的配置信息和我们的完全不一样,这样时候直接使用spring自带的集成模式就难于满足我们。但是spring没那么傻,它提供了配置文件,你可以通过注解配置来实现多个mq不同的mq来做设置。

    @Configuration
    public class RabbitConfig {
        @Value("${rabbitmq.queue.group}")
        private String groupQueueName;
        @Value("${rabbitmq.exchange}")
        private String exchangeName;
        @Value("${rabbitmq.addresses}")
        private String address;
        @Value("${rabbitmq.port}")
        private Integer port;
        @Value("${rabbitmq.username}")
        private String userName;
        @Value("${rabbitmq.pw}")
        private String password;
        @Value("${rabbitmq.virtual}")
        private String virtualHost;
    
       //创建连接工厂
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setPort(port);
            connectionFactory.setAddresses(address);
            connectionFactory.setUsername(userName);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setConnectionTimeout(1000);
            return connectionFactory;
        }
    
        //创建连接工厂的一个ampg管理
        @Bean
        public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
    
        @Bean
        Queue queue() {
            return new Queue(groupQueueName, true);
        }
    
        //创建一个topic交换机,使用的是amqpAdmin来管理。
        @Bean
        TopicExchange exchange(AmqpAdmin amqpAdmin) {
            TopicExchange exchange = new TopicExchange(exchangeName);
            exchange.setAdminsThatShouldDeclare(amqpAdmin);
            return exchange;
        }
    
        //创建一个模版,绑定的是connectionFactory这个工厂。
        @Bean
        RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }
    
    
        //创建第二个连接工厂
        @Bean
        public ConnectionFactory tempConnectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setPort(port);
            connectionFactory.setAddresses(address);
            connectionFactory.setUsername(userName);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost("temp");
            connectionFactory.setConnectionTimeout(1000);
            return connectionFactory;
        }
    
        //第二个管理
        @Bean
        public AmqpAdmin tempAdmin(ConnectionFactory tempConnectionFactory) {
            return new RabbitAdmin(tempConnectionFactory);
        }
    
        //创建一个交换机,关联到tempAdmin这个上面
        @Bean
        TopicExchange tempExchange(AmqpAdmin tempAdmin) {
            TopicExchange exchange = new TopicExchange("topic.A");
            exchange.setAdminsThatShouldDeclare(tempAdmin);
            return exchange;
        }
    
        //创建第二个template
        @Bean
        RabbitTemplate tempRabbitTemplate(ConnectionFactory tempConnectionFactory) {
            return new RabbitTemplate(tempConnectionFactory);
        }
    
    
        //设置一个简单监听工厂。
        @Bean
        public SimpleRabbitListenerContainerFactory tempListenerContainerFactory(ConnectionFactory tempConnectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(tempConnectionFactory);
            return factory;
        }
    
    
        }

    第二种模式下面的监听者

    @RabbitListener(containerFactory = "tempListenerContainerFactory", bindings = {@QueueBinding(value = @Queue(value = "A.queue"),
                exchange = @Exchange(value = "topic.A", type = ExchangeTypes.TOPIC),
                key = "user.message.up")}) 
        public void process(Message message) {
        }
     }   

    注意:VirtualHost这个参数。虚拟host,我们创建的所有队列,交换机之类的东西都是在虚拟host下面进行的,在不同的虚拟host下面,他们之间互不通信,我们可以创建2个一摸一样的队列,只需要在不同的虚拟host下面。虚拟host下面就相当于物理隔绝差不多。

    上面这个例子在项目中,我们组的队列放到了一个虚拟host下面,其他组的队列放到放到了另外一个虚拟host中,导致了在开发过程中互不通信,所以需要这么配置。

    Spring配置文件整合RabbitMq

    依赖

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.6.2.RELEASE</version>
    </dependency>

    基本配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xmlns="http://www.springframework.org/schema/beans"
           xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                    http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/rabbit
                    http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
    
    
        <!-- 连接服务配置  -->
        <!--rabbitmq.broker.port 在spring变量的配置文件里面定义-->
        <rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbitmq.broker.url}"
                                   port="${rabbitmq.broker.port}"  />
    
        <!-- 创建一个连接工厂 -->
        <rabbit:admin connection-factory="rabbitConnectionFactory"/>
    
        <!-- 定义了一个队列queueDirect_queue -- >
        <rabbit:queue id="queueDirect" durable="true" exclusive="false" name="queueDirect_queue"/>
    
        <!--定义了一个direct交换机绑定了一个队列queueDirect_queue。定义的路由规则名称为:info_queue_key-->
        <rabbit:direct-exchange id="directQueueExchange" name="solutionInfo_exchange" durable="true">
            <rabbit:bindings>
                <rabbit:binding queue="queueDirect" key="info_queue_key"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
         <!-- 定义一个template模版,指定了该模版的默认交换机为solutionInfo_exchange -->
        <rabbit:template id="helloTemplate"  exchange="solutionInfo_exchange"  connection-factory="rabbitConnectionFactory" reply-timeout="2000" />
        <!-- 生产者end -->
    
    
    
    
         <!--消费者bean -->
        <bean id="messageReceiver" class="com.rabbitmq.demo.MQDemoReceiver"></bean>
    
        <!-- 监听queueDirect_queue这个队列-->
        <rabbit:listener-container connection-factory="rabbitConnectionFactory" max-concurrency="5" >
            <rabbit:listener queues="queueDirect_queue" ref="messageReceiver"/>
        </rabbit:listener-container>
    </beans>

    配置文件其实还可以更简单点:

    <rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbitmq.broker.url}"
                                   port="${rabbitmq.broker.port}"  />
        <!-- 创建一个连接工厂 -->
        <rabbit:admin connection-factory="rabbitConnectionFactory"/>
         <!-- 定义一个template模版,指定了该模版的默认交换机为solutionInfo_exchange -->
        <rabbit:template id="helloTemplate"   connection-factory="rabbitConnectionFactory" reply-timeout="2000" />

    这就完毕了。中间定义了交换机,定义了队列,其实都可以省略的,可以到mq的管理后台创建。先创建一个队列,然后创建一个交换机绑定该队列,然后定义一个路由规则就可以了。

    在配置文件里面定义队列,定义交换机有什么好处呢?

    好处就是在部署代码时,如果mq里面没有该队列,没有该交换机,在使用时就会自动创建。如果没有在配置文件里面写队列啊,交换机啊,在部署代码时,如果mq中没有该队列,交换机,在发送消息或监听消息时就会抛异常提示没有该交换机,没有该队列,发送失败。

    生产者,消费者

    生产者:

    @Component
    public class DemoProducer {
    
        @Resource(name = "helloTemplate")
        private RabbitTemplate rabbitTemplate; 
    
        public void sendProducer() {
              //生产者往info_queue_key这个路由规则里面发送消息
            rabbitTemplate.convertAndSend("info_queue_key", jsonObject.toJSONString());   
            //或者生产者往solutionInfo_exchange这个交换机,这个info_queue_key路由中发送消息。
            //rabbitTemplate.convertAndSend("solutionInfo_exchange","info_queue_key", jsonObject.toJSONString());
            //上面两者有什么区别呢?第一种少了交换机,因为在配置文件里面已经为rabbitTemplate指定了交换机,所以可以省去交换机。
            //第二种呢,可以手动指定交换机。
        }
    }

    消费者:继承MessageListener接口

    package com.rabbitmq.demo
    
    @Component
    public class MQDemoReceiver implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            try {
                byte[] bodyByte = message.getBody();
                String bodyJson = new String(bodyByte);
                log.info("bodyJson:" + bodyJson);//接收到消息喽
             } catch (Throwable e) {
                log.error("WechatMsgReceiver exception", e);
            }
    
    
        }
    }

    RabbitMq的六种模式分析详解

    官网学习地址

    上面介绍了spring怎么整合mq,下面开始讲解怎么使用。

    说了是6种模式,其实在我看来也就三大类而已。第一 直连;第二 通过交换机连接;第三 RPC模式

    Hello Word

    P代表生产者,C代表消费者,红色代码消息队列。P将消息发送到消息队列,C对消息进行处理。

    @Controller 
    public class ProducerDemo {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        @RequestMapping("/send")
        @ResponseBody
        public String send() {
            String context = "hello==========" + new Date();
            log.info("Sender : " + context);
            //生产者,正在往hello这个路由规则中发送,由于没有交换机,所以路由规则就是队列名称
            this.rabbitTemplate.convertAndSend("hello", context);
            return "success";
        }
    }    

    消费者:

    @Component
    //监听hello这个队列
    @RabbitListener(queues = "hello")
    public class ConsumerDemo {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  ===================: " + hello);
        }
    }

    工作模式

    一个队列有两个消费者

    在上面的基础中,添加一个消费者就OK了。

    消费者:

    @Component
    @RabbitListener(queues = "hello")//监听hello这个队列
    public class ConsumerDemo1{
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  ===================: " + hello);
        }
    }
    //两个消费者
    @Component
    @RabbitListener(queues = "hello")//监听hello这个队列
    public class ConsumerDemo2{
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  ===================: " + hello);
        }
    }

    当两个消费者同时监听一个队列时,他们并不能同时消费一条消息,而是随机消费消息。1,2,3,4,5,6消息来了,consumer1消费了1,3,5;consumer2消费了2,4,6。这个数据是随机的哦,别理解为奇偶数。可以自己测试一下。

    一个队列中一条消息,只能被一个消费者消费

    订阅与发布模式


    生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,来消费消息。

    1.定义一个订阅模式的交换机:FanoutExchange交换机。
    2.创建2个队列helloA,helloB,然后将这两个队列绑定到交换机上面。

    @Configuration
    public class RabbitMQConfig {
    
         @Bean
        public Queue queueA() {
            return new Queue("helloA", true);
        }
    
        @Bean
        public Queue queueB() {
            return new Queue("helloB", true);
        }
    
        //创建一个fanoutExchange交换机
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
        //将queueA队列绑定到fanoutExchange交换机上面
        @Bean
        Binding bindingExchangeMessageFanoutA(Queue queueA, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueA).to(fanoutExchange);
        }
    
        //将queueB队列绑定到fanoutExchange交换机上面
        @Bean
        Binding bindingExchangeMessageFanoutB(Queue queueB, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueB).to(fanoutExchange);
        }
    
    }

    注意一个细节哦。bindingExchangeMessageFanoutA这种参数重的queueA与创建队列的方法queueA()名字要相同哦。这样才知道queueA绑定了该交换机哦。交换机的名称也同样。fanoutExchange参数的名字和fanoutExchange()名字要一样哦。

    生产者:this.rabbitTemplate.convertAndSend("fanoutExchange","", context);

    @Component
    @RabbitListener(queues = "queueA")//监听queueA这个队列
    public class ConsumerDemo1{
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  ===================: " + hello);
        }
    }
    
    //两个消费者
    @Component
    @RabbitListener(queues = "queueB")//监听queueB这个队列
    public class ConsumerDemo2{
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver  ===================: " + hello);
        }
    }

    现在生产者发送了一条消息,会发现consumer1,2都会收到。之前不是说过一个队列里面的一条消息,只能被一个消费者消费吗?怎么现在一条消息被两个消费者消费了。要知道这里对于生产者来说是只生产了一条消息,但是它发送给了交换机,交换机会根据绑定的队列来发送。现在绑定了queueA,queueB队列,所以两个队列里面都有消息了。而消费者关注的也是两个队列,就看到了一条消息被两个消费者消费的情况了。

    路由模式

    @Configuration
    public class RabbitMQConfig {
       public static final String DIRECT_EXCHANGE = "directExchange";
       public static final String QUEUE_DIRECT_A = "direct.A";
    
        public static final String QUEUE_DIRECT_B = "direct.B";
    
       //创建一个direct交换机
         @Bean
        DirectExchange directExchange() {
            return new DirectExchange(DIRECT_EXCHANGE);
        }
    
        @Bean
        Queue queueDirectNameA() {
            return new Queue(QUEUE_DIRECT_A);
        }
    
    
        //创建队列
        @Bean
        Queue queueDirectNameB() {
            return new Queue(QUEUE_DIRECT_B);
        }
    
    
        //将direct.A队列绑定到directExchange交换机中,使用direct.a.key作为路由规则
        @Bean
        Binding bindingExchangeMessageDirectA(Queue queueDirectNameA, DirectExchange directExchange) {
            return BindingBuilder.bind(queueDirectNameA).to(directExchange).with("direct.a.key");
        }
    
    
        @Bean
        Binding bindingExchangeMessageDirectB(Queue queueDirectNameB, DirectExchange directExchange) {
            return BindingBuilder.bind(queueDirectNameB).to(directExchange).with("direct.b.key");
        }
    }    

    消费者一样,只需要监听队列就OK了。

    @Component
    public class ConsumerDemo {
    
         @RabbitListener(queues = "topic.A")
        @RabbitHandler
        public void processtopicA(String hello) {
            System.out.println("Receiver Exchanges topic.A  ===================: " + hello);
        }
    
        @RabbitListener(queues = "topic.B")
        @RabbitHandler
        public void processtopicB(String hello) {
            System.out.println("Receiver Exchanges topic.B  ===================: " + hello);
        }
    }    

    //往directExchange交换机中发送消息,使用direct.a.key作为路由规则。

    生产者:rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, "direct.a.key", context);

    topic.A,topic.B两个队列都绑定了交换机directExchange,但他们的路由规则不同,a队列用了direct.a.key,b队列用了direct.b.key,这种情况下,生产者使用direct.a.key作为路由规则,就只有a队列能收到消息,b队列则收不到消息。

    //如果a,b队列都关联了direct.a.key路由规则,则上面的生产者发送消息时,a,b队列都能收到消息,这样就有类似fanout交换机的功能了。

        @Bean
        Binding bindingExchangeMessageDirectA(Queue queueDirectNameA, DirectExchange directExchange) {
            return BindingBuilder.bind(queueDirectNameA).to(directExchange).with("direct.a.key");
        }
    
    
        @Bean
        Binding bindingExchangeMessageDirectB(Queue queueDirectNameB, DirectExchange directExchange) {
            return BindingBuilder.bind(queueDirectNameB).to(directExchange).with("direct.a.key");
        }

    topic主题模式

        public static final String TOPIC_EXCHANGE = "topicExchange";
    
        public static final String QUEUE_TOPIC_KEY = "topic.#";
    
        public static final String QUEUE_TOPIC_KEY_B = "topic.b.key";
    
        public static final String QUEUE_TOPIC_A = "topic.A";
    
        public static final String QUEUE_TOPIC_B = "topic.B";
    
    
        //创建一个topic交换机
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE);
        }
    
        @Bean
        Queue queueTopicNameA() {
            return new Queue(QUEUE_TOPIC_A);
        }
    
        @Bean
        Queue queueTopicNameB() {
            return new Queue(QUEUE_TOPIC_B);
        }
    
        //队列topic.A绑定交换机并且关联了topic.#正则路由规则。就是说只要topic.开头的,topic.A队列都将收到消息
        @Bean
        Binding bindingExchangeMessageTopicA(Queue queueTopicNameA, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueTopicNameA).to(topicExchange).with(QUEUE_TOPIC_KEY);
        }
    
        //队列topic.B绑定交换机并且关联了topic.b.key正则路由规则。就是说必须是topic.b.key,topic.B队列才能收到消息,和directExchange类型一样了。
        @Bean
        Binding bindingExchangeMessageTopicB(Queue queueTopicNameB, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueTopicNameB).to(topicExchange).with(QUEUE_TOPIC_KEY_B);
        }

    生产者

        @RequestMapping("/topic/send")
        @ResponseBody
        public String sendTopicExchange() {
            String context = "Exchange==topic-->b====== " + new Date();
            log.info("Sender : " + context);
            this.rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "topic.b.key", context);
            return "success";
        }

    这里发送消息时,往topicExchange这个交换机中发送,并且路由规则为topic.b.key。由于b队列绑定了交换机和路由规则就是它,所以队列b能收到消息。
    但是由于A队列的过滤规则为topic.#,就是说只要topic开头的就的路由规则,交换机就会往这个队列里面发送消息。所以a队列也能收到消息,topic.b.key是topic开头的。

    对于a队列来说,路由规则为topic.adsf,topic.b.key,topic.a等等,a队列都将收到消息,因为它的路由规则就是topic开头就可以。

    消费者:监听队列就OK了,其他不用关心。

    小小总结:订阅模式,路由模式,主题模式,他们三种都非常类似。而且主题模式可以随时变成两外两种模式。

    在主题模式下:路由规则不为正则表达式的时候,他就和路由模式一样。当路由规则不为表达式,且路由规则一样时,就变成了订阅模式。是不是很厉害的样子。

    在路由模式下:路由规则一样时,就变成了订阅模式。

    简单总结六种模式:

    1. 简单模式:生产者,一个消费者,一个队列
    2. 工作模式:生产者,多个消费者,一个队列
    3. 订阅与发布模式(fanout):生产者,一个交换机(fanoutExchange),没有路由规则,多个消费者,多个队列
    4. 路由模式(direct):生产者,一个交换机(directExchange),路由规则,多个消费者,多个队列
    5. 主题模式(topic):生产者,一个交换机(topicExchange),模糊匹配路由规则,多个消费者,多个队列
    6. RPC模式:生产者,多个消费者,路由规则,多个队列

    总结

      1. 一个队列,一条消息只会被一个消费者消费(有多个消费者的情况也是一样的)。
      2. 订阅模式,路由模式,主题模式,他们的相同点就是都使用了交换机,只不过在发送消息给队列时,添加了不同的路由规则。订阅模式没有路由规则,路由模式为完全匹配规则,主题模式有正则表达式,完全匹配规则。
      3. 在订阅模式中可以看到一条消息被多个消费者消费了,不违背第一条总结,因为一条消息被发送到了多个队列中去了。
      4. 在交换机模式下:队列和路由规则有很大关系
      5. 在有交换机的模式下:3,4,5模式下,生产者只用关心交换机与路由规则即可,无需关心队列
      6. 消费者不管在什么模式下:永远不用关心交换机和路由规则,消费者永远只关心队列,消费者直接和队列交互
  • 相关阅读:
    成功的速度一定要大于父母老去的速度
    luogg_java学习_09_泛型_集合
    luogg_java学习_08_设计模式_API
    luogg_java学习_07_抽象类_接口_多态学习总结
    报表请求默认输出格式(html或者excel)设置
    XML报表开发基本过程
    rtf模板常用技巧
    xml模板提交请求submit_request
    XML基础知识
    HTML语言常用语法
  • 原文地址:https://www.cnblogs.com/wangfg/p/14889661.html
Copyright © 2011-2022 走看看