zoukankan      html  css  js  c++  java
  • Spring Boot

    Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景:

    • 消息路由到一个或多个目的地
    • 消息转化为其他的表现方式
    • 执行消息的聚集、消息的分解,并将结果发送到他们的目的地,然后重新组合相应返回给消息用户
    • 调用Web服务来检索数据
    • 响应事件或错误
    • 使用发布-订阅模式来提供内容或基于主题的消息路由

    AMQP 是 Advanced Message Queuing Protocol 的简称,它是一个面向消息中间件的开放式标准应用层协议。AMQP定义了这些特性:

    • 消息方向
    • 消息队列
    • 消息路由(包括:点到点和发布-订阅模式)
    • 可靠性
    • 安全性

    RabbitMQ 是以 AMQP 协议实现的一种中间件产品,也称为面向消息的中间件,它可以支持多种操作系统,多种编程语言,几乎可以覆盖所有主流的企业级技术平台。下面介绍 RabbitMQ 的基本概念:

    • Broker:可以理解为消息队列服务器的实体,是一个中间件应用,负责接收消息生产者的消息,然后将消息发送至消息接收者或者其他的 Broker
    • Exchange:消息交换机,是消息第一个到达的地方,消息通过他指定的路由规则分发到不同的消息队列中去,有如下几种类型:
      • Direct:完全按照 key 进行投递,比如,绑定时设置了 Routing Key 为 abc,那么客户端提交消息,只有设置了 Key 为 abc 的才会被投递到队列
      • Topic:对于 Key 进行模式匹配后进行投递,可以使用符号 # 匹配一个或多个词,符号 * 匹配正好一个词。比如, abc.# 可以匹配 abc.def.ghi ,而 abc.* 只能匹配 abc.def
      • Fanout:不需要任何 Key,采取广播的模式,一个消息进来时,投递到与该交换机绑定的所以队列
    • Queue:消息队列,消息通过发送和路由之后最终到达的地方,到达 Queue 的消息即进入逻辑上等待消费的状态。每个消息都会被发送到一个或多个队列。
    • Binding:绑定,将 Exchange 和 Queue 按照路由规则绑定起来
    • Routing Key:路由关键字,Exchange 根据该关键字进行消息投递
    • Virtual Host:虚拟主机,他是对 Broker 的虚拟划分,将消费者、生产者和依赖的 AMQP 相关结构进行隔离,一般情况都是为了安全考虑。
    • Connection:连接,代表生产者、消费者、Broker 之间进行通信的物理网络
    • Channel:消息通道,用于连接生产者和消费者的逻辑结构,在客户端的每个连接里,可以建立多个 Channel,每个 Channel 代表一个会话任务,通过 Channel 可以隔离同一个连接的不同交互内容。
    • Producer:消息生产者,制造消息并发送消息的程序
    • Consumer:消息消费者,接收消息并处理消息的程序

       

    快速入门

    我们通过在 Spring Boot 应用中整合 RabbitMQ ,实现一个简单的发送、接收消息的示例:

    • 创建项目

      创建一个 Spring Boot 项目,命名为 spring-boot-rabbitmq,并增加 spring-boot-starter-amqp 依赖,pom.xml 文件内容如下:

      <?xmlversion="1.0"encoding="UTF-8"?>

      <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">

      <modelVersion>4.0.0</modelVersion>

         

      <groupId>org.lixue.bus</groupId>

      <artifactId>spring-boot-rabbitmq</artifactId>

      <version>0.0.1-SNAPSHOT</version>

      <packaging>jar</packaging>

         

      <name>spring-boot-rabbitmq</name>

         

      <parent>

      <groupId>org.springframework.boot</groupId>

      <artifactId>spring-boot-starter-parent</artifactId>

      <version>1.5.12.RELEASE</version>

      <relativePath/><!--lookupparentfromrepository-->

      </parent>

         

      <properties>

      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

      <java.version>1.8</java.version>

      </properties>

         

      <dependencies>

      <dependency>

      <groupId>org.springframework.boot</groupId>

      <artifactId>spring-boot-starter-amqp</artifactId>

      </dependency>

         

      <dependency>

      <groupId>org.springframework.boot</groupId>

      <artifactId>spring-boot-starter-test</artifactId>

      <scope>test</scope>

      </dependency>

      </dependencies>

         

      <build>

      <plugins>

      <plugin>

      <groupId>org.springframework.boot</groupId>

      <artifactId>spring-boot-maven-plugin</artifactId>

      </plugin>

      </plugins>

      </build>

      </project>

         

    • 创建配置

      在 src/main/resources 目录中创建 application.yml 配置文件,并增加 RabbitMQ 相关配置,内容如下:

      #配置应用名称

      spring:

      application:

      name:rabbitmq-bus

      #配置RabbitMQ信息

      rabbitmq:

      #配置连接信息

      addresses:192.168.2.215:5672

      username:lixue

      password:liyong

      virtual-host:/

      #开启发送确认模式

      publisher-confirms:true

         

    • 消息生产者

      创建消息生产者 Sender 类,通过注入 AmqpTemplate 接口的实例(org.springframework.amqp.rabbit.core.RabbitTemplate)来实现消息的发送,AmqpTemplate 接口定义了一套针对 AMQP 协议的基础操作,Spring Boot 会根据配置来注入其具体实现。

      package org.lixue.bus;

         

      import org.springframework.amqp.core.AmqpTemplate;

      import org.springframework.beans.factory.annotation.Autowired;

      import org.springframework.stereotype.Component;

         

      import java.util.Date;

         

      @Component

      public class Sender{

      @Autowired

      private AmqpTemplate amqpTemplate;

         

      public void send(){

      String context="Hello"+new Date();

      //将消息发送到路由Key=send的队列

      amqpTemplate.convertAndSend("send",context);

      }

      }

         

    • 消息消费者

      创建消息消费者 Receiver 类,通过注入 AmqpTemplate 接口的实例(org.springframework.amqp.rabbit.core.RabbitTemplate)来实现消息的接收处理,也可以使用 @RabbitListener 和 @RabbitHandler 注解 来指定消息接收的队列和消息处理方法,使用注解处理消息如下:

      package org.lixue.bus;

         

      import org.springframework.amqp.rabbit.annotation.RabbitHandler;

      import org.springframework.amqp.rabbit.annotation.RabbitListener;

      import org.springframework.stereotype.Component;

         

      @Component

      @RabbitListener(queues="send")

      public class Receiver{

         

      @RabbitHandler

      public void process(String message){

      System.out.println("receive:"+message);

      }

      }

         

      如果不使用注解,可以通过 AmqpTemplate 接口的实例来获取队列的消息,代码如下:

      package org.lixue.bus;

         

      import org.springframework.amqp.core.AmqpTemplate;

      import org.springframework.beans.factory.annotation.Autowired;

      import org.springframework.stereotype.Component;

         

      @Component

      public class Receiver{

      @Autowired

      private AmqpTemplate amqpTemplate;

         

      public void receive(){

      String val=(String)amqpTemplate.receiveAndConvert("send");

      if(val!=null){

      System.out.println("receive:"+message);

      }

      }

      }

         

    • 创建配置类

      创建 RabbitMQ 的配置类 RabbitMQConfig,用来配置队列、交换器、路由等高级信息,代码如下:

      package org.lixue.bus;

         

      import org.springframework.amqp.core.*;

      import org.springframework.context.annotation.Bean;

      import org.springframework.context.annotation.Configuration;

         

      @Configuration

      public class RabbitMQConfig{

         

      /**

      *配置队列相关

      */

      @Bean

      public Queue newQueue(){

      return new Queue("send");

      }

         

      @Bean

      public Queue newQueue2(){

      return new Queue("send2");

      }

         

      /**

      *配置交换器相关

      */

      @Bean

      public Exchange newDirectExchange(){

      return new DirectExchange("directExchange",true,true);

      }

         

      @Bean

      public Exchange newTopicExchange(){

      return new TopicExchange("topicExchange",true,true);

      }

         

      @Bean

      public Exchange newFanoutExchange(){

      return new FanoutExchange("fanoutExchange",true,true);

      }

         

      /**

      *配置队列和交换器绑定

      */

      @Bean

      public Binding newDirectBinding(){

      return BindingBuilder.bind(newQueue()).to(newDirectExchange()).with("send").noargs();

      }

         

      @Bean

      public Binding newDirectBinding1(){

      return BindingBuilder.bind(newQueue2()).to(newDirectExchange()).with("send2").noargs();

      }

      }

         

    • 测试验证

      创建消息提供者的单元测试,在单元测试中执行消息发送方法,并执行单元测试,如果使用注解的方式接收消息,则不需要做额外处理,Spring Boot 启动后会接收消息,如下:

      package org.lixue.bus;

         

      import org.junit.Test;

      import org.junit.runner.RunWith;

      import org.springframework.beans.factory.annotation.Autowired;

      import org.springframework.boot.test.context.SpringBootTest;

      import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

      import org.springframework.test.context.junit4.SpringRunner;

         

      import static org.junit.Assert.*;

         

      @RunWith(SpringRunner.class)

      @SpringBootTest

      publi cclass SenderTest{

         

      @Autowired

      private Sender sender;

         

      @Test

      public void send() throws Exception{

      for(inti=0;i<1000000;i++){

      sender.send(i);

      }

      }

      }

      日志输出如下:

      2018-05-04 16:23:39.085 INFO 20176 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.2.215:5672]

      2018-05-04 16:23:39.294 INFO 20176 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#232532cf:0/SimpleConnection@7c7f9a81 [delegate=amqp://lixue@192.168.2.215:5672/, localPort= 63144]

      2018-05-04 16:23:39.300 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (directExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.

      2018-05-04 16:23:39.300 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (topicExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.

      2018-05-04 16:23:39.301 INFO 20176 --- [cTaskExecutor-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (fanoutExchange) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.

      2018-05-04 16:23:39.476 INFO 20176 --- [ main] org.lixue.bus.SenderTest : Started SenderTest in 4.536 seconds (JVM running for 6.026)

      receive:Hello Fri May 04 16:23:39 CST 2018

      receive:Hello Fri May 04 16:23:39 CST 2018

      receive:Hello Fri May 04 16:23:39 CST 2018

      receive:Hello Fri May 04 16:23:39 CST 2018

         

  • 相关阅读:
    Cryptography I 学习笔记 --- 使用分组密码
    Cryptography I 学习笔记 --- 分组密码
    jQuery动画之自定义动画
    jQuery事件之一次性事件
    jQuery事件之自定义事件
    jQuery事件之解绑事件
    jQuery事件之绑定事件
    jQuery动画之停止动画
    JQuery动画之淡入淡出动画
    jQuery属性操作之值操作
  • 原文地址:https://www.cnblogs.com/li3807/p/9002688.html
Copyright © 2011-2022 走看看