zoukankan      html  css  js  c++  java
  • springboot(四) rabbitMQ demo

    RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

    消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将RocketMQ捐献给了apache,当然了今天的主角还是讲RabbitMQ。消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式事务的支持,RPC的调用等等。

    以前一直使用的是ActiveMQ,在实际的生产使用中也出现了一些小问题,在网络查阅了很多的资料后,决定尝试使用RabbitMQ来替换ActiveMQ,RabbitMQ的高可用性、高性能、灵活性等一些特点吸引了我们,查阅了一些资料整理出此文。

    RabbitMQ介绍

    RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    相关概念

    通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

    通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

     左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。

    • 中间即是 RabbitMQ,其中包括了 交换机 和 队列。

    • 右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。

    那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

    • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。

    • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
      这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

    • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

    交换机(Exchange)

    交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout

    • Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.

    • Topic:按规则转发消息(最灵活)

    • Headers:设置header attribute参数类型的交换机

    • Fanout:转发消息到所有绑定队列

    Direct Exchange
    Direct  Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

    第一个 X - Q1 就有一个 binding key,名字为 orange; X - Q2 就有 2 个 binding key,名字为 black 和 green。当消息中的 路由键 和 这个 binding key 对应上的时候,那么就知道了该消息去到哪一个队列中。

    Ps:为什么 X 到 Q2 要有 black,green,2个 binding key呢,一个不就行了吗? - 这个主要是因为可能又有 Q3,而Q3只接受 black 的信息,而Q2不仅接受black 的信息,还接受 green 的信息。

    Topic Exchange  

    Topic Exchange 转发消息主要是根据通配符。 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。

    在这种交换机模式下:

    • 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等。

    • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是可以的。

    具体代码发送的时候还是一样,第一个参数表示交换机,第二个参数表示routing key,第三个参数即消息。如下:

    rabbitTemplate.convertAndSend("testTopicExchange","key1.a.c.key2", " this is  RabbitMQ!");

    topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:

    • *表示一个词.

    • #表示零个或多个词.

    Headers Exchange

    headers 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routing_key , headers 则是一个自定义匹配规则的类型.
    在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

    Fanout Exchange

    Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。

    springboot集成RabbitMQ

    springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持。

    简单使用

    引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2、配置文件

    配置rabbitmq的安装地址、端口以及账户信息

    spring.rabbitmq.host=192.168.31.151
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
     
    #------------------------------------RabbitMQ可选配置(注:这里只用到了特别少的几个)
    # broker端没有收到消费者的ACK(即:消费者异常时)时,是否再次向消费者投递消息(默认为false)
    # 为false时,如果没有收到消费者的ACK,那么会无限投递;设置为true时,默认投递时次数为3此
    spring.rabbitmq.listener.simple.retry.enabled=true
    # 设置向消费者投递消息的最大次数
    spring.rabbitmq.listener.simple.retry.max-attempts=2
    # 投递消息的间隔(单位ms)
    spring.rabbitmq.listener.simple.retry.initial-interval=2000ms

    编写config

    package com.lf.config;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
        @Bean
        public Queue Queue() {
            return new Queue("hello");
        }
    
    }

    编写sender

    package com.lf.rabbitmq;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    @Component
    public class HelloSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String context = "hello " + new Date();
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("hello", context);
        }
    
    }

    编写receiver

    package com.lf.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "hello")
    public class HelloReceiver {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver********  : " + hello);
        }
    
    }

    编写 test

    package com.lf;
    
    import com.lf.rabbitmq.HelloSender;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest {
        @Autowired
        private HelloSender helloSender;
    
        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
    
    }

    运行--看到输出信息,或者到mq管理页面http://ip:15672/#/queues观察

    多对多使用

    一个发送者,N个接收者或者N个发送者和N个接收者会出现什么情况呢?

    一对多发送
    对上面的代码进行了小改造,接收端注册了两个Receiver,Receiver1和Receiver2,发送端加入参数计数,接收端打印接收到的参数,下面是测试代码,发送一百条消息,来观察两个接收端的执行效果

    改造代码:

    package com.lf.rabbitmq;
    
            import org.springframework.amqp.rabbit.annotation.RabbitHandler;
            import org.springframework.amqp.rabbit.annotation.RabbitListener;
            import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "hello")
    public class HelloReceiver {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver*111111  : " + hello);
        }
    
        @RabbitHandler
        public void process(Integer i) {
            System.out.println("Receiver*111111  : " + i);
        }
    }
    package com.lf.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "hello")
    public class HelloReceiver2 {
    
        @RabbitHandler
        public void process(String hello) {
            System.out.println("Receiver*222222  : " + hello);
        }
    
        @RabbitHandler
        public void process(Integer i) {
            System.out.println("Receiver*222222  : " + i);
        }
    }

    test

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest {
        @Autowired
        private HelloSender helloSender;
    
        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
        @Test
        public void oneToMany() throws Exception {
            for (int i=0;i<100;i++){
                helloSender.send(i);
            }
        }

    运行:

    Receiver*222222 : 87
    Receiver*111111 : 54
    Receiver*222222 : 89
    Receiver*111111 : 56
    Receiver*111111 : 58
    Receiver*222222 : 91
    Receiver*111111 : 60
    Receiver*222222 : 93
    Receiver*111111 : 62
    Receiver*222222 : 95
    Receiver*111111 : 64
    Receiver*222222 : 97
    Receiver*111111 : 66
    Receiver*222222 : 99
    Receiver*111111 : 68

    返回结果得到以下结论:

    一个发送者,N个接受者,经过测试会均匀的将消息发送到N个接收者中,均匀消费

    多对多发送  

    复制了一份发送者,加入标记,在一百个循环中相互交替发送

    @Component
    public class HelloSender2 {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String context = "hello " + new Date();
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("hello", context);
        }
    
        public void send(int i) {
            System.out.println("Sender2222 : " + i);
            this.rabbitTemplate.convertAndSend("hello", i);
        }
    }

    test

    package com.lf;
    
    import com.lf.rabbitmq.HelloSender;
    import com.lf.rabbitmq.HelloSender2;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMqHelloTest {
        @Autowired
        private HelloSender helloSender;
    
        @Autowired
        private HelloSender2 helloSender2;
    
        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
        @Test
        public void oneToMany() throws Exception {
            for (int i=0;i<100;i++){
                helloSender.send(i);
                helloSender2.send(i);
            }
        }
    
    }

    运行:

    结论:和一对多一样,接收端仍然会均匀接收到消息

    高级使用

    对象的支持

    springboot以及完美的支持对象的发送和接收,不需要格外的配置。

    config:

    @Configuration
    public class RabbitConfig {
        @Bean
        public Queue Queue() {
            return new Queue("hello");
        }
    
        @Bean
        public Queue UserQueue() {
            return new Queue("user");
        }
    
    }

    User:需要实现序列化接口

    package com.lf.entity;
    
    import lombok.Data;
    import lombok.ToString;
    
    import java.io.Serializable;
    
    @Data
    @ToString
    public class User implements Serializable {
    
        private String username;
        private String password;
    }

    userSender:

    package com.lf.rabbitmq;
    
    import com.lf.entity.User;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    @Component
    public class UserSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        //发送者
        public void send(User user) {
            System.out.println("UserSender: " + user.toString());
            this.rabbitTemplate.convertAndSend("user", user);
        }
    }

    userReceiver:

    package com.lf.rabbitmq;
    
    import com.lf.entity.User;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "user")
    public class UserReceiver {
    
        @RabbitHandler
        public void process(User user) {
            System.out.println("UserReceiver : " + user);
        }
    
    }

    test:

     @Autowired
        private UserSender userSender;
    
    @Test
        public void user() throws Exception {
            User  user = new User();
            user.setUsername("张三");
            user.setPassword("123456");
            userSender.send(user);
        }

    test运行结果:

    UserSender: User(username=张三, password=123456)
    2020-05-05 16:38:44.621  INFO 7456 --- [       Thread-3] o.s.w.c.s.GenericWebApplicationContext   : Closing org.springframework.web.context.support.GenericWebApplicationContext@1e8f619: startup date [Tue May 05 16:38:41 CST 2020]; root of context hierarchy
    2020-05-05 16:38:44.623  INFO 7456 --- [       Thread-3] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647
    2020-05-05 16:38:44.628  INFO 7456 --- [       Thread-3] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
    UserReceiver : User(username=张三, password=123456)
     

    Topic Exchange

    topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

    首先对topic规则配置,这里使用两个队列来测试

    @Configuration
    public class TopicRabbitConfig {
        final static String message = "topic.message";
        final static String messages = "topic.messages";
    
        @Bean
        public Queue queueMessage() {
            return new Queue(TopicRabbitConfig.message);
        }
        @Bean
        public Queue queueMessages() {
            return new Queue(TopicRabbitConfig.messages);
        }
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("exchange");
        }
        @Bean
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }
        @Bean
        Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); top.#的都会发送到 queueMessages的receiver中
        }
    }

    receiver

    @Component
    @RabbitListener(queues = "topic.message")
    public class MessageReceiver {
    
        @RabbitHandler
        public void process(String message) {
            System.out.println("MessageReceiver : " + message);
        }
    
    }
    @Component
    @RabbitListener(queues = "topic.messages")
    public class MessagesReceiver {
    
        @RabbitHandler
        public void process(String message) {
            System.out.println("MessagesReceiver : " + message);
        }
    
    }

    运行test

     @Test
        public void sendMessage() {
            String context = "hi, i am message 1";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
        }
        @Test
        public void sendMessages() {
            String context = "hi, i am messages 2";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
        }

    发送sendMessage会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送sendMessages只有topic.#可以匹配所有只有MessagesReceiver监听到消息

    Fanout Exchange

    Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

    Fanout 相关配置

    package com.lf.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutRabbitConfig {
    
        @Bean
        public Queue AMessage() {
            return new Queue("fanout.A");
        }
        @Bean
        public Queue BMessage() {
            return new Queue("fanout.B");
        }
        @Bean
        public Queue CMessage() {
            return new Queue("fanout.C");
        }
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
        @Bean
        Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        }
        @Bean
        Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        }
        @Bean
        Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
        }
    
    }
     

    这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:

    config:

    @Configuration
    public class FanoutRabbitConfig {
    
        @Bean
        public Queue AMessage() {
            return new Queue("fanout.A");
        }
        @Bean
        public Queue BMessage() {
            return new Queue("fanout.B");
        }
        @Bean
        public Queue CMessage() {
            return new Queue("fanout.C");
        }
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
        @Bean
        Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        }
        @Bean
        Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        }
        @Bean
        Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
        }
    
    }

    receiverA、B、C:

    package com.lf.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "fanout.A")
    public class AMessageReceiver {
    
        @RabbitHandler
        public void process(String message) {
            System.out.println("AMessageReceiver : " + message);
        }
    
    }
    package com.lf.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "fanout.B")
    public class BMessageReceiver {
    
        @RabbitHandler
        public void process(String message) {
            System.out.println("BMessageReceiver : " + message);
        }
    
    }
    package com.lf.rabbitmq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "fanout.C")
    public class CMessageReceiver {
    
        @RabbitHandler
        public void process(String message) {
            System.out.println("CMessageReceiver : " + message);
        }
    
    }

    test:

    @Test
        public void sendFanout() {
            String context = "hi, fanout msg ";
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("fanoutExchange","无论输什么队列", context);
        }

    结果如下:

    Sender : hi, fanout msg 
    2020-05-05 17:06:26.883  INFO 9924 --- [       Thread-3] o.s.w.c.s.GenericWebApplicationContext   : Closing org.springframework.web.context.support.GenericWebApplicationContext@17a7f33: startup date [Tue May 05 17:06:24 CST 2020]; root of context hierarchy
    2020-05-05 17:06:26.885  INFO 9924 --- [       Thread-3] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647
    2020-05-05 17:06:26.887  INFO 9924 --- [       Thread-3] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
    CMessageReceiver : hi, fanout msg 
    AMessageReceiver : hi, fanout msg 
    BMessageReceiver : hi, fanout msg 
     

    结果说明,绑定到fanout交换机上面的队列才能收到消息

    
    
  • 相关阅读:
    第52周二Restful
    第52周一
    第51周日
    第51周六
    第51周五
    第51周四
    第51周三
    第51周二
    第51周一
    第50周日
  • 原文地址:https://www.cnblogs.com/flgb/p/12831426.html
Copyright © 2011-2022 走看看