zoukankan      html  css  js  c++  java
  • spring cloud 搭建(kafka 入门(二))

    前面一篇,将了如何配置kafak:https://www.cnblogs.com/hanjun0612/p/13398119.html

    这一篇在上面的基础,扩展成2个微服务。

    其实很简单。

    就是在原来的基础上,把StreamReceiver 和 TestStream 拷贝到Service1服务中。

     

    代码如下:

    StreamReceiver

    package com.test.service1.controller.kafka;
     
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;
     
    /**
     * @author Tyler
     * @date 2020/7/28
     */
     
    @Component
    @EnableBinding(value = {TestStream.class})
    public class StreamReceiver {
     
        @StreamListener(TestStream.INPUT)
        public void receive(String message) {
            System.out.println("StreamReceiver: "+message);
        }
    }
     

    TestStream

    package com.test.service1.controller.kafka;
     
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
     
    /**
     * @author Tyler
     * @date 2020/7/28
     */
     
    public interface TestStream {
        String INPUT = "test-in";
        String OUTPUT = "test-out";
        @Input(INPUT)
        SubscribableChannel testIn();
        @Output(OUTPUT)
        MessageChannel testOut();
     
    }

    POM文件:

    PS:这里spring版本是2.3.2,用的是Hoxton.SR6

    之前用的1.5.4,会报错:kafka Could not convert message

    <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        
        <properties>
            <java.version>1.8</java.version>
            <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
        </properties>
     
     
    <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
                <version>2.0.1.RELEASE</version>
            </dependency>
    </dependencies>

    application.yml

    spring:
      application:
        name: service1
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092
          bindings:
            test-in: #TestStream 中 INPUT
              destination: testkafka

    效果

    Service1:

    Kafka:

  • 相关阅读:
    数组指针的一个易错点
    jQuery on()方法
    php 前一天或后一天的日期
    用jQuery监听浏览器窗口的变化
    jquery获取json对象中的key小技巧
    JQuery操作元素的属性与样式及位置
    用JQuery操作元素的style属性
    如何删除jsPlumb连接
    jsPlumb.jsAPI阅读笔记(官方文档翻译)
    Jquery empty() remove() detach() 方法的区别
  • 原文地址:https://www.cnblogs.com/hanjun0612/p/13434653.html
Copyright © 2011-2022 走看看