zoukankan      html  css  js  c++  java
  • 初识kafka-connect

    一、kakfak-connect简介

    kafka-connet是一个工具,用来在kafka和外部数据存储系统之间移动数据,kafka-connect可以简单快捷地将数据从kafka导入或导出,数据范围涵盖了关系型数据库、日志、度量数据、Hadoop、数据仓库、NoSql数据存储、ES等。

    kafka-connect架构如下(图片来源:百度):

    kafka-connect有两个核心概念:Source和Sink。Source:负责导入数据到kafka,Sink负责从kafka导出数据,它们统称Connector,即连接器。

    另外还有两个重要概念:Task和Worker,每一个Connector都会协调一系列的task去执行任务,Connector把一项工作任务分割成许多的task,然后把task分发到各个worker进程中去执行。task不保存自己的状态信息,而是交给特定的kafka主题去保存。

    kafka-connect提供了以下特性:

    即:

    • 通用性:规范化其他数据系统与kafka的继集成,简化了连接器的开发、部署和管理
    • 支持独立模式(standalone)分布式模式(distributed)
    • REST接口:使用Rest API提交和管理Connector
    • 自动位移管理:自动管理位移的提交,不需要开发人员干预,降低了开发成本
    • 分布式和可扩展性:Kafka Connect 基于现有的组管理协议来实现扩展Kafka Connect 集群
    • 流式计算和批处理的集成

    kafka中通过connect-standalone.sh和connect-distributed.sh命令来实现独立模式和分布式模式运行的Kafka Connect,可以在kafka的/bin目录下看到:

    二、独立模式

    在独立模式中,所有操作都是在一个进程中完成的,它比较适合测试和功能验证的场景,但是无法充分利用kafka自身所提供的负载均衡和高容错特性。

    下面来演示一下使用独立模式将一个文件中的内容导入到kafka中。

    2.1 Source连接器用法

    1、修改配置文件

    • connect-standalone.properties:用于Work进程运行的配置文件
    • connect-file-source.properties:Source连接器的配置文件

    connect-standalone.properties内容如下(一般情况下使用默认配置即可):

    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/tmp/connect.offsets

    connect-file-source.properties

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/tmp/kafka/test-connect-source.txt
    topic=topic-connect

    要修改:

    file:该连接器数据源文件路径

    topic:设置连接器将数据导入哪个主题,如果该主题不存在则会自动创建,当然也可以自己提前创建好(推荐)

    2、创建topic

    我选择手动创建topic-connect,创建命令如下:

    ./kafka-topics --zookeeper localhost:2181 --create --topic topic-connect --replication-factor 1 --partitions 1

    创建完成后查看以下topic的创建结果:

    ./kafka-topics --zookeeper localhost:2181 --describe --topic topic-connect

    创建结果如下:

     3、启动source连接器

    ./bin/connect-standalone ./libexec/config/connect-standalone.properties ./libexec/config/connect-file-source.properties

    4、向test-connect-source.txt文件写入数据

    echo "hello world">>/tmp/kafka/test-connect-source.txt

    5、查看结果

    查看的方式有两种,一种是通过kafka-console-consumer.sh脚本,一种是kafka-dump-log.sh脚本,前者可以实时查看效果,后者每次写入后都要执行命令才能看到,下面演示通过这两种方式查看的效果:

    5.1 kafka-console-consumer.sh
    ./kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-connect

    结果如下:

     5.2 kafka-dump-log.sh
    ./kafka-dump-log --files /usr/local/var/lib/kafka-logs/topic-connect-0/00000000000000000000.log --print-data-log

    结果如下:

     

     以上是Source连接器的用法,下面再来探索一下Sink连接器的用法.

    2.2 Sink连接器用法

    1、修改配置文件

    • connect-standalone.properties:用于Work进程运行的配置文件
    • connect-file-sink.properties:Sink连接器的配置文件

    connect-standalone.properties内容如下(需要修改key和value的converter方式):

    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/tmp/connect.offsets

    connect-file-sink.properties内容如下:

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/tmp/kafka/test-connect-sink.txt
    topic=topic-connect-sink

    2、创建topic

    我选择手动创建topic-connect,创建命令如下:

    ./kafka-topics --zookeeper localhost:2181 --create --topic topic-connect-sink --replication-factor 1 --partitions 1

    创建完成后查看以下topic的创建结果:

    ./kafka-topics --zookeeper localhost:2181 --describe --topic topic-connect-sink

     

     3、启动sink连接器

    ./bin/connect-standalone ./libexec/config/connect-standalone.properties ./libexec/config/connect-file-sink.properties

    4、发送消息

    发送消息到topic-connect-sink

    ./bin/kafka-console-producer --broker-list localhost:9092 --topic topic-connect-sink

    5、查看sink文件

    cat test-connect-sink.txt

     可以看到发送到test-connect-sink 这个topic的消息成功存储到sink文件test-connect-sink.txt中了。

    三、问题

    在实践sink的过程中,我本来是想通过Source将一条消息从source文件导入kafka,同时通过Sink将该条消息从kafka中导出到sink文件,配置如下:

    只要将connect-file-source.properties和connect-file-sink.properties这两个配置文件中的topic改成相同的即可,但是执行之后却发现:

    • 当向test-connect-source.txt文件写入消息时,并不会在test-connect-sink.txt文件中写入,也不会在通过kafka-console-consumer.sh消费到;
    • 当通过命令向topic-connect发送消息时,不会写入test-connect-sink.txt文件,但是能通过kafka-console-consumer.sh消费到;

    后来经过一下午的排查,居然发现一直改的是connect-console-source.propertiesconnect-console-sink.properties文件,而不是connect-file-source.propertiesconnect-file-properties,

    真的也是被自己蠢哭的一下午,然后重新修改了配置文件,通过以下命令重启connecter之后,在conncect-source.txt写入,在connect-sink.txt中就能看到了。

    四、REST API

    可以通过Kafka Connect 提供的基于REST风格的API接口来管理连接器,默认端口号是8083,可以通过Worker进程配置文件中的rest.port参数来修改端口号。

    如:

    curl localhost:8083/connectors

    方法请求类型 REST API 接口说明
    GET / 查看kafka集群版本信息
    GET /connectors 查看当前活跃的连接器列表,显示连接器的名字,即配置connector配置文件中的name属性
    POST /connectors 根据指定配置,创建一个新的连接器
    GET /connectors/{name} 查看指定连接器的信息
    GET /connectors/{name}/config 查看指定连接器的配置信息
    PUT /connectors/{name}/config 修改指定连接器的配置信息
    GET /connectors/{name}/state 查看指定连接器的状态
    POST /connectors/{name}/restart 重启指定的连接器
    PUT /connectors/{name}/pause 暂停指定的连接器
    GET /connectors/{name}/tasks 查询指定连接器正在运行的task
    POST /connectors/name}/tasks 修改指定连接器的Task配置
    GET /connectors/{name}/tasks/{taskId}/status 查询指定连接器中指定Task的状态
    POST /connectors/{name}/tasks/{taskId}/restat 重启指定连接器中指定的Task
    DELETE /connectors/{name} 删除指定的连接器

    大周末的,写了一天博客,这效率太低了,出门透透气~~~

    参考文献:

    朱忠华 《深入理解Kafka核心设计与实践原理》

    最美好的时光里,不要一直是一个lowser!
  • 相关阅读:
    很火的华为太空表网站源码
    exists用法 exists用法讲解
    mysql 建立索引在on 从句中_MySQL优化
    mysql on 条件会走索引吗
    【算法理论】动归入门[C语言描述]
    机器学习基础考试复习
    【基础知识】深度学习500问之生成对抗网络
    【王道数据结构】《王道数据结构》课后代码题汇总
    【C语言实现】数据结构算法题及答案
    【题目归档】考研数据结构算法题目归档
  • 原文地址:https://www.cnblogs.com/hellowhy/p/14800761.html
Copyright © 2011-2022 走看看