14.SpringCloud Bus 消息总线
概述
Spring Cloud Bus 是用来将分布式系统的节点与轻量级消息系统链接起来的框架,它整合了Java的事件处理机制和消息中间件的功能。
分布式自动刷新配置功能
Spring Cloud Bus配合Spring Cloud Config使用可以实现配置的动态刷新
Bus支持两种消息代理:RabbitMQ和Kafka
Bus 能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改、时间推送等,也可以当做微服务间的通信通道。
在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称它为消息总线。在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息。
ConfigClient实例都监听MQ中同一个topic,默认是springCloudBus。当一个服务刷新数据时,它会把这个信息放入到Topic中,这样其他监听同一个Topic的服务就能得到通知,然后去更新自身的配置。
RabbitMQ环境配置
-
安装Erlang,下载地址
-
安装RabbitMQ,下载地址
-
进入 sbin 目录,输入以下命令启动管理功能,启用可视化插件
. abbitmq-plugins.bat enable rabbitmq_management
-
在 开始 菜单里查找
RabbitMQ Service - start
,启动服务 -
访问 Url
http://localhost:15672/
,guest
/guest
-
SpringCloud Bus动态刷新全局广播
-
必须先具备良好的RabbitMQ环境先
-
演示广播效果,增加复杂度,再以3355为模板再制作一个3366
-
设计思想
利用消息总线触发一个服务端ConfigServer的
/bus/refresh
端点,而刷新所有客户端的配置(更加推荐) -
给 cloud-config-center-3344 配置中心服务端添加消息总线支持
-
POM
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency>
-
YML
#rabbitmq相关配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest ##rabbitmq相关配置,暴露bus刷新配置的端点 management: endpoints: #暴露bus刷新配置的端点 web: exposure: include: 'bus-refresh'
-
-
给 cloud-config-center-3355、cloud-config-center-3366 客户端添加消息总线支持
-
POM
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency>
-
YML
#rabbitmq相关配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
-
-
测试
-
RabbitMQ 就绪
-
启动注册中心,cloud-eureka-server7001
-
启动配置中心,cloud-config-center-3344
-
启动配置客户端,cloud-config-client-3355,cloud-config-client-3366
-
分别通过配置中心和客户端访问配置
http://localhost:3344/master/configclient-dev.yml http://localhost:3355/configInfo http://localhost:3366/configInfo
此时配置都是最新的,配置中心与客户端获取到的配置相同
-
修改 GitHub 上的配置,再次访问
此时通过配置中心访问到的是最新的,客户端的配置没有改变
-
动态刷新全局广播
POST 请求访问
http://localhost:3344/actuator/bus-refresh
-
再次通过客户端访问配置,发现两个客户端的配置都更新了
-
SpringCloud Bus动态刷新定点通知
实现只通知 3355,不通知 3366:
POST 请求
http://localhost:3344/actuator/bus-refresh/config-client:3355
格式为:
http://localhost:配置中心的端口号/actuator/bus-refresh/{destination}
{destination}:spring.application.name
上面3355和3366是集群
15.SpringCloud Stream消息驱动
消息驱动概述
是什么
SpringCloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中 binder 对象交互。
通过我们配置来 binding(绑定),而 Spring Cloud Stream 的 binder 对象负责与消息中间件交互。
所以,我们只需搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持 RabbitMQ、Kafka。
屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型。
设计思想
标准 MQ
- 生产者/消费者之间靠消息媒介传递信息内容——
Message
- 消息必须走特定的通道——消息通道
MessageChannel
- 消息通道里的消息如何被消费呢,谁负责收发处理——消息通道
MessageChannel
的子接口SubscribableChannel
,由MessageHandler
消息处理器订阅
为什么用Cloud Stream
与 MQ 的实现解耦,统一底层差异:
在没有绑定器这个概念的情況下,我们的 Spring Boot 应用要直接与消息中间件进行信息交互的时侯,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。
通过定义绑定器作为軻可层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
通过定义绑定器 Binder 作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder
- INPUT 对应于消费者
- OUTPUT对应于生产者
Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(RabbitMQ 切换为 Kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
Stream中的消息通信方式遵循了发布-订阅模式
Topic主题进行广播:
- 在 RabbitMQ 就是 Exchange
- 在 Kafka 中就是 Topic
Spring Cloud Stream 标准流程套路
-
Binder
很方便的连接中间件,屏蔽差异
-
Channel
通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置
-
Source 和 Sink
简单的可理解为参照对象是 Spring Cloud Stream 自身,从 Stream 发布消息就是输出,接受消息就是输入
编码API和常用注解
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持 RabbitMQ 和 Kaka |
Binder | Binder 是应用与消息中间件之间的封装,目前实现了Kafka 和 RabbitMQ 的 Binder,通过 Binder 可以很方便的连接中间件,可以动态的改变消息类型(对应于 Kafka 的 topic,RabbitMQ 的 exchange),这些都可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@Enablebinding | 指信道 channel 和 exchange 绑定在一起 |
案例说明
- RabbitMQ环境已经OK
- 工程中新建三个子模块
- cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
- cloud-stream-rabbitmq-consumer8802,作为消息接收模块
- cloud-stream-rabbitmq-consumer8803,作为消息接收模块
消息驱动之生产者
-
Module,cloud-stream-rabbitmq-provider8801
-
POM
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
-
YML
spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置
-
主启动类
-
业务类
@EnableBinding(Source.class) //定义消息的推送管道 public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // 消息发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("*****serial: " + serial); return null; } }
-
测试
-
启动7001eureka
-
启动rabbitmq
http://localhost:15672/
-
启动 8801
-
查看 RabbitMQ 中 Exchanges 的 studyExchange 出现波动
-
消息驱动之消费者
-
Module,cloud-stream-rabbitmq-consumer8802
-
POM,同生产者
-
YML
spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置
-
主启动类
-
业务类
@Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消费者1号,----->接受到的消息: " + message.getPayload() + " port: " + serverPort); } }
-
测试
- 启动注册中心 Eureka7001、Stream 生产者 cloud-stream-rabbitmq-provider8801、Stream 消费者 cloud-stream-rabbitmq-consumer8802
- 访问 http://localhost:8801/sendMessage,向 RabbitMQ 发送消息
- 查看控制台,Stream 消费者接收到消息
分组消费与持久化
依照8802,clone出来一份运行8803,cloud-stream-rabbitmq-consumer8803
启动8803
运行后两个问题
-
有重复消费问题
生产者发送消息后,两个消费者都接收到消息
-
消息持久化问题
分组消费
多数情况,生产者发送消息给某个具体微服务时只希望被消费一次,为了解决这个问题,Spring Cloud Stream 中提供了 消费组 的概念。
重复消费问题可以使用 Stream 中的 消息分组 来解决。
不同组可以全面消费(重复消费),同一组内存在竞争关系,只有其中一个可以消费。
导致重复消费的原因:默认分组 group 是不同的,组流水号不一样,被认为是不同组。
可以通过 RabbitMQ 查看消费者的组。
解决问题的原理:
微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
解决方法:
-
为消费者8802、8803指定 分组和持久化属性 group
spring.cloud.stream.bindings.input.group: atguigu
-
再次测试,发现两个消费者不会重复消费
持久化
- 停止8802、8803并去除掉8802的分组 group:atguigu
- 8803的分组 group:atguigu 没有去掉
- 8801先发送4条信息到 Rabbitmq
- 先启动8802,无分组属性配置,后台没有打出来消息
- 先启动8803,有分组属性配置,后台打出来了MQ上的消息
原因:
之前指定 group 时,将组注册到了 RabbitMQ,即使停止消费者,RabbitMQ 上依然有记录这个组,发送消息时,会为这个注册了的组保留未消费的消息,指定 group 的消费者启动后,可以接着消费保留的消息。
如果将 group 从 atguigu 换成 atguiguA,此时组变了,重新启动后也不会接收到消息。
16.SpringCloud Sleuth分布式请求链路追踪
概述
需要解决的问题
在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的的服务节点调用来协同产生最后的请求结果,每个前段请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错误都会引起整个请求最后的失败。
是什么
Spring Cloud Sleuth 提供了一套完整的服务跟踪的解决方案
在分布式系统中提供追踪解决方案并且兼容支持了 Zipkin
搭建链路监控步骤
-
zipkin
-
下载
SpringCloud从F版起已不需要自己构建Zipkin server了,只需要调用jar包即可
https://dl.bintray.com/openzipkin/maven/io/zipkin/java/zipkin-server/
zipkin-server-2.12.9.exec.jar
-
运行jar
-
运行控制台
-
术语
完整的调用链路:
表示一条请求链路,一条链路通过 Trace ld 唯一标识,Span 标识发起的请求信息,各 Span 通过 parent id 关联起来
名词解释:
Trace: 类似于树结构的 Span 集合,表示一条调用链路,存在唯一标识
Span: 表示调用链路来源,通俗的理解 Span 就是一次请求信息
-
-
服务提供者,对 cloud-provider-payment8001 进行改造
-
POM
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zipkin</artifactId> </dependency>
-
YML
spring: zipkin: base-url: http://localhost:9411 sleuth: sampler: #采样率值介于 0 到 1 之间,1 则表示全部采集 probability: 1
-
业务类 PaymentController
@GetMapping("/payment/zipkin") public String paymentZipkin() { return "hi ,i'am paymentzipkin server fall back,welcome to atguigu,O(∩_∩)O哈哈~"; }
-
-
服务消费者(调用方),改造 cloud-consumer-order80
-
POM,同上
-
YML,同上
-
业务类 OrderController
@GetMapping("/consumer/payment/zipkin") public String paymentZipkin() { String result = restTemplate.getForObject(PAYMENT_URL + "/payment/zipkin/", String.class); return result; }
-
-
启动 eureka7001、8001、80
-
查看 Zipkin,http://localhost:9411/zipkin/