zoukankan      html  css  js  c++  java
  • 使用 spring stream 发送消息

    为什么使用spring stream ?

     spring stream 是用来做消息队列发送消息使用的。他隔离了各种消息队列的区别,使用统一的编程模型来发送消息。

    目前支持:

    rabbitmq

    kafka

    rocketmq

    启动rocketmq 

    rocketmq 支持windows

    start mqnamesrv.cmd
    
    start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

    修改pom.xml

    <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
            </dependency>

    增加发送接收JAVA代码

    public interface InputOutput {
    
        String MAIL_OUTPUT = "mailOutput";
        String MAIL_INPUT = "mailInput";
    
        String OUTPUT = "output";
        String INPUT = "input";
    
    
        @Output(OUTPUT)
        MessageChannel output();
        @Input(INPUT)
        SubscribableChannel input();
    
    
        @Output(MAIL_OUTPUT)
        MessageChannel mailOutput();
        @Input(MAIL_INPUT)
        SubscribableChannel mailInput();
    
    
    }

    在应用上增加注解

    @EnableBinding({InputOutput.class})

    增加yml配置

    spring:
        cloud:
            stream:
              rocketmq:
                binder:
                  name-server: 127.0.0.1:9876
              bindings:
                output:
                  destination: bpmmessage
                  group: bpmmessage-group
        
                input:
                  destination: bpmmessage
                  group: bpmmessage-group-consumer
        
                mailOutput:
                  destination: mail
                  group: mail-group
        
                mailInput:
                    destination: mail
                    group: mail-group-consumer

    编写代码收发消息:

    MessageModel messageModel=new MessageModel();
    
            messageModel.setMsgType("mail");
            messageModel.setContent("helloworld");
    
            inputOutput.mailOutput().send( MessageBuilder.withPayload(
                    "mail"
            ).build());
    
            inputOutput.output().send(
                    MessageBuilder.withPayload(
                            messageModel
                    ).build()
            );

    这里发送的是两类消息。

    接收消息:

    @Service
    public class MessageListener {
    
        @StreamListener(InputOutput.INPUT)
        public void receive(MessageModel message) {
            System.err.println(message);
            System.err.println("ok");
        }
    
    
        @StreamListener(InputOutput.MAIL_INPUT)
        public void receive(String message) {
            System.err.println(message);
            System.err.println("ok");
        }
    }

    分别接收两类消息

    
    
    
  • 相关阅读:
    freemarker 获取当前日期
    获取Map的key和value的两种方法
    maven install中依赖关系打包failed
    cxf动态调用外部web service 报告异常java.lang.NoSuchFieldErr
    Java的Annotation标签
    webservice cxf
    serversocket和socket
    Problem 1 珠江夜游 (cruise .cpp)———2019.10.6
    Problem 2 旅行计划 (travelling .cpp)———2019.10.6
    Problem 3 基站建设 (station.cpp)———2019.10.6
  • 原文地址:https://www.cnblogs.com/yg_zhang/p/12837879.html
Copyright © 2011-2022 走看看