  • Getting Started with Kafka Connect: Importing MySQL Data into Elasticsearch

    1. Kafka Connect

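    Connect is part of Kafka. It provides a reliable and scalable way to move data between Kafka and external storage systems. It offers connector plugins a set of APIs and a runtime: Connect runs the plugins, and the plugins move the data. Connect runs as a cluster of worker processes. Connector plugins are installed on the workers, and connectors are then managed and configured through a REST API. The worker processes are long-running jobs. A connector starts additional tasks, which use the resources of the worker nodes to move large amounts of data in parallel. A SourceConnector reads data from the source system and hands the data objects to the worker processes; a SinkConnector takes data from the worker processes and writes it to the target system.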

    2. Some Connect concepts

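    Connector: implements the Connect API, decides how many tasks need to run, divides the data-copying work among the tasks, and obtains the task configurations from the worker process and passes them on.

    Task: responsible for moving data into or out of Kafka.

    Worker process: the container for connectors and tasks. It manages connector configuration, starts connectors and their tasks, and serves the REST API.

    Converter: translates data between Kafka Connect's internal data format and the serialized form stored in Kafka (for example JSON) as records are sent to or received from other storage systems.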
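
To make the connector's role concrete, the following is a minimal Python sketch of how a connector might divide a list of tables among a bounded number of tasks. It is only an illustration under stated assumptions: real connectors are written in Java against the Connect API, and the names `split_into_task_configs`, `tables`, and `max_tasks` are hypothetical.

```python
from typing import Dict, List


def split_into_task_configs(tables: List[str], max_tasks: int) -> List[Dict[str, str]]:
    """Distribute tables round-robin over at most max_tasks task configs."""
    if max_tasks < 1:
        raise ValueError("max_tasks must be at least 1")
    # Never create more tasks than there are tables to copy.
    num_tasks = min(max_tasks, len(tables))
    groups: List[List[str]] = [[] for _ in range(num_tasks)]
    for index, table in enumerate(tables):
        groups[index % num_tasks].append(table)
    # Each task receives its own configuration, here just a table list.
    return [{"tables": ",".join(group)} for group in groups]


if __name__ == "__main__":
    # Hypothetical table names, used only to show the split.
    for config in split_into_task_configs(["login", "orders", "users"], max_tasks=2):
        print(config)
```

Each returned dictionary stands in for the configuration that the worker would hand to one task.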

    3. Running Connect

    # Distributed mode
    cd kafka/bin
    sh connect-distributed.sh ../config/connect-distributed.properties

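    connect-distributed.properties contains several settings:

    bootstrap.servers: connection information for the Kafka cluster
    # Connect workers with the same group.id belong to the same Connect cluster
    group.id=connect-cluster
    # Defines the format in which data is stored in Kafka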
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter

    Viewing and managing connectors through the REST API

    List the connector plugins available to Kafka Connect
    curl -X GET http://ip:8083/connector-plugins
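    GET /connectors – returns the names of all active connectors.
    POST /connectors – creates a new connector; the request body must be JSON containing a name field and a config field. name is the connector's name, and config is a JSON object holding the connector's configuration.
    GET /connectors/{name} – returns information about the specified connector.
    GET /connectors/{name}/config – returns the configuration of the specified connector.
    PUT /connectors/{name}/config – updates the configuration of the specified connector.
    GET /connectors/{name}/status – returns the status of the specified connector, including whether it is running, paused, or failed; if an error occurred, the error details are listed as well.
    GET /connectors/{name}/tasks – returns the tasks currently running for the specified connector.
    GET /connectors/{name}/tasks/{taskid}/status – returns the status of one task of the specified connector.
    PUT /connectors/{name}/pause – pauses the connector and its tasks; data processing stops until the connector is resumed.
    PUT /connectors/{name}/resume – resumes a paused connector.
    POST /connectors/{name}/restart – restarts a connector, most often used after the connector has failed.
    POST /connectors/{name}/tasks/{taskId}/restart – restarts a single task, usually because it has failed.
    DELETE /connectors/{name} – deletes a connector, stopping all of its tasks and removing its configuration.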
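
The endpoints listed above can also be called from a script. The following is a rough sketch using only the Python standard library. The helper names `connect_request` and `send` and the base URL `http://localhost:8083` are assumptions made for illustration; the paths come from the list above.

```python
import json
import urllib.request
from typing import Optional


def connect_request(base_url: str, method: str, path: str,
                    body: Optional[dict] = None) -> urllib.request.Request:
    """Build (but do not send) a request for the Connect REST API."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    request = urllib.request.Request(base_url.rstrip("/") + path, data=data, method=method)
    if data is not None:
        # The API expects JSON bodies for POST and PUT.
        request.add_header("Content-Type", "application/json")
    return request


def send(request: urllib.request.Request) -> str:
    """Send the request to a running worker and return the response body."""
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")


if __name__ == "__main__":
    base = "http://localhost:8083"  # assumption: a worker listening locally
    pause = connect_request(base, "PUT", "/connectors/mysql-login-connector/pause")
    # Only print what would be sent; call send(pause) against a live worker.
    print(pause.get_method(), pause.full_url)
```

Building the request separately from sending it keeps the sketch usable without a running Connect cluster.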

    Apache Kafka ships with FileStreamSinkConnector and FileStreamSourceConnector by default. Confluent has implemented many open-source connectors, and you can also write your own custom connector against the Connect API.

    4. Connector example: from MySQL to Elasticsearch

    4.1 Download the connectors

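    Download confluentinc-kafka-connect-elasticsearch-5.0.0 and confluentinc-kafka-connect-jdbc-5.0.0, then copy the JAR files from the lib directory of each package into the lib directory under the Kafka installation path on every node that runs a Connect worker. mysql-connector-java-5.1.22.jar must be placed there as well.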

    Confluent connector documentation: https://docs.confluent.io/2.0.0/connect/connect-jdbc/docs/index.html

    4.2 Restart Connect

    Verify that the plugins were loaded successfully

    curl -X GET http://ip:8083/connector-plugins
    [{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"5.0.0"},{"class":"io.confluent.connect.jdbc.JdbcSinkConnector","type":"sink","version":"5.0.0"},{"class":"io.confluent.connect.jdbc.JdbcSourceConnector","type":"source","version":"5.0.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"1.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"1.0"}]
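
The same check can be automated. The sketch below parses a `/connector-plugins` response and reports which required connector classes are missing. The function name `missing_plugins` is hypothetical, and the sample response is an abbreviated copy of the output shown above.

```python
import json
from typing import Iterable, List


def missing_plugins(plugins_json: str, required: Iterable[str]) -> List[str]:
    """Return the required connector classes absent from a /connector-plugins response."""
    loaded = {plugin["class"] for plugin in json.loads(plugins_json)}
    return [name for name in required if name not in loaded]


if __name__ == "__main__":
    # Abbreviated copy of the response shown above.
    response = json.dumps([
        {"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
         "type": "sink", "version": "5.0.0"},
        {"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
         "type": "source", "version": "5.0.0"},
    ])
    required = [
        "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "io.confluent.connect.jdbc.JdbcSourceConnector",
    ]
    print(missing_plugins(response, required))
```

An empty list means every required plugin was found.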

    4.3 Create a test table in MySQL

    mysql> create table login(username varchar(50),login_time datetime);
    Query OK, 0 rows affected (0.73 sec)
    
    mysql> insert into login values('przhang',now());
    Query OK, 1 row affected (0.03 sec)
    
    mysql> insert into login values('peter',now());
    Query OK, 1 row affected (0.00 sec)

    4.4 Start the JDBC connector

    echo '{"name":"mysql-login-connector","config":{"connector.class":"JdbcSourceConnector","connection.url":"jdbc:mysql://localhost:3306/dwwspdb?user=dw_wspdb&password=dw_wspdb","mode":"timestamp","table.whitelist":"login","validate.non.null":false,"timestamp.column.name":"login_time","topic.prefix":"mysql."}}' | curl -X POST -d @- http://ip:8083/connectors --header "Content-Type:application/json"
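
The one-line JSON in the command above is hard to read and easy to mistype. As a sketch, the same request body can be built from a Python dictionary and serialized. The function name `jdbc_source_payload` and its parameters are hypothetical; the configuration keys and values are the ones used in the command above.

```python
import json


def jdbc_source_payload(name: str, connection_url: str, table: str,
                        timestamp_column: str, topic_prefix: str) -> str:
    """Return the JSON body for POST /connectors, mirroring the command above."""
    payload = {
        "name": name,
        "config": {
            "connector.class": "JdbcSourceConnector",
            "connection.url": connection_url,
            "mode": "timestamp",
            "table.whitelist": table,
            "validate.non.null": False,
            "timestamp.column.name": timestamp_column,
            "topic.prefix": topic_prefix,
        },
    }
    return json.dumps(payload, indent=2)


if __name__ == "__main__":
    print(jdbc_source_payload(
        name="mysql-login-connector",
        connection_url="jdbc:mysql://localhost:3306/dwwspdb?user=dw_wspdb&password=dw_wspdb",
        table="login",
        timestamp_column="login_time",
        topic_prefix="mysql.",
    ))
```

The printed JSON can be saved to a file and posted with `curl -X POST -d @file`, in the same way as the command above.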

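    Notes on some JdbcSourceConnector settings

    connection.url: the MySQL database connection

    mode: timestamp together with "timestamp.column.name":"login_time": incremental data is detected from the login_time timestamp column; whenever the value in this column changes, a new record is written to the Kafka topic

    mode: incrementing together with "incrementing.column.name":"id": suited to tables that have an auto-increment column; whenever a new row is inserted into MySQL, a new record is written to the Kafka topic

    topic.prefix: mysql. means that records are written to the Kafka topic named mysql.<table name>

    View the messages in the Kafka topic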
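
The idea behind timestamp mode can be sketched in a few lines. The example below is a simplified illustration, not the connector's actual query or offset handling. It uses an in-memory SQLite table in place of MySQL, and the function name `poll_new_rows` and the sample timestamps are made up for the demonstration.

```python
import sqlite3
from typing import List, Tuple


def poll_new_rows(conn: sqlite3.Connection, last_seen: str) -> Tuple[List[Tuple[str, str]], str]:
    """Return rows whose login_time is later than last_seen, plus the new offset."""
    rows = conn.execute(
        "SELECT username, login_time FROM login WHERE login_time > ? ORDER BY login_time",
        (last_seen,),
    ).fetchall()
    # Advance the stored offset to the newest timestamp seen, if any.
    new_last_seen = rows[-1][1] if rows else last_seen
    return rows, new_last_seen


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE login (username TEXT, login_time TEXT)")
    conn.execute("INSERT INTO login VALUES ('przhang', '2018-10-25 15:45:31')")
    conn.execute("INSERT INTO login VALUES ('peter', '2018-10-25 15:45:40')")
    rows, offset = poll_new_rows(conn, "1970-01-01 00:00:00")
    print(rows)
    # Updating the timestamp makes the row visible to the next poll.
    conn.execute("UPDATE login SET login_time = '2018-10-25 15:57:34' WHERE username = 'przhang'")
    rows, offset = poll_new_rows(conn, offset)
    print(rows)
```

The second poll returns only the updated row, which is why an UPDATE that touches login_time produces a fresh message in the topic.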

    sh kafka-console-consumer.sh --bootstrap-server=kafkaip:9092 --topic mysql.login --from-beginning
    {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"username"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},"payload":{"username":"przhang","login_time":1540453531000}}
    {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"username"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},"payload":{"username":"peter","login_time":1540453540000}}
    
    Update the data in MySQL:
    update login set login_time=now() where username='przhang';
    Kafka emits the change in real time:
    {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"username"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},"payload":{"username":"przhang","login_time":1540454254000}}
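
With JsonConverter, each message carries a schema and a payload, and login_time is encoded as milliseconds since the epoch. As a rough sketch, a consumer could unpack a message as follows. The function name `decode_login_record` is hypothetical, and the sample message keeps only the payload from the output above, with the schema omitted for brevity.

```python
import json
from datetime import datetime, timezone


def decode_login_record(message: str) -> dict:
    """Extract the payload and convert login_time from epoch milliseconds to UTC."""
    envelope = json.loads(message)
    payload = dict(envelope["payload"])
    payload["login_time"] = datetime.fromtimestamp(
        payload["login_time"] / 1000, tz=timezone.utc
    )
    return payload


if __name__ == "__main__":
    # Payload copied from the consumer output above; schema left out.
    message = '{"schema": null, "payload": {"username": "przhang", "login_time": 1540453531000}}'
    record = decode_login_record(message)
    print(record["username"], record["login_time"].isoformat())
```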

    4.5 Start the ElasticsearchSinkConnector

    echo '{"name":"elastic-login-connector","config":{"connector.class":"ElasticsearchSinkConnector","connection.url":"http://ESIP:9200","type.name":"mysql-data","topics":"mysql.login","key.ignore":true}}' | curl -X POST -d @- http://ip:8083/connectors --header "Content-Type:application/json"

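    Some ElasticsearchSinkConnector settings:

    connection.url: the Elasticsearch connection

    type.name: the document type used when writing to the ES index

    key.ignore=true: the ID of each record written to ES is the Kafka topic name + partition id + offset

    View the data in ES: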
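
The ID format described for key.ignore=true matches the `_id` values in the search output in this section, for example mysqllogin+3+0. A minimal sketch of that composition follows; the function name `document_id` is hypothetical and the code only mirrors the observed format.

```python
def document_id(topic: str, partition: int, offset: int) -> str:
    """Join topic, partition, and offset with '+' as seen in the _id values."""
    return f"{topic}+{partition}+{offset}"


if __name__ == "__main__":
    print(document_id("mysqllogin", 3, 0))
```

Because the ID is derived from the record's position in Kafka, every record gets its own document, including each update to the same row.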

    curl -X GET http://ESIP:9200/mysql.login/_search?pretty=true
    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : 5,
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "mysqllogin",
            "_type" : "mysql-data",
            "_id" : "mysqllogin+3+0",
            "_score" : 1.0,
            "_source" : {
              "username" : "przhang",
              "login_time" : 1540453531000
            }
          },
          {
            "_index" : "mysqllogin",
            "_type" : "mysql-data",
            "_id" : "mysqllogin+3+3",
            "_score" : 1.0,
            "_source" : {
              "username" : "mayun",
              "login_time" : 1540454531000
            }
          },
          {
            "_index" : "mysqllogin",
            "_type" : "mysql-data",
            "_id" : "mysqllogin+3+2",
            "_score" : 1.0,
            "_source" : {
              "username" : "przhang",
              "login_time" : 1540454254000
            }
          },
          {
            "_index" : "mysqllogin",
            "_type" : "mysql-data",
            "_id" : "mysqllogin+3+4",
            "_score" : 1.0,
            "_source" : {
              "username" : "pony",
              "login_time" : 1540473988000
            }
          },
          {
            "_index" : "mysqllogin",
            "_type" : "mysql-data",
            "_id" : "mysqllogin+3+1",
            "_score" : 1.0,
            "_source" : {
              "username" : "peter",
              "login_time" : 1540453540000
            }
          }
        ]
      }
    }
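
To work with a response like the one above in a script, the hits can be reduced to the fields of interest. The sketch below assumes the response has already been parsed into a dictionary. The function name `extract_logins` is hypothetical, and the sample contains two of the hits shown above.

```python
from typing import Dict, List, Tuple


def extract_logins(search_response: Dict) -> List[Tuple[str, int]]:
    """Return (username, login_time) pairs from an ES search response, oldest first."""
    hits = search_response["hits"]["hits"]
    pairs = [(hit["_source"]["username"], hit["_source"]["login_time"]) for hit in hits]
    return sorted(pairs, key=lambda pair: pair[1])


if __name__ == "__main__":
    # Two hits copied from the response above.
    response = {
        "hits": {
            "hits": [
                {"_id": "mysqllogin+3+2",
                 "_source": {"username": "przhang", "login_time": 1540454254000}},
                {"_id": "mysqllogin+3+0",
                 "_source": {"username": "przhang", "login_time": 1540453531000}},
            ]
        }
    }
    for username, login_time in extract_logins(response):
        print(username, login_time)
```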
  • Original article: https://www.cnblogs.com/darange/p/9857698.html