zoukankan      html  css  js  c++  java
  • spring cloud 搭建(kafka 入门(二))

    前面一篇,将了如何配置kafak:https://www.cnblogs.com/hanjun0612/p/13398119.html

    这一篇在上面的基础,扩展成2个微服务。

    其实很简单。

    就是在原来的基础上,把StreamReceiver 和 TestStream 拷贝到Service1服务中。

     

    代码如下:

    StreamReceiver

    package com.test.service1.controller.kafka;
     
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;
     
    /**
     * @author Tyler
     * @date 2020/7/28
     */
     
    @Component
    @EnableBinding(value = {TestStream.class})
    public class StreamReceiver {
     
        @StreamListener(TestStream.INPUT)
        public void receive(String message) {
            System.out.println("StreamReceiver: "+message);
        }
    }
     

    TestStream

    package com.test.service1.controller.kafka;
     
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
     
    /**
     * @author Tyler
     * @date 2020/7/28
     */
     
    public interface TestStream {
        String INPUT = "test-in";
        String OUTPUT = "test-out";
        @Input(INPUT)
        SubscribableChannel testIn();
        @Output(OUTPUT)
        MessageChannel testOut();
     
    }

    POM文件:

    PS:这里spring版本是2.3.2,用的是Hoxton.SR6

    之前用的1.5.4,会报错:kafka Could not convert message

    <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
        </properties>
     
     
    <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>2.0.1.RELEASE</version>
            </dependency>
    </dependencies>

    application.yml

    spring:
      application:
        name: service1
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092
          bindings:
            test-in: #TestStream 中 INPUT
              destination: testkafka

    效果

    Service1:

    Kafka:

  • 相关阅读:
    POJ 3630 Phone List/POJ 1056 【字典树】
    HDU 1074 Doing Homework【状态压缩DP】
    POJ 1077 Eight【八数码问题】
    状态压缩 POJ 1185 炮兵阵地【状态压缩DP】
    POJ 1806 Manhattan 2025
    POJ 3667 Hotel【经典的线段树】
    状态压缩 POJ 3254 Corn Fields【dp 状态压缩】
    ZOJ 3468 Dice War【PD求概率】
    POJ 2479 Maximum sum【求两个不重叠的连续子串的最大和】
    POJ 3735 Training little cats【矩阵的快速求幂】
  • 原文地址:https://www.cnblogs.com/hanjun0612/p/13434653.html
Copyright © 2011-2022 走看看