zoukankan      html  css  js  c++  java
  • spring cloud 搭建(kafka 入门(二))

    前面一篇,将了如何配置kafak:https://www.cnblogs.com/hanjun0612/p/13398119.html

    这一篇在上面的基础,扩展成2个微服务。

    其实很简单。

    就是在原来的基础上,把StreamReceiver 和 TestStream 拷贝到Service1服务中。

     

    代码如下:

    StreamReceiver

    package com.test.service1.controller.kafka;
     
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;
     
    /**
     * @author Tyler
     * @date 2020/7/28
     */
     
    @Component
    @EnableBinding(value = {TestStream.class})
    public class StreamReceiver {
     
        @StreamListener(TestStream.INPUT)
        public void receive(String message) {
            System.out.println("StreamReceiver: "+message);
        }
    }
     

    TestStream

    package com.test.service1.controller.kafka;
     
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
     
    /**
     * @author Tyler
     * @date 2020/7/28
     */
     
    public interface TestStream {
        String INPUT = "test-in";
        String OUTPUT = "test-out";
        @Input(INPUT)
        SubscribableChannel testIn();
        @Output(OUTPUT)
        MessageChannel testOut();
     
    }

    POM文件:

    PS:这里spring版本是2.3.2,用的是Hoxton.SR6

    之前用的1.5.4,会报错:kafka Could not convert message

    <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
        </properties>
     
     
    <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>2.0.1.RELEASE</version>
            </dependency>
    </dependencies>

    application.yml

    spring:
      application:
        name: service1
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092
          bindings:
            test-in: #TestStream 中 INPUT
              destination: testkafka

    效果

    Service1:

    Kafka:

  • 相关阅读:
    树的直径、重心、中心
    DP优化--四边形不等式
    P5569 【SDOI2008】 石子合并(黑科技)
    P3147 262144游戏
    P3205 【HNOI2010】合唱队
    Windows Server 2012 虚拟化实战:网络(一)
    Windows Server 2012 虚拟化实战:存储(二)
    Android使用最小宽度限定符时最小宽度的计算
    Eclipse调试Android App若选择“Use same device for future launches”就再也无法选择其他设备的问题
    Python的模块引用和查找路径
  • 原文地址:https://www.cnblogs.com/hanjun0612/p/13434653.html
Copyright © 2011-2022 走看看