zoukankan      html  css  js  c++  java
  • Kafka Connect 出现ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while wait

    在官网第七步

    Step 7: 使用 Kafka Connect 来 导入/导出 数据

    从控制台写入和写回数据是一个方便的开始,但你可能想要从其他来源导入或导出数据到其他系统。对于大多数系统,可以使用kafka Connect,而不需要编写自定义集成代码。

    Kafka Connect是导入和导出数据的一个工具。它是一个可扩展的工具,运行连接器,实现与自定义的逻辑的外部系统交互。在这个快速入门里,我们将看到如何运行Kafka Connect用简单的连接器从文件导入数据到Kafka主题,再从Kafka主题导出数据到文件。

    首先,我们首先创建一些“种子”数据用来测试,:

    echo -e "foo
    bar" > test.txt
    

    windows上:

    > echo foo> test.txt
    > echo bar>> test.txt
    

    接下来,我们开始2个连接器运行在独立的模式,这意味着它们运行在一个单一的,本地的,专用的进程。我们提供3个配置文件作为参数。首先是Kafka Connect处理的配置,包含常见的配置,例如要连接的Kafka broker和数据的序列化格式。其余的配置文件都指定了要创建的连接器。包括连接器唯一名称,和要实例化的连接器类。以及连接器所需的任何其他配置。

    > bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
    

    kafka附带了这些示例的配置文件,并且使用了刚才我们搭建的本地集群配置并创建了2个连接器:第一个是源连接器,从输入文件中读取并发布到Kafka主题中,第二个是接收连接器,从kafka主题读取消息输出到外部文件。

    在启动过程中,你会看到一些日志消息,包括一些连接器实例化的说明。一旦kafka Connect进程已经开始,导入连接器应该读取从

    test.txt
    

    和写入到topic

    connect-test
    

    ,导出连接器从主题

    connect-test
    

    读取消息写入到文件

    test.sink.txt
    

    . 我们可以通过验证输出文件的内容来验证数据数据已经全部导出:

    more test.sink.txt
     foo
     bar
    

    注意,导入的数据也已经在Kafka主题

    connect-test
    

    里,所以我们可以使用该命令查看这个主题:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
     {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}
    ...
    

    连接器继续处理数据,因此我们可以添加数据到文件并通过管道移动:

    echo "Another line" >> test.txt
    

    你应该会看到出现在消费者控台输出一行信息并导出到文件。

    出现ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:304)

    解决办法:

    1、确保zookeeper和kafka服务启动

    2、修改connect-standalone.properties 文件,将bootstrap.servers=localhost:9092改为ip:9092

  • 相关阅读:
    LeetCode 842. Split Array into Fibonacci Sequence
    LeetCode 1087. Brace Expansion
    LeetCode 1219. Path with Maximum Gold
    LeetCode 1079. Letter Tile Possibilities
    LeetCode 1049. Last Stone Weight II
    LeetCode 1046. Last Stone Weight
    LeetCode 1139. Largest 1-Bordered Square
    LeetCode 764. Largest Plus Sign
    LeetCode 1105. Filling Bookcase Shelves
    LeetCode 1027. Longest Arithmetic Sequence
  • 原文地址:https://www.cnblogs.com/hecxx/p/11959841.html
Copyright © 2011-2022 走看看