zoukankan      html  css  js  c++  java
  • SpringBoot(八) Spring和消息队列RabbitMQ

    概述

    1.大多数应用中,可以通过消息服务中间件来提升系统异步能力和拓展解耦能力。

    2.消息服务中的两个重要概念:消息代理(Message broker)目的地(destination)

    当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地。

    3.消息队列主要有两种形式的目的地: 

      1. 队列:点对点方式通信(point-to-point)
      2. 主题:发布/订阅消息服务

    点对点式:消息发送者发送消息后,消息代理将其放入一个队列中,消息接受者从队列中读取数据,接受者接收数据后,将消息移除队列。

    发布订阅:消息发布者将消息发布到主题中,多个接受者可以订阅主题,当消息到达时,所有的订阅者都会接收到消息。

    4.JMS(Java Message Service) Java消息服务:基于JVM消息代理的规范。

    5.AMQP(Advanced Message Queuing Protocol):它是一个面向消息中间件的开放式标准应用层协议。兼容JMS,RabbitMQ是AMQP的一个实现。

     
    JMS
    AMQP
    定义 Java API 网络线级协议
    跨平台
    跨语言
    Model (1)、Peer-2-Peer
    (2)、Pub/Sub
    (1)、direct exchange
    (2)、fanout exchange
    (3)、topic change
    (4)、headers exchange
    (5)、system exchange
    后四种都是pub/sub ,差别路由机制做了更详细的划分
    支持消息类型 TextMessage
    MapMessage
    ByteMessage
    StreamMessage
    ObjectMessage
    Message
    byte[]通常需要序列化

    RabbitMQ

    Message:消息头和消息体组成,消息体是不透明的,而消息头上则是由一系列的可选属性组成,属性:路由键【routing-key】,优先级【priority】,指出消息可能需要持久性存储【delivery-mode】

    Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序

    Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

    • Exchange的4中类型:direct【默认】点对点,fanout,topicheaders【并不多用了】, 发布订阅,不同类型的Exchange转发消息的策略有所区别

    Queue:消息队列,用来保存消息直到发送给消费者,它是消息的容器,也是消息的终点,一个消息可投入一个或多个队列,消息一直在队列里面,等待消费者连接到这个队列将数据取走。

    Binding:绑定,队列和交换机之间的关联,多对多关系

    Connection:网络连接,例如TCP连接

    Channel:信道,多路复用连接中的一条独立的双向数据流通道,信道是建立在真是的TCP链接之内的虚拟连接AMQP命令都是通过信道发送出去的。不管是发布消息,订阅队列还是接受消息,都是信道,减少TCP的开销,复用一条TCP连接。

    Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端的 应用程序

    VirtualHost:小型的rabbitMQ,相互隔离

    Broker:表示消息队列 服务实体

    Exchange的三种方式

    • direct:根据路由键直接匹配,一对一
    • fanout:不经过路由键,直接发送到每一个队列
    • topic:类似模糊匹配的根据路由键,来分配绑定的队列。#匹配一个或者多个单词,*匹配一个单词

    RabbitMQ安装与使用

    在RabbitMQ官网的下载页面https://www.rabbitmq.com/download.html中,我们可以获取到针对各种不同操作系统的安装包和说明文档。这里,我们将对几个常用的平台一一说明。

    下面我们采用的Erlang和RabbitMQ Server版本说明:

    • Erlang/OTP 19.1
    • RabbitMQ Server 3.6.5

    Windows安装

    1. 安装Erland,通过官方下载页面http://www.erlang.org/downloads获取exe安装包,直接打开并完成安装。
    2. 安装RabbitMQ,通过官方下载页面https://www.rabbitmq.com/download.html获取exe安装包。
    3. 下载完成后,直接运行安装程序。
    4. RabbitMQ Server安装完成之后,会自动的注册为服务,并以默认配置启动起来。

    Docker安装

    1、打开虚拟机,在docker中安装RabbitMQ

    #1.安装rabbitmq,使用镜像加速
    docker pull registry.docker-cn.com/library/rabbitmq:3-management
    [root@node1 ~]# docker images
    REPOSITORY                                     TAG                 IMAGE ID            CREATED             SIZE
    registry.docker-cn.com/library/rabbitmq        3-management        c51d1c73d028        11 days ago         149 MB
    #2.运行rabbitmq
    ##### 端口:5672 客户端和rabbitmq通信 15672:管理界面的web页面
    
    docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq c51d1c73d028
    
    #3.查看运行
    docker ps

    2、打开网页客户端并登陆   网址:http://localhost:15672/,账号【guest】,密码【guest】,登陆。

    3、添加 【direct】【faout】【topic】的绑定关系。

    • 1.设置exchange。
      • name:名称
      • type:direct(直连型。点对点),topic(模糊发布。例如:*.news/#.news),fanout(广播模式。速度最快。发布订阅)
      • Durability:是否是持久化。

    4、添加消息队列

    5、绑定队列。点击任意一个exchange,进行绑定。队列与刚才新建的相同,routing key(路由件)也可以去队列名相同。

    6、发布信息测试,每一种都进行尝试 。以点对点为例 (direct)

    7、点击队列,可以看到点对点发送,路由件完全匹配的情况下已经有一条了。可以尝试其他情况。点进去后可以查看具体信息。

    SpringBoot整合RabbitMQ

    (1)Java代码的方式使用RabbitMQ

    1.在pom.xml文件中添加依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    2.在application.properties中配置关于RabbitMQ的连接和用户信息,用户可以回到上面的安装内容,在管理页面中创建用户。

    spring:
      rabbitmq:
        host: 192.168.1.125
        port: 5672
        username: guest
        password: guest

    3.创建消息生产者。通过注入RabbitTemplate接口的实例来实现消息的发送,RabbitTemplate接口定义了一套针对AMQP协议的基础操作。在Spring Boot中会根据配置来注入其具体实现。

     @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Test
        public void contextLoads() {
            //Message需要自己构建一个;定义消息体内容和消息头
            // rabbitTemplate.send(exchange, routingKey, message);
            //Object 默认当成消息体,只需要传入要发送的对象,自动化序列发送给rabbitmq;
            Map<String,Object> map = new HashMap<>();
            map.put("msg", "这是第一个信息");
            map.put("data", Arrays.asList("helloWorld",123,true));
            //对象被默认序列以后发送出去 exchange 和 routingKey都是在浏览器端定义好的。
            rabbitTemplate.convertAndSend("exchange.direct","jl",map);
        }

    4.因为发送过来的数据是被编码的,所以需要对JSON进行序列化。

    @Configuration
    public class MyAMQPConfig  {
    
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    }

    5.接收消息(取出队列中的消息).接收后就会从消息队列中移除。

    @Test
    public void reciverAndConvert(){
    
        Object o = rabbitTemplate.receiveAndConvert("test.news");
        System.out.println(o.getClass());
        System.out.println(o);
    
    }

       

     6.广播的形式发送。

        @Test
        public void sendMesg(){
            //广播,就不需要写rootingKey,因为所有的队列都会收到
            rabbitTemplate.convertAndSend("exchange.fanout","",new Book(1L,"<<Java编程思想>>"));
        }

    (2)注解方式使用RabbitMQ

    1.主程序开启RabbitMQ的注解

    @EnableRabbit //开启基于注解的rabbitmq
    @SpringBootApplication
    public class AmqpApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(AmqpApplication.class, args);
        }
    }

    2.使用注解的方式接收

    @Service
    public class BookService {
        @RabbitListener(queues = "jl")
        public void receive(Book book){
            System.out.println("接收到消息:"+book);
        }
    
        @RabbitListener(queues = "jl.news")
        public void receive02(Message message){
            System.out.println(message.getBody());
            System.out.println(message.getMessageProperties());
        }
    }

    (3)创建 Exchange(交换器)、Queue(消息队列)、Bind(绑定规则)--- AmqpAdmin。

    1.创建一个Exange。参数中可以有DirectExchage,TopicExchage,fanoutExchage等。

    @Test
    public void createExchange(){
        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.direct"));
        System.out.println("Create Finish");
    }

    2.创建Queue

    @Test
    public void createQueue(){
    
        //参数1:名字  参数2:是否持久化  
        amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
        System.out.println("Create Queue Finish");
    }

     

    3.创建Bind规则

    @Test
    public void createBind(){
        //参数1:目的地    参数2:类型(队列(queue)或者交换器(exchange))   参数3:exchange的名称    参数4:路由件       参数5:参数
        amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE , "amqpadmin.direct", "amqp.haha", null));
    }

     

     4.删除Queue或者Exchange。只需要传入名称即可。

    @Test
    public void deleteExchange(){
        amqpAdmin.deleteExchange("amqpadmin.direct");
        System.out.println("delete Finish");
    }
  • 相关阅读:
    42.接雨水 Trapping Rain Water
    6.Zigzag Z 字形变换
    级数求和
    三连击(两个for循环轻松搞定)
    经济学人:Facebook的第三幕(1)
    markdown编辑器
    QUERY
    在badi中按照正常的message&nbsp;x…
    建workflow的时候提示Prefix&nbsp;…
    ALV控件的简单案例(一)
  • 原文地址:https://www.cnblogs.com/JiangLai/p/10019917.html
Copyright © 2011-2022 走看看