zoukankan      html  css  js  c++  java
  • SpringCloudStream整合RabbitMQ和Kafka

    SpringCloudStream 就是使用了基于消息系统的微服务处理架构。对于消息系统而言一共分为两类:基于应用标准的 JMS、基于协议标准的 AMQP,在整个 SpringCloud 之中支持有 RabbitMQ、Kafka 组件的消息系统。利用 SpringCloudStream 可以实现更加方便的消息系统的整合处理。

    1、生产者和消费者引入依赖

    <dependency>
       <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    
    

    2、创建生产者

    (1)定义配置文件

    spring:
      rabbitmq:
        host: 192.168.0.33
        username: user
        password: 123456
        virtual-host: /test
      cloud:
        stream:
          bindings:
            goods_output:
              #指定要连接binders中kafka或rabbitmq
              binder: rabbit1
              consumer:
                headerMode: raw
              producer:
                headerMode: raw
              destination: goods-topic
              content-type: text/plain
          binders:
            kafka1:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        binder:
                          brokers: http://192.168.0.33:9092
                          auto-add-partitions: true
                          auto-create-topics: true
                          min-partition-count: 1
            rabbit1:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    addresses: 192.168.0.33
                    port: 5672
                    username: user
                    password: 123456
                    virtual-host: /test
    
    

    (2)创建通道

    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    
    /**
     * 发送消息通道
     *
     * @author yanglei
     */
    public interface GoodsSource {
    
        String GOODS_OUTPUT = "goods_output";
    
        @Output(GoodsSource.GOODS_OUTPUT)
    	MessageChannel output();
    
    }
    
    

    (3)发送消息

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.messaging.support.MessageBuilder;
    
    /**
     * 消息发送
     *
     * @author yanglei
     */
    @EnableBinding(GoodsSource.class)
    public class GoodsProducer {
    
        @Autowired
        private GoodsSource source;
    
        public void sendMessage(String msg) {
            try {
                source.output().send(MessageBuilder.withPayload(msg).build());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    

    (4)创建测试接口

    @RestController
    @RequestMapping("test")
    @Api(tags="sku表,表示具体的商品实体")
    public class TestController {
    
    	@Autowired
    	private GoodsProducer goodsProducer;
    
        @GetMapping("send")
        @ApiOperation("信息")
        public void get(){
    		goodsProducer.sendMessage("我发送了一条消息");
        }
    
    }
    
    

    3、创建消费者

    (1)定义配置文件

    spring:
      rabbitmq:
        host: 192.168.0.33
        username: user
        password: 123456
        virtual-host: /test
      cloud:
        stream:
          bindings:
            goods_input:
              binder: rabbit1
              consumer:
                headerMode: raw
              producer:
                headerMode: raw
              #绑定的kafka topic名称
              destination: goods-topic
              content-type: text/plain
          binders:
            kafka1:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        binder:
                          brokers: http://192.168.0.33:9092
                          auto-add-partitions: true
                          auto-create-topics: true
                          min-partition-count: 1
            rabbit1:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    addresses: 192.168.0.33
                    port: 5672
                    username: user
                    password: 123456
                    virtual-host: /test
    
    

    (2)创建通道

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;
    
    /**
     * 接收消息通道
     *
     * @author yanglei
     */
    public interface GoodsSink {
    
        String GOODS_INPUT = "goods_input";
    
        @Input(GoodsSink.GOODS_INPUT)
    	SubscribableChannel input();
    
    }
    
    

    (3)接受消息

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    
    /**
     * 消息监听消费
     *
     * @author yanglei
     */
    @EnableBinding(GoodsSink.class)
    @Slf4j
    public class GoodsConsumer {
    
        @StreamListener(GoodsSink.GOODS_INPUT)
        public void onReceive(String shopJson) {
            log.info(shopJson);
        }
    }
    
    
  • 相关阅读:
    sublime插件时间
    git与github
    字符编码笔记:ASCII,Unicode和UTF-8
    阮一峰:互联网协议入门
    从理论到实践,全方位认识DNS
    ci事务
    linux下启动oracle
    Java连接Oracle
    我的博客终于开通了,加油!
    FILTER 执行次数
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13300572.html
Copyright © 2011-2022 走看看