SpringCloud Stream
SpringCloud Config
SpringCloud Gatewa
SpringCloud Hystrix
技术兴起的原因:为了解决系统中不同中间件的适配问题,出现了cloud stream,采用适配绑定的方式,自动给不同的MQ之间进行切换。
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
应用程序通过inputs(消费者)或者outputs(生产者)来与Spring Cloud Stream中binder对象交互。通过我们配置来绑定,而Spring Cloud Stream的binder对象负责与消息中间件交互。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动配置,引用了发布、订阅、消费、分区的三个核心概念。
官方版本目前仅仅支持RabbitMQ和Kafka。
MQ相关术语
Message:生产者/消费者之间靠消息媒介传递信息内容
MessageChannel:消息必须走特定的通道
消息通道的子接口SubscribableChannel,由MessageHandle消息处理器所订阅。
相关注解
- Middleware:中间件,目前只支持RabbitMQ和Kafka
- Binder:应用层和消息中间件之间的封装,实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型,这些可以通过配置文件修改。
- Input:表示输入通道,消息进入该通道传到应用程序。
- Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
- StreamListener:监听队列,用于消费者的队列的消息接收。
- EnableBinding:将信道channel和exchange绑定在一起。
首先创建一个provider,服务提供者rabbitmq-provider8801
导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
编写配置文件application.yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.168.31.52 #rabbitmq服务启动所在机器的IP地址
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
编写一个发送数据的接口IMessageProvider
public interface IMessageProvider {
String sendMessage();
}
接口的实现类IMessageProviderImpl
@EnableBinding(Source.class) //定义消息的推送管道
public class IMessageProviderImpl implements IMessageProvider
{
@Resource
private MessageChannel output; // 消息发送管道
@Override
public String sendMessage()
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null;
}
}
controller层下的SendMessageController
@RestController
public class SendMessageController {
@Autowired
private IMessageProvider iMessageProvider;
@GetMapping(value = "/sendMessage")
public String send(){
return iMessageProvider.sendMessage();
}
}
启动Eureka7001,启动服务提供者8801.启动虚拟机上的RabbitMQ
记得把虚拟机防火墙关了。
[hadoop@centos7 bin]$ systemctl stop firewalld
[hadoop@centos7 bin]$ systemctl status firewalld
然后测试一下服务提供者是否正常运行。
发送请求:http://localhost:8801/sendMessage
控制台输出UUID。
然后再创建一个服务消费者,在MQ的另一端进行消费消息。
创建另一个模块,cloud-stream-rabbitmq-consumer8802
导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
和上一个服务提供者的依赖一样。
写配置文件application.yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.168.31.52
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
创建一个消费者的ReceiveMessageController
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("message = "+message.getPayload()+" "+"serverPort= "+serverPort);
}
}
如果消费者成功接收消息,则在控制台输出产生的UUID和端口号。
启动Eureka7001,启动服务提供者8801,启动服务消费者8802,还有MQ。
在Eureka中可以看到两个服务已经启动。
每次请求http://localhost:8801/sendMessage;消费者都能输出结果,输出的UUID与提供者的一致。
登录RabbitMQ的web管理,可以看到我们新建的exchange,并且可以查看消息队列中的请求次数的情况。
发送的消息除了可以是字符串类型还可以发送对象,在消费者接受数据的时候,会将实体转换成JSON字符串。
配置文件中,如果你使用的消息中间件是kafka,type: kafka;environment是设置消息中间件的配置信息,端口,主机地址,用户名,密码等,可以设置多个binder,适应不同的场景。
重复消费问题
默认情况下,每个消费者的分组名都是随机的,不同的,对于不同的组会引起重复消费的问题,例如:消息提供者只向消息队列中发送了一个消息,正常情况下,消费者A从队列中拿走之后,消费者B不能再获得相同的消息,但是由于AB是不同的组,所以A和B都会获取相同的消息,这就导致了资源被重复消费。
微服务应用放置到同一个group中,就能够保证消息只会被其中应用消费一次,不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
同一个应用的不同微服务,只用在配置文件中指定相同的group。
再次发送消息时,只有消费者其中一个能消费。避免了重复消费。
消息持久化
当两个消费者A和B,A设置了group属性值,B没有设置,这时,消费者全部宕机,但是消息生产者一直响MQ中生产消息,这时候重启A和B两者有什么区别呢?
正因为B没有这时分组,B再次启动后不会再向MQ中取数据,而A启动成功后可以正常消费消息队列中的消息。
因此设置了group的消费者,可以保证消息队列中的消息持久化,group对于消费者来讲很重要,既能避免重复消费,又能在消费者重启后依然可以消费消息队列中未消费的消息。