zoukankan      html  css  js  c++  java
  • RocketMQ 自定义 Binding

    概述
    在实际生产中,我们需要发布和订阅的消息可能不止一种 Topic ,故此时就需要使用自定义 Binding 来帮我们实现多 Topic 的发布和订阅功能

    生产者

    自定义 Output 接口,代码如下:

    public interface MySource {
    @Output("output1")
    MessageChannel output1();

    @Output("output2")
    MessageChannel output2();
    

    }
    发布消息的案例代码如下:

    @Autowired
    private MySource source;

    public void send(String msg) throws Exception {
    source.output1().send(MessageBuilder.withPayload(msg).build());
    }

    消费者

    自定义 Input 接口,代码如下:

    public interface MySink {
    @Input("input1")
    SubscribableChannel input1();

    @Input("input2")
    SubscribableChannel input2();
    
    @Input("input3")
    SubscribableChannel input3();
    
    @Input("input4")
    SubscribableChannel input4();
    

    }
    接收消息的案例代码如下:

    @StreamListener("input1")
    public void receiveInput1(String receiveMsg) {
    System.out.println("input1 receive: " + receiveMsg);
    }

    Application

    配置 Input 和 Output 的 Binding 信息并配合 @EnableBinding 注解使其生效,代码如下:

    @SpringBootApplication
    @EnableBinding({ MySource.class, MySink.class })
    public class RocketMQApplication {
    public static void main(String[] args) {
    SpringApplication.run(RocketMQApplication.class, args);
    }
    }

    application.yml

    生产者

    spring:
    application:
    name: rocketmq-provider
    cloud:
    stream:
    rocketmq:
    binder:
    namesrv-addr: 192.168.10.149:9876
    bindings:
    output1: {destination: test-topic1, content-type: application/json}
    output2: {destination: test-topic2, content-type: application/json}

    消费者

    spring:
    application:
    name: rocketmq-consumer
    cloud:
    stream:
    rocketmq:
    binder:
    namesrv-addr: 192.168.10.149:9876
    bindings:
    input: {consumer.orderly: true}
    bindings:
    input1: {destination: test-topic1, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
    input2: {destination: test-topic1, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
    input3: {destination: test-topic2, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
    input4: {destination: test-topic2, content-type: text/plain, group: test-group, consumer.maxAttempts: 1

    等你看到的时候,想变得有一点点不一样
  • 相关阅读:
    使用Mybatis-Generator自动生成Dao、Model、Mapping相关文件
    Mybatis学习 PageHelper分页插件
    mysql 5.1.7.17 zip安装 和 隔段时间服务不见了处理
    使用Maven搭建Struts2+Spring3+Hibernate4的整合开发环境
    一位资深程序员大牛给予Java初学者的学习建议
    数据结构和算法学习 -- 线性表
    多线程的实现方式区别
    Log4j.properties属性文件
    Java自定义注解
    Spring配置属性文件
  • 原文地址:https://www.cnblogs.com/snake107/p/11920902.html
Copyright © 2011-2022 走看看