zoukankan      html  css  js  c++  java
  • SpringBoot 整合篇 笔记--Spring Boot与消息

    Spring Boot与消息

    消息概述

    JMS/AMQP:java消息服务


      

    JMS 和 AMQP 区别?

      

      

    RabbitMQ简介 

    RabbitMQ简介:

    RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。

    核心概念

    Message

    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

    Publisher

    消息的生产者,也是一个向交换器发布消息的客户端应用程序。 Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

    Exchange

    有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别

    Queue

    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

    Binding

    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定可以是多对多的关系。

    Connection

    网络连接,比如一个TCP连接。

    Channel

    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

    Consumer

    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

    Virtual Host

    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

    Broker

    表示消息队列服务器实体

    RabbitMQ运行机制 

    消息路由

    Exchange 类型

     

     

     

    RabbitMQ整合

    1. 引入 spring-boot-starter-amqp
    2. application.yml配置
    3. 测试RabbitMQ
      •    AmqpAdmin:管理组件
      •   RabbitTemplate:消息发送处理组件

     

    springboot 整合消息

    自动配置
     1、RabbitAutoConfiguration
     2、有自动配置了连接工厂ConnectionFactory;
     3、RabbitProperties 封装了 RabbitMQ的配置
     4、 RabbitTemplate :给RabbitMQ发送和接受消息;
     5、 AmqpAdmin : RabbitMQ系统管理功能组件;
         AmqpAdmin:创建和删除 Queue,Exchange,Binding
     6、@EnableRabbit +  @RabbitListener 监听消息队列的内容

    具体实现

      1.利用idea的spring初始化器创建应用选中RabbitMq模块

     

     2.配置properties

    spring.rabbitmq.host=10.87.18.41
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    #spring.rabbitmq.virtual-host=

     3.测试类

    package com.example.testrabbitmq;

    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    @SpringBootTest
    class TestrabbitmqApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
    /**
    * 点对点
    * message 需要自己定义
    */
    // rabbitTemplate.send(exchange,routeKey,message);

    /**
    * 点对点 默认消息体,要传入消息对象,自动序列化发送给rabbit
    */
    Map<String,Object> map=new HashMap<>();
    map.put("msg","这是第一个消息");
    map.put("data", Arrays.asList("helloworld",123,true));
    rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",map);

    }
    @Test
    public void receive(){
    Object o = rabbitTemplate.receiveAndConvert("atguigu.news");
    System.out.println("数据类型="+o.getClass());
    System.out.println("数据="+o);
    }

    /**
    * 广播发送
    */
    @Test
    public void sendAll(){
    rabbitTemplate.convertAndSend("exchange.fanout","",new Book("广播发送","广播发送"));
    }
    }

    4.结果

     调用java序列化方法,调用 rabbitTemplate.convertAndSend 进行发送

    调用 rabbitTemplate.receiveAndConvert("atguigu.news") 队列中就被消费,不在存在

    问题: 如何正确序列化?

    配置类:

    package com.example.testrabbitmq.config;
    
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    //自定义messageconveter(json格式)
    @Configuration
    public class MyAMQPConfig {
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    }
    View Code

    结果:

     消息监听器: 

    1.启动类添加注解: 开启基于注解的RabbtisMQ模式

     2.编写监听器:监听 atguigu.news 消息队列

     创建消息队列和交换器

    @Autowired
        private AmqpAdmin amqpAdmin;//操作
    
        /**
         * 添加Exchange
         */
        @Test
        public void createExchange(){
            amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
            System.out.println("创建完成");
        }
    
        /**
         * 添加队列
         */
        @Test
        public void createQueue(){
            amqpAdmin.declareQueue(new Queue("amqpadmin.queue"));
            System.out.println("创建队列成功");
        }
    
        /**
         * 添加绑定
         */
        @Test
        public void createBinding(){
            amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","ampq.haha",null));
        }
  • 相关阅读:
    mybatis学习$与#号取值区别
    java学习
    mybatis学习
    spring mvc 数据校验(bean实体注解实现)
    maven学习
    java.lang.ClassNotFoundException: org.springframework.web.util.WebAppRootListener
    20180804 excel规划求解。。。
    java.lang.IllegalArgumentException: Invalid character found in the request target. The valid characters are defined in RFC 7230 and RFC 3986
    mybatis config 配置设置说明
    进程状态以及状态转换
  • 原文地址:https://www.cnblogs.com/denghy-301/p/13043846.html
Copyright © 2011-2022 走看看