zoukankan      html  css  js  c++  java
  • 通过实例理解 RabbitMQ 的基本概念

    先说下自己开发的实例。

    最近在使用 Spring Cloud Config 做分布式配置中心(基于 SVN/Git),当所有服务启动后,SVN/Git 中的配置文件更改后,客户端服务读取的还是旧的配置,并不能实时读取(配置信息会缓存在客户端),Spring Boot 提供了一种方式进行更新(通过spring-boot-starter-actuator监控模块),然后 Post 访问客户端服务的/refresh接口(也可以命令执行curl -X POST http://worker2:8115/refresh),这样客户端会重新从配置中心获取新的配置信息,请求命令可以写在 Git 的 Webhooks 脚本中(修改提交 Push 后执行)。

    如果客户端服务比较少的话,这样的解决方式没问题,如果客户端服务多的话,执行的请求脚本就会非常多,而且单个服务的解决方式,也不利于后期的维护(点对点的方式),那该怎么解决上面的问题呢?答案就是通过 Spring Cloud Bus

    Spring Cloud Bus 翻译为消息总线,使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都能连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以我们称它为消息总线。在总线上的各个实例都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息,例如配置信息的变更或者其他一些管理操作等。

    架构示意图(引用来源):

    下面,我们需要利用 Spring Cloud Bus 来改造 Spring Cloud Config 的服务端和客户端,其实非常简单。

    添加下面的依赖:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    

    然后在bootstrap.yml中添加下面配置:

    spring:
      rabbitmq:
        host: manager1
        port: 5672
        username: admin
        password: admin123
    management:
      security:
        enabled: false
    

    上面的配置信息都是新增的,并且都需要配置在服务端和客户端,通过上面的示例图可以看到,配置信息更新后请求的是服务端,那么客户端我们就不需要配置management.security.enabled(也不需要配置spring-boot-starter-actuator监控模块)。

    服务端和客户端的任何 Java 代码都不需要编写,重新启动服务,当配置信息更新后,通过 Git 的 Webhooks 执行请求脚本:curl -X POST http://manager1:port/bus/refresh服务端接受到请求之后,会通过 Spring Cloud Bus 通知所有的客户端(通过 RabbitMQ),重新从配置中心获取配置信息,达到实时更新配置的目的。

    上面的实例描述就到这里。

    RabbitMQ 的基本概念

    RabbitMQ,是一个使用 erlang 编写的 AMQP(高级消息队列协议)的服务实现,简单来说,就是一个功能强大的消息队列服务。

    RabbitMQ 最基本模型

    RabbitMQ 的基本概念

    • Producer:消息生产者。
    • Consumer:消息消费者。
    • Connection(连接):Producer 和 Consumer 通过TCP 连接到 RabbitMQ Server。
    • Channel(信道):基于 Connection 创建,数据流动都是在 Channel 中进行。
    • Exchange(交换器):生产者将消息发送到 Exchange(交换器),由 Exchange 将消息路由到一个或多个 Queue 中(或者丢弃);Exchange 并不存储消息;Exchange Types 常用有 Fanout、Direct、Topic 三种类型,每种类型对应不同的路由规则。
    • Queue(队列):是 RabbitMQ 的内部对象,用于存储消息;消息消费者就是通过订阅队列来获取消息的,RabbitMQ 中的消息都只能存储在 Queue 中,生产者生产消息并最终投递到 Queue 中,消费者可以从 Queue 中获取消息并消费;多个消费者可以订阅同一个 Queue,这时 Queue 中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
    • Binding(绑定):是 Exchange(交换器)将消息路由给 Queue 所需遵循的规则。
    • Routing Key(路由键):消息发送给 Exchange(交换器)时,消息将拥有一个路由键(默认为空), Exchange(交换器)根据这个路由键将消息发送到匹配的队列中。
    • Binding Key(绑定键):指定当前 Exchange(交换器)下,什么样的 Routing Key(路由键)会被下派到当前绑定的 Queue 中。

    另外,再说下 Exchange Types(交换器类型)的三种常用类型

    • Direct:完全匹配,消息路由到那些 Routing Key 与 Binding Key 完全匹配的 Queue 中。比如 Routing Key 为cleint-key,只会转发cleint-key,不会转发cleint-key.1,也不会转发cleint-key.1.2
    • Topic:模式匹配,Exchange 会把消息发送到一个或者多个满足通配符规则的 routing-key 的 Queue。其中*表号匹配一个 word,#匹配多个 word 和路径,路径之间通过.隔开。如满足a.*.c的 routing-key 有a.hello.c;满足#.hello的 routing-key 有a.b.c.helo
    • Fanout:忽略匹配,把所有发送到该 Exchange 的消息路由到所有与它绑定 的Queue 中。

    下面通过一段代码,理解一下消息发布的流程代码引用):

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='first', type='topic')
    channel.queue_declare(queue='A')
    channel.queue_declare(queue='B')
    
    channel.queue_bind(exchange='first', queue='A', routing_key='a.*.*')
    channel.queue_bind(exchange='first', queue='B', routing_key='a.#')
    
    channel.basic_publish(exchange='first',
                          routing_key='a',
                          body='Hello World!')
    
    channel.basic_publish(exchange='first',
                          routing_key='a.b.c',
                          body='Hello World!')
    

    大致步骤

    • 先获取一个 Connection(连接)。
    • 从 Connection(连接)上获取一个 Channel(信道)。
    • 声明一个 Exchange(交换器),只会创建一次。
    • 声明两个 Queue,只会创建一次。
    • 把 Queue 绑定到 Exchange(交换器)上.
    • 向指定的 Exchange(交换器)发送一条消息.

    因为基于 Exchage Topic 模式,在上面发出的两条消息当中,消息a只会被a.#匹配到,而a.b.c会被两个都匹配到。所以,最终的结果会是队列 A 中有一条消息,队列 B 中有两条消息。

    从队列取出消息代码:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='A')
    
    def callback(ch, method, properties, body):
        print body
    
    channel.basic_consume(callback, queue='A', no_ack=True)
    channel.start_consuming()
    

    服务消费者取出消息,需要重新创建 Connection(连接)和 Exchange(交换器),但 Queue 并不会创建,只需要从 Channel 中获取对应的 Queue 消息即可。

    通过实例理解 RabbitMQ 基本概念

    上面实例服务部署的情况是:三台管理服务器(config-server-git/config-server-svn)和一台工作服务器(config-client-git/config-server-svn),因为做了集群,服务的具体情况

    • config-server-git:3 个服务。
    • config-server-svn:3 个服务。
    • config-client-git:1 个服务。
    • config-client-svn:1 个服务。

    所以,总的部署服务有 8 个

    我们通过 RabbitMQ Server 管理界面中的内容,说下 Connection(连接)、Channel(信道)、Exchange(交换器)和 Queue(队列)的具体使用情况(根据数量理解)。

    1. Connection(连接)

    为什么 Connection(连接)数量为 16 个?因为部署的 8 个服务,各自发布和接受消息(即作为小心发布者,也作为消息接受者),计算公式:16 = 8 * 2

    2. Channel(信道)

    为什么 Channel(信道)数量为 16 个?因为 Connection(连接)数量为 16 个,Channel(信道)是在 Connection(连接)基础上创建的。

    3. Exchange(交换器)

    为什么 Exchange(交换器)数量为 1 个?因为都是使用的同一个 Exchange(交换器),名字为springCloudBus,Exchange Type 为topic,Routing Key 为#

    4. Queue(队列)

    为什么 Queue(队列)数量为 8 个?因为部署的 8 个服务,各自发布和接受的 Queue 是同一个,一个服务对应一个 Queue。

    参考资料:

  • 相关阅读:
    sql server 获取存储过程,表值,标量函数的参数
    拼接枚举字符串
    存储过程的输出接受强类
    映射对象
    C# abstract,virtual 修饰符
    SqlSugar之SqlQueryDynamic返回值处理
    sql server 中数据库数据导入到另一个库中
    sql server 自增长显式添加值
    sql得到表中的列信息
    程序中MMap集合数据重复导致程序慢的情况
  • 原文地址:https://www.cnblogs.com/xishuai/p/spring-cloud-bus-rabbitmq.html
Copyright © 2011-2022 走看看