zoukankan      html  css  js  c++  java
  • 初识kafka-connect

    一、kakfak-connect简介

    kafka-connet是一个工具,用来在kafka和外部数据存储系统之间移动数据,kafka-connect可以简单快捷地将数据从kafka导入或导出,数据范围涵盖了关系型数据库、日志、度量数据、Hadoop、数据仓库、NoSql数据存储、ES等。

    kafka-connect架构如下(图片来源:百度):

    kafka-connect有两个核心概念:Source和Sink。Source:负责导入数据到kafka,Sink负责从kafka导出数据,它们统称Connector,即连接器。

    另外还有两个重要概念:Task和Worker,每一个Connector都会协调一系列的task去执行任务,Connector把一项工作任务分割成许多的task,然后把task分发到各个worker进程中去执行。task不保存自己的状态信息,而是交给特定的kafka主题去保存。

    kafka-connect提供了以下特性:

    即:

    • 通用性:规范化其他数据系统与kafka的继集成,简化了连接器的开发、部署和管理
    • 支持独立模式(standalone)分布式模式(distributed)
    • REST接口:使用Rest API提交和管理Connector
    • 自动位移管理:自动管理位移的提交,不需要开发人员干预,降低了开发成本
    • 分布式和可扩展性:Kafka Connect 基于现有的组管理协议来实现扩展Kafka Connect 集群
    • 流式计算和批处理的集成

    kafka中通过connect-standalone.sh和connect-distributed.sh命令来实现独立模式和分布式模式运行的Kafka Connect,可以在kafka的/bin目录下看到:

    二、独立模式

    在独立模式中,所有操作都是在一个进程中完成的,它比较适合测试和功能验证的场景,但是无法充分利用kafka自身所提供的负载均衡和高容错特性。

    下面来演示一下使用独立模式将一个文件中的内容导入到kafka中。

    2.1 Source连接器用法

    1、修改配置文件

    • connect-standalone.properties:用于Work进程运行的配置文件
    • connect-file-source.properties:Source连接器的配置文件

    connect-standalone.properties内容如下(一般情况下使用默认配置即可):

    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/tmp/connect.offsets

    connect-file-source.properties

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/tmp/kafka/test-connect-source.txt
    topic=topic-connect

    要修改:

    file:该连接器数据源文件路径

    topic:设置连接器将数据导入哪个主题,如果该主题不存在则会自动创建,当然也可以自己提前创建好(推荐)

    2、创建topic

    我选择手动创建topic-connect,创建命令如下:

    ./kafka-topics --zookeeper localhost:2181 --create --topic topic-connect --replication-factor 1 --partitions 1

    创建完成后查看以下topic的创建结果:

    ./kafka-topics --zookeeper localhost:2181 --describe --topic topic-connect

    创建结果如下:

     3、启动source连接器

    ./bin/connect-standalone ./libexec/config/connect-standalone.properties ./libexec/config/connect-file-source.properties

    4、向test-connect-source.txt文件写入数据

    echo "hello world">>/tmp/kafka/test-connect-source.txt

    5、查看结果

    查看的方式有两种,一种是通过kafka-console-consumer.sh脚本,一种是kafka-dump-log.sh脚本,前者可以实时查看效果,后者每次写入后都要执行命令才能看到,下面演示通过这两种方式查看的效果:

    5.1 kafka-console-consumer.sh
    ./kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-connect

    结果如下:

     5.2 kafka-dump-log.sh
    ./kafka-dump-log --files /usr/local/var/lib/kafka-logs/topic-connect-0/00000000000000000000.log --print-data-log

    结果如下:

     

     以上是Source连接器的用法,下面再来探索一下Sink连接器的用法.

    2.2 Sink连接器用法

    1、修改配置文件

    • connect-standalone.properties:用于Work进程运行的配置文件
    • connect-file-sink.properties:Sink连接器的配置文件

    connect-standalone.properties内容如下(需要修改key和value的converter方式):

    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/tmp/connect.offsets

    connect-file-sink.properties内容如下:

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/tmp/kafka/test-connect-sink.txt
    topic=topic-connect-sink

    2、创建topic

    我选择手动创建topic-connect,创建命令如下:

    ./kafka-topics --zookeeper localhost:2181 --create --topic topic-connect-sink --replication-factor 1 --partitions 1

    创建完成后查看以下topic的创建结果:

    ./kafka-topics --zookeeper localhost:2181 --describe --topic topic-connect-sink

     

     3、启动sink连接器

    ./bin/connect-standalone ./libexec/config/connect-standalone.properties ./libexec/config/connect-file-sink.properties

    4、发送消息

    发送消息到topic-connect-sink

    ./bin/kafka-console-producer --broker-list localhost:9092 --topic topic-connect-sink

    5、查看sink文件

    cat test-connect-sink.txt

     可以看到发送到test-connect-sink 这个topic的消息成功存储到sink文件test-connect-sink.txt中了。

    三、问题

    在实践sink的过程中,我本来是想通过Source将一条消息从source文件导入kafka,同时通过Sink将该条消息从kafka中导出到sink文件,配置如下:

    只要将connect-file-source.properties和connect-file-sink.properties这两个配置文件中的topic改成相同的即可,但是执行之后却发现:

    • 当向test-connect-source.txt文件写入消息时,并不会在test-connect-sink.txt文件中写入,也不会在通过kafka-console-consumer.sh消费到;
    • 当通过命令向topic-connect发送消息时,不会写入test-connect-sink.txt文件,但是能通过kafka-console-consumer.sh消费到;

    后来经过一下午的排查,居然发现一直改的是connect-console-source.propertiesconnect-console-sink.properties文件,而不是connect-file-source.propertiesconnect-file-properties,

    真的也是被自己蠢哭的一下午,然后重新修改了配置文件,通过以下命令重启connecter之后,在conncect-source.txt写入,在connect-sink.txt中就能看到了。

    四、REST API

    可以通过Kafka Connect 提供的基于REST风格的API接口来管理连接器,默认端口号是8083,可以通过Worker进程配置文件中的rest.port参数来修改端口号。

    如:

    curl localhost:8083/connectors

    方法请求类型 REST API 接口说明
    GET / 查看kafka集群版本信息
    GET /connectors 查看当前活跃的连接器列表,显示连接器的名字,即配置connector配置文件中的name属性
    POST /connectors 根据指定配置,创建一个新的连接器
    GET /connectors/{name} 查看指定连接器的信息
    GET /connectors/{name}/config 查看指定连接器的配置信息
    PUT /connectors/{name}/config 修改指定连接器的配置信息
    GET /connectors/{name}/state 查看指定连接器的状态
    POST /connectors/{name}/restart 重启指定的连接器
    PUT /connectors/{name}/pause 暂停指定的连接器
    GET /connectors/{name}/tasks 查询指定连接器正在运行的task
    POST /connectors/name}/tasks 修改指定连接器的Task配置
    GET /connectors/{name}/tasks/{taskId}/status 查询指定连接器中指定Task的状态
    POST /connectors/{name}/tasks/{taskId}/restat 重启指定连接器中指定的Task
    DELETE /connectors/{name} 删除指定的连接器

    大周末的,写了一天博客,这效率太低了,出门透透气~~~

    参考文献:

    朱忠华 《深入理解Kafka核心设计与实践原理》

    最美好的时光里,不要一直是一个lowser!
  • 相关阅读:
    PAT B1027 打印沙漏 (20 分)
    PAT B1025 反转链表 (25 分)
    PAT B1022 D进制的A+B (20 分)
    PAT B1018 锤子剪刀布 (20 分)
    PAT B1017 A除以B (20 分)
    PAT B1015 德才论 (25 分)
    PAT B1013 数素数 (20 分)
    PAT B1010 一元多项式求导 (25 分)
    HDU 1405 The Last Practice
    HDU 1165 Eddy's research II
  • 原文地址:https://www.cnblogs.com/hellowhy/p/14800761.html
Copyright © 2011-2022 走看看