zoukankan      html  css  js  c++  java
  • 第十章:(4)Spring Boot 与 消息 之 整合 RabbitMQ

    一、依赖关系

      创建的 SpringBoot 模块中,引入了 amqp 的启动器

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

      amqp中包括了下面两个依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-messaging</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>

    二、自动配置

      1、自动配置类:RabbitAutoConfiguration

      2、自动配置了连接工厂 CachingConnectionFactory(通过RabbitProperties创建连接工厂)

        

      3、RabbitProperties 封装了 Rabbit 的配置

        

      4、RabbitTemplate 给 RabbitMQ 发送和接受消息

        

      5、AmqpAdmin :RabbitMQ 系统管理功能组件

     

    三、测试

      1、配置 RabbitMQ 的信息

    spring.rabbitmq.host=/192.168.1.6
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    # 默认就是5672
    spring.rabbitmq.port=5672  
    #spring.rabbitmq.virtual-host
    

      

      2、测试单播

      (1)发送数据

        /**
         * 1. 单播(点对点)
         */
        @Test
        public void testDirect() {
            //message 需要自己构造一个,定义消息体内容和消息头
            //rabbitTemplate.send(exchage, routeKey, message);
    
            //Object 只需要传入要发送的对象,自动序列化发送给 rabbitmq
            //rabbitTemplate.convertAndSend(exchage, routeKey, Object);
            Map<String, Object> map = new HashMap<>();
            map.put("msg", "这是第一个消息");
            map.put("data", Arrays.asList("helloworld", 123, true));
    
            //对象被默认序列化以后发送出去
            rabbitTemplate.convertAndSend("exchange.direct", "njf.news", map);
        }

      (2)接收数据

        //接收数据
        @Test
        public void receiveMsg() {
            Object o = rabbitTemplate.receiveAndConvert("njf.news");
            System.out.println(o.getClass());
            System.out.println("o = " + o);
        }

        发送数据,将数据自动转化为 JSON 发送出去,可以自定义序列化器

      3、自定义序列化器

        在 RabbitTemplate 类中有一个 MessageConverter 消息转换器:

        

         如果没有配置消息转换器就使用默认的转换器。

         在SimpleMessageConverter类中的 fromMessage 方法中:

         这是默认的消息转换器。

        RabbitMQ 中的 MessageConverter 是消息转换接口,提供了多个实现类:

        

         只需要在容器中配置想使用的转换器即可。

    @Configuration
    public class MyAMQPConfig {
    
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }

        再次进行测试,发现就可以把数据转成 JSON格式的数据了。

      4、测试广播

        /**
         * 2、广播
         */
        @Test
        public void fanout() {
            rabbitTemplate.convertAndSend("exchange.fanout", "", new Book("三国演义", "罗贯中"));
        }

        从管理平台也可以看到消息。

    四、监听消息

      1、@EnableRabbit 开启基于注解的 RabbitMQ 模式

    @EnableRabbit  //开启基于注解的 RabbitMQ 模式
    @SpringBootApplication
    public class SpringBoot10AmqpApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringBoot10AmqpApplication.class, args);
        }
    
    }

      2、@RabbitListener 监听消息队列的内容

    @Service
    public class BookService {
    
        @RabbitListener(queues = {"njf.news"})
        public void receive(Book book){
            System.out.println("book = " + book);
            System.out.println("收到消息 = " + book);
        }
    
        @RabbitListener(queues = {"njf"})
        public void receiveMsg(Message message){
            System.out.println("收到消息 = " + message);
            System.out.println(message.getBody());
            System.out.println(message.getMessageProperties());
        }
    }

        当监听到指定队列里面有内容时,就会来执行方法。

    五、AmqpAdmin

      AmqpAdmin :RabbitMQ 系统管理功能组件。

      AmqpAdmin:创建和删除 Queue,Exchange,Binding规则等。

      示例:

        @Autowired
        AmqpAdmin amqpAdmin;
    
        @Test
        public void createExchange() {
            //declare 开头的用于创建组件
            //创建交换器
            Exchange exchange = new DirectExchange("amqpAdmin.exchange");
            amqpAdmin.declareExchange(exchange);
            System.out.println(exchange + "创建完成");
    
            //创建队列
            amqpAdmin.declareQueue(new Queue("adqpAdmin.queue", true));
    
    
            //创建绑定规则
            amqpAdmin.declareBinding(new Binding("adqpAdmin.queue", Binding.DestinationType.QUEUE,
                    "amqpAdmin.exchange", "amqp.hhh", null));
    
        }

      效果:

  • 相关阅读:
    Numpy学习笔记练习代码 ——(二)
    Requests爬取表格数据并存入CSV中
    Numpy学习练习代码 ——(一)
    Requests爬取中文网站乱码问题
    Pycharm用Ctrl+鼠标滚轮控制字体大小
    一、Windows10下python3和python2同时安装
    inux下配置rsyncd服务
    shell 脚本中$$,$#,$?分别代表什么意思?
    linux shell awk 流程控制语句(if,for,while,do)详细介绍
    定时任务
  • 原文地址:https://www.cnblogs.com/niujifei/p/15743077.html
Copyright © 2011-2022 走看看