zoukankan      html  css  js  c++  java
  • Spring cloud stream【消息分区】

      在上篇文章中我们给大家介绍了Stream的消息分组,可以实现消息的重复消费的问题,但在某些场景下分组还不能满足我们的需求,比如,同时有多条同一个用户的数据,发送过来,我们需要根据用户统计,但是消息被分散到了不同的集群节点上了,这时我们就可以考虑消息分区了。
      当生产者将消息数据发送给多个消费者实例时,保证同一消息数据始终是由同一个消费者实例接收和处理。

    Stream 消息分区

    创建项目

      将我们上篇文章中的分组的三个项目,拷贝一份修改名称及服务名称

    在这里插入图片描述

    没有分区的情况下演示

    发送多条消息查看效果

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes=StreamSenderStart.class)
    public class StreamTest {
    	
    	@Autowired
    	private ISendeService sendService;
    
    	@Test
    	public void testStream(){
    		Product p = new Product(999, "stream test ...999");
    		// 将需要发送的消息封装为Message对象
    		Message message = MessageBuilder
    								.withPayload(p)
    								.build();
    		for (int i = 0; i < 10; i++) {
    			// 发送多条消息到队列中
    			sendService.send().send(message );
    		}
    		
    	}
    }
    
    

    10条消息被随机的分散到了两个消费者中:

    在这里插入图片描述

    在这里插入图片描述
    我们可以看到A中6条消息,B中4条消息,而且这是随机的,下次执行的结果肯定不一样。

    分区

    1.发送者中配置

    spring.application.name=stream-partition-sender
    server.port=9060
    #设置服务注册中心地址,指向另一个注册中心
    eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
    
    #rebbitmq 链接信息
    spring.rabbitmq.host=192.168.88.150
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=dpb
    spring.rabbitmq.password=123
    spring.rabbitmq.virtualHost=/
    
    # 对应 MQ 是 exchange  outputProduct自定义的信息
    spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
    
    #通过该参数指定了分区键的表达式规则
    spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload
    #指定了消息分区的数量。 
    spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2
    

    2.消费者中配置

    服务A

    spring.application.name=stream-partition-receiverA
    server.port=9070
    #设置服务注册中心地址,指向另一个注册中心
    eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
    
    #rebbitmq 链接信息
    spring.rabbitmq.host=192.168.88.150
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=dpb
    spring.rabbitmq.password=123
    spring.rabbitmq.virtualHost=/
    
    # 对应 MQ 是 exchange  和消息发送者的 交换器是同一个
    spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
    # 具体分组 对应 MQ 是 队列名称 并且持久化队列  inputProduct 自定义
    spring.cloud.stream.bindings.inputProduct.group=groupProduct999
    
    #开启消费者分区功能
    spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
    #指定了当前消费者的总实例数量
    spring.cloud.stream.instanceCount=2 
    #设置当前实例的索引号,从 0 开始
    spring.cloud.stream.instanceIndex=0
    

    服务B

    spring.application.name=stream-partition-receiverB
    server.port=9071
    #设置服务注册中心地址,指向另一个注册中心
    eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
    
    #rebbitmq 链接信息
    spring.rabbitmq.host=192.168.88.150
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=dpb
    spring.rabbitmq.password=123
    spring.rabbitmq.virtualHost=/
    
    # 对应 MQ 是 exchange  和消息发送者的 交换器是同一个
    spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
    # 具体分组 对应 MQ 是 队列名称 并且持久化队列  inputProduct 自定义
    spring.cloud.stream.bindings.inputProduct.group=groupProduct999
    
    #开启消费者分区功能
    spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
    #指定了当前消费者的总实例数量
    spring.cloud.stream.instanceCount=2 
    #设置当前实例的索引号,从 1 开始
    spring.cloud.stream.instanceIndex=1
    

    启动服务测试

    在这里插入图片描述

    10个消息都被消费者A给消费了,说明到达了我们需要的效果。
    案例源码:https://github.com/q279583842q/springcloud-e-book

  • 相关阅读:
    【转】C#连接mysql
    【转】深度优先算法
    【转】mysql安装
    win7NVIDIA显卡驱动升级时卡住
    【转】win7系统删除桌面IE图标
    双系统删掉一个后,所在分区无法格式化
    SQL各种JOIN
    C# 反射
    【转】C#强制转换和显式转换
    SQL Server 去除表中字段空格
  • 原文地址:https://www.cnblogs.com/dengpengbo/p/11104841.html
Copyright © 2011-2022 走看看