  • Kafka connect in practice(3): distributed mode mysql binlog ->kafka->hive

    In the previous post, Kafka connect in practice(1): standalone, I introduced the basics of Kafka Connect configuration and demonstrated a local standalone demo. In this post we will walk through pulling and sinking data in distributed mode. Before starting, make sure the Kafka broker and ZooKeeper are running!

    1. configuration

    vim $KAFKA_HOME/config/connect-distributed.properties

    set the contents of this file to:

    # These are defaults. This file just demonstrates how to override some settings.
    #A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. 
    #The client will make use of all servers irrespective of which servers are specified here for bootstrapping—
    #this list only impacts the initial hosts used to discover the full set of servers. 
    #This list should be in the form host1:port1,host2:port2,.... 
    #Since these servers are just used for the initial connection to discover the full cluster membership 
    #(which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, 
    #in case a server is down).
    #notes: this configuration is required.
    bootstrap.servers=localhost:9092
    
    # unique name for the cluster, used in forming the Connect cluster group. 
    #Note that this must not conflict with consumer group IDs.
    #notes: this configuration is required.
    group.id=connect-cluster
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
    # need to configure these based on the format they want their data in when loaded from or stored into Kafka
    #Converter class for key Connect data. This controls the format of the data that will be written to Kafka for source connectors or read from Kafka for sink connectors. 
    #Popular formats include Avro and JSON.
    #notes: this configuration is required.
    key.converter=org.apache.kafka.connect.json.JsonConverter
    
    #Converter class for value Connect data. This controls the format of the data that will be written to 
    #Kafka for source connectors or read from Kafka for sink connectors. Popular formats include Avro and JSON.
    #notes: this configuration is required.
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
    # it to.
    # If we want the schema embedded in each message, we can turn on *.converter.schemas.enable;
    # conversely, if we don't want the schema in each message, we should turn off *.converter.schemas.enable.
    # Generally speaking, in dev and testing environments we can turn the following attributes on to make tracing easier,
    # and turn them off in production to save network and disk capacity.
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    
    #The name of the topic where connector offsets are stored. This must be the same for all workers with 
    #the same group.id. Kafka Connect will upon startup attempt to automatically create this topic with multiple partitions and 
    #a compacted cleanup policy to avoid losing data, but it will simply use the topic if it already exists. If you choose to 
    #create this topic manually, always create it as a compacted, highly replicated (3x or more) topic with a large number of 
    #partitions (e.g., 25 or 50, just like Kafka's built-in __consumer_offsets topic) to support large Kafka Connect clusters.
    offset.storage.topic=connect-offsets
    #The replication factor used when Connect creates the topic used to store connector offsets. This should always be at least
    # 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster.
    offset.storage.replication.factor=1
    #The number of partitions used when Connect creates the topic used to store connector offsets. A large value (e.g., 25 or 50, 
    #just like Kafka's built-in __consumer_offsets topic) is necessary to support large Kafka Connect clusters.
    offset.storage.partitions=50
    
    #The name of the topic where connector and task configuration data are stored. This must be the same for all workers with 
    #the same group.id. Kafka Connect will upon startup attempt to automatically create this topic with a single-partition and 
    #compacted cleanup policy to avoid losing data, but it will simply use the topic if it already exists. If you choose to create 
    #this topic manually, always create it as a compacted topic with a single partition and a high replication factor (3x or more).
    config.storage.topic=connect-configs
    #The replication factor used when Kafka Connects creates the topic used to store connector and task configuration data. 
    #This should always be at least 3 for a production system, but cannot be larger than the number of Kafka brokers in the cluster.
    config.storage.replication.factor=1
    config.storage.partitions=1
    
    #The name of the topic where connector and task configuration status updates are stored. This must be the same for all workers with 
    #the same group.id. Kafka Connect will upon startup attempt to automatically create this topic with multiple partitions and a compacted 
    #cleanup policy to avoid losing data, but it will simply use the topic if it already exists. If you choose to create this topic manually, 
    #always create it as a compacted, highly replicated (3x or more) topic with multiple partitions.
    status.storage.topic=connect-status
    #The replication factor used when Connect creates the topic used to store connector and task status updates. This should always be at least 3 
    #for a production system, but cannot be larger than the number of Kafka brokers in the cluster.
    status.storage.replication.factor=1
    #The number of partitions used when Connect creates the topic used to store connector and task status updates.
    status.storage.partitions=10
    
    # Flush much faster than normal, which is useful for testing/debugging
    offset.flush.interval.ms=10000
    
    # These are provided to inform the user about the presence of the REST host and port configs 
    # Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
    #Hostname for the REST API. If this is set, it will only bind to this interface.
    #notes: this configuration is optional
    rest.host.name=localhost
    #Port for the REST API to listen on.
    #notes: this configuration is optional
    rest.port=8083
    
    # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
    #rest.advertised.host.name=
    #rest.advertised.port=
    rest.advertised.host.name=127.0.0.1
    rest.advertised.port=8083
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations). The list should consist of top level directories that include 
    # any combination of: 
    # a) directories immediately containing jars with plugins and their dependencies
    # b) uber-jars with plugins and their dependencies
    # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
    # Note: symlinks will be followed to discover dependencies or plugins.
    plugin.path=/home/lenmom/workspace/software/kafka_2.11-2.1.0/connect
    
    #Converter class for internal value Connect data that implements the Converter interface. Used for converting data like offsets and configs.
    #notes: this configuration is optional
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    #Converter class for internal key Connect data that implements the Converter interface. Used for converting data like offsets and configs.
    #notes: this configuration is optional
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    #notes: this configuration is optional
    task.shutdown.graceful.timeout.ms=10000
    #notes: this configuration is optional
    offset.flush.timeout.ms=5000
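
    Before starting the worker, it can help to sanity-check the file. The following is a minimal sketch, not part of Kafka: the helper names (parse_properties, check_worker_config) and the REQUIRED_KEYS list are assumptions made for illustration, based on the "required" notes and the replication-factor comments in the file above.

```python
# Sketch: sanity-check a Kafka Connect distributed worker config.
# The helper names and REQUIRED_KEYS are illustrative assumptions, not a Kafka API.

REQUIRED_KEYS = (
    "bootstrap.servers",
    "group.id",
    "key.converter",
    "value.converter",
    "offset.storage.topic",
    "config.storage.topic",
    "status.storage.topic",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_worker_config(props, broker_count):
    """Return a list of human-readable problems (empty if none were found)."""
    problems = [f"missing required key: {k}" for k in REQUIRED_KEYS if k not in props]
    for prefix in ("offset", "config", "status"):
        key = f"{prefix}.storage.replication.factor"
        # Connect defaults the replication factor to 3 when the key is absent.
        factor = int(props.get(key, "3"))
        if factor > broker_count:
            problems.append(f"{key}={factor} exceeds broker count {broker_count}")
    return problems


sample = """
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
"""

print(check_worker_config(parse_properties(sample), broker_count=1))
```

    With a single local broker, leaving any of the three replication factors at its default of 3 would be reported, which is why the file above sets them all to 1.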

    2. download a debezium-connector-mysql plugin tarball and extract it into the folder set by plugin.path in connect-distributed.properties

    2. start the mysql server with binlog enabled

    for details, please refer to my previous blog post, mysql 5.7 enable binlog

    3. start kafka connect in distributed mode

    sh $KAFKA_HOME/bin/connect-distributed.sh   $KAFKA_HOME/config/connect-distributed.properties 
    #or start in background 
    #sh $KAFKA_HOME/bin/connect-distributed.sh  -daemon  $KAFKA_HOME/config/connect-distributed.properties  

    it prints its startup log, as in the following screenshot.

    note the line INFO Added aliases 'MySqlConnector' and 'MySql' to plugin 'io.debezium.connector.mysql.MySqlConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:390): it shows that our target plugin has been loaded by the kafka connect distributed worker.

    4. create the demo database in mysql 

    # In production you would almost certainly restrict the replication user to the follower (slave) machine,
    # to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
    #
    # However, this grant is equivalent to specifying *any* hosts, which makes this easier since the docker host
    # is not easily known to the Docker container. But don't do this in production.
    #
    GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator' IDENTIFIED BY 'replpass';
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT  ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
    
    # Create the database that we'll use to populate data and watch the effect in the binlog
    DROP DATABASE IF EXISTS inventory;
    CREATE DATABASE IF NOT EXISTS inventory;
    GRANT ALL PRIVILEGES ON inventory.* TO 'root'@'%';

    # Switch to this database
    USE inventory;

    # Create and populate our products using a single insert with many rows
    CREATE TABLE products (
      id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      description VARCHAR(512),
      weight FLOAT
    );
    ALTER TABLE products AUTO_INCREMENT = 101;

    INSERT INTO products
    VALUES (default,"scooter","Small 2-wheel scooter",3.14),
           (default,"car battery","12V car battery",8.1),
           (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
           (default,"hammer","12oz carpenter's hammer",0.75),
           (default,"hammer","14oz carpenter's hammer",0.875),
           (default,"hammer","16oz carpenter's hammer",1.0),
           (default,"rocks","box of assorted rocks",5.3),
           (default,"jacket","water resistent black wind breaker",0.1),
           (default,"spare tire","24 inch spare tire",22.2);

    # Create and populate the products on hand using multiple inserts
    CREATE TABLE products_on_hand (
      product_id INTEGER NOT NULL PRIMARY KEY,
      quantity INTEGER NOT NULL,
      FOREIGN KEY (product_id) REFERENCES products(id)
    );

    INSERT INTO products_on_hand VALUES (101,3);
    INSERT INTO products_on_hand VALUES (102,8);
    INSERT INTO products_on_hand VALUES (103,18);
    INSERT INTO products_on_hand VALUES (104,4);
    INSERT INTO products_on_hand VALUES (105,5);
    INSERT INTO products_on_hand VALUES (106,0);
    INSERT INTO products_on_hand VALUES (107,44);
    INSERT INTO products_on_hand VALUES (108,2);
    INSERT INTO products_on_hand VALUES (109,5);

    # Create some customers ...
    CREATE TABLE customers (
      id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      first_name VARCHAR(255) NOT NULL,
      last_name VARCHAR(255) NOT NULL,
      email VARCHAR(255) NOT NULL UNIQUE KEY
    ) AUTO_INCREMENT=1001;

    INSERT INTO customers
    VALUES (default,"Sally","Thomas","sally.thomas@acme.com"),
           (default,"George","Bailey","gbailey@foobar.com"),
           (default,"Edward","Walker","ed@walker.com"),
           (default,"Anne","Kretchmar","annek@noanswer.org");

    # Create some very simple orders
    CREATE TABLE orders (
      order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      order_date DATE NOT NULL,
      purchaser INTEGER NOT NULL,
      quantity INTEGER NOT NULL,
      product_id INTEGER NOT NULL,
      FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
      FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
    ) AUTO_INCREMENT = 10001;

    INSERT INTO orders
    VALUES (default, '2016-01-16', 1001, 1, 102),
           (default, '2016-01-17', 1002, 2, 105),
           (default, '2016-02-19', 1002, 2, 106),
           (default, '2016-02-21', 1003, 1, 107);

    5. Register a mysql connector using the kafka connect rest api

    5.1 check the kafka connect version

    curl -H "Accept:application/json" localhost:8083/

    output

    {"version":"2.1.0","commit":"809be928f1ae004e","kafka_cluster_id":"NGQRxNZMSY6Q53ktQABHsQ"}

    5.2 get current connector list

    lenmom@M1701:~/$ curl -H "Accept:application/json" localhost:8083/connectors/
    []

    the output indicates that no connector is registered in the distributed worker yet.

    5.3 register a mysql connector instance in the distributed worker

    $ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "127.0.0.1", "database.port": "3306", "database.user": "root", "database.password": "root", "database.server.id": "184054", "database.server.name": "127.0.0.1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

    for readability, the request body is formatted as follows

    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "127.0.0.1",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "root",
        "database.server.id": "184054",
        "database.server.name": "127.0.0.1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
        "database.history.kafka.topic": "dbhistory.inventory"
      }
    }
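
    The same registration can also be scripted. The sketch below uses only the Python standard library; the function names build_mysql_connector and register are hypothetical helpers for this post, and the property names are the ones used in the request above (Debezium 0.9.x). The actual POST is left commented out, since it needs a worker listening on localhost:8083.

```python
import json
import urllib.request


def build_mysql_connector(name, host, port, user, password, server_id, database, kafka):
    """Build the JSON body for POST /connectors, mirroring the request above."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": host,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            "database.server.id": str(server_id),
            # This demo reuses the host as the logical server name.
            "database.server.name": host,
            "database.whitelist": database,
            "database.history.kafka.bootstrap.servers": kafka,
            "database.history.kafka.topic": f"dbhistory.{database}",
        },
    }


def register(payload, connect_url="http://localhost:8083"):
    """POST the payload to the Connect REST API and return the HTTP status code."""
    request = urllib.request.Request(
        f"{connect_url}/connectors/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError on 4xx/5xx responses.
    with urllib.request.urlopen(request) as response:
        return response.status


payload = build_mysql_connector(
    "inventory-connector", "127.0.0.1", 3306, "root", "root", 184054,
    "inventory", "127.0.0.1:9092",
)
print(json.dumps(payload, indent=2))
# register(payload)  # uncomment when a Connect worker is listening on localhost:8083
```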

    the response to the POST request is as follows:

    HTTP/1.1 201 Created
    Date: Tue, 07 apr 2019 16:49:34 GMT
    Location: http://localhost:8083/connectors/inventory-connector
    Content-Type: application/json
    Content-Length: 471
    Server: Jetty(9.2.15.v20160210)
    
    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "127.0.0.1",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "root",
        "database.server.id": "184054",
        "database.server.name": "127.0.0.1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
        "database.history.kafka.topic": "dbhistory.inventory",
        "name": "inventory-connector"
      },
      "tasks": []
    }

    5.4 list the connectors using curl again; there should be one now, since we have just registered it.

    curl -H "Accept:application/json" localhost:8083/connectors/

    output 

    ["inventory-connector"]

    5.5 get connector detail

    curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

    output

    HTTP/1.1 200 OK
    Date: Wed, 24 Apr 2019 10:15:16 GMT
    Content-Type: application/json
    Content-Length: 536
    Server: Jetty(9.4.12.v20180830)
    
    {
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.user": "root",
        "database.server.id": "184054",
        "tasks.max": "1",
        "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
        "database.history.kafka.topic": "dbhistory.inventory",
        "database.server.name": "127.0.0.1",
        "database.port": "3306",
        "database.hostname": "127.0.0.1",
        "database.password": "root",
        "name": "inventory-connector",
        "database.whitelist": "inventory"
      },
      "tasks": [
        {
          "connector": "inventory-connector",
          "task": 0
        }
      ],
      "type": "source"
    }

    the "task": 0 entry indicates that there is one task, with id 0.
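
    To read the task list programmatically rather than by eye, something like the following sketch would do. summarize_connector is a hypothetical helper, and the JSON is the response body above trimmed to the fields it uses.

```python
import json

# The GET /connectors/inventory-connector body from above, trimmed to the fields used here.
detail = json.loads("""
{
  "name": "inventory-connector",
  "config": {"tasks.max": "1"},
  "tasks": [{"connector": "inventory-connector", "task": 0}],
  "type": "source"
}
""")


def summarize_connector(detail):
    """Pull out the connector name, its type, and the ids of its tasks."""
    task_ids = sorted(task["task"] for task in detail.get("tasks", []))
    return {
        "name": detail["name"],
        "type": detail.get("type"),
        "tasks_max": int(detail["config"].get("tasks.max", "1")),
        "task_ids": task_ids,
    }


summary = summarize_connector(detail)
print(summary)
```

    An empty task_ids list right after registration (as in the POST response, where "tasks" is []) only means the tasks have not been assigned yet.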

    5.5 list the kafka topics 

    sh $KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --list 

    output 

    127.0.0.1
    127.0.0.1.inventory.customers
    127.0.0.1.inventory.orders
    127.0.0.1.inventory.products
    127.0.0.1.inventory.products_on_hand
    __consumer_offsets
    connect-configs
    connect-offsets
    connect-status
    connect-test
    dbhistory.inventory

    this indicates that the mysql connector has started watching mysql for data changes and has begun pushing the changed data to the kafka broker.

    5.6 observe the data in the kafka topic
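
    The topic names in the listing follow the Debezium MySQL convention: one topic per table named serverName.databaseName.tableName, a schema-change topic named after the server name itself, and the database history topic we configured. A small sketch of that grouping follows; change_topic and classify_topics are hypothetical helper names.

```python
def change_topic(server_name, database, table):
    """Name of the topic that carries change events for one table."""
    return f"{server_name}.{database}.{table}"


def classify_topics(topics, server_name, history_topic):
    """Group a topic listing by the role each topic plays for the connector."""
    groups = {"table": [], "schema_change": [], "history": [], "other": []}
    for topic in topics:
        if topic == server_name:
            groups["schema_change"].append(topic)
        elif topic == history_topic:
            groups["history"].append(topic)
        elif topic.startswith(server_name + "."):
            groups["table"].append(topic)
        else:
            groups["other"].append(topic)
    return groups


listing = [
    "127.0.0.1",
    "127.0.0.1.inventory.customers",
    "127.0.0.1.inventory.orders",
    "127.0.0.1.inventory.products",
    "127.0.0.1.inventory.products_on_hand",
    "__consumer_offsets",
    "connect-configs",
    "connect-offsets",
    "connect-status",
    "connect-test",
    "dbhistory.inventory",
]

groups = classify_topics(listing, "127.0.0.1", "dbhistory.inventory")
for role, names in groups.items():
    print(role, names)
```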

    sh $KAFKA_HOME/bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic 127.0.0.1.inventory.customers --from-beginning   # all the data changes for  table customers in database inventory would be listed

    output:

    {"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
    {"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
    {"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
    {"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}

    let's take one record, say the first one, and format it as an example.

    {
      "before": null,
      "after": {
        "id": 1001,
        "first_name": "Sally",
        "last_name": "Thomas",
        "email": "sally.thomas@acme.com"
      },
      "source": {
        "version": "0.9.4.Final",
        "connector": "mysql",
        "name": "127.0.0.1",
        "server_id": 0,
        "ts_sec": 0,
        "gtid": null,
        "file": "mysql-bin.000012",
        "pos": 154,
        "row": 0,
        "snapshot": true,
        "thread": null,
        "db": "inventory",
        "table": "customers",
        "query": null
      },
      "op": "c",
      "ts_ms": 1556100441651
    }

    the op field indicates the type of data change: 

    • c: a record was inserted into the database. For c, the before element is null.
    • d: a record was deleted from the database. For d, the after element is null.
    • u: a record was updated in the database. The before element holds the row as it was before the update, and the after element holds the row after the update.
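
    The op semantics above can be sketched as a tiny consumer that keeps an in-memory copy of the table. This is only an illustration: apply_change is a hypothetical helper, the events are trimmed to the fields it reads, and a null record is simply skipped.

```python
def apply_change(table, event, key="id"):
    """Apply one change event to a dict that maps primary key -> row."""
    if event is None:
        # A null record carries no row data, so there is nothing to apply.
        return table
    op = event["op"]
    if op in ("c", "u"):
        row = event["after"]
        table[row[key]] = row
    elif op == "d":
        table.pop(event["before"][key], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")
    return table


events = [
    {"op": "c", "before": None, "after": {"id": 1001, "first_name": "Sally"}},
    {"op": "c", "before": None, "after": {"id": 1004, "first_name": "Anne"}},
    {"op": "u", "before": {"id": 1004, "first_name": "Anne"},
     "after": {"id": 1004, "first_name": "1234"}},
    {"op": "d", "before": {"id": 1004, "first_name": "1234"}, "after": None},
    None,
]

table = {}
for event in events:
    apply_change(table, event)

print(sorted(table))
```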

    let's run some update and delete operations in the mysql database to see the changes captured in kafka.

    mysql> select * from customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
    +------+------------+-----------+-----------------------+
    4 rows in set (0.00 sec)
    
    mysql> update customers set first_name='1234' where id=1004;
    Query OK, 1 row affected (0.01 sec)
    Rows matched: 1  Changed: 1  Warnings: 0
    
    mysql> delete from customers where id=1004;
    Query OK, 1 row affected (0.01 sec)
    
    mysql> select * from customers;
    +------+------------+-----------+-----------------------+
    | id   | first_name | last_name | email                 |
    +------+------------+-----------+-----------------------+
    | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
    | 1002 | George     | Bailey    | gbailey@foobar.com    |
    | 1003 | Edward     | Walker    | ed@walker.com         |
    +------+------------+-----------+-----------------------+
    3 rows in set (0.00 sec)

    as shown, we first update the first_name field of the record with id=1004 to 1234, and then delete the record.

    kafka record:

    {"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
    {"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
    {"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
    {"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000012","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1556100441651}
    {"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"1234","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":223344,"ts_sec":1556102881,"gtid":null,"file":"mysql-bin.000012","pos":364,"row":0,"snapshot":false,"thread":2,"db":"inventory","table":"customers","query":null},"op":"u","ts_ms":1556102881385}
    {"before":{"id":1004,"first_name":"1234","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":null,"source":{"version":"0.9.4.Final","connector":"mysql","name":"127.0.0.1","server_id":223344,"ts_sec":1556102900,"gtid":null,"file":"mysql-bin.000012","pos":725,"row":0,"snapshot":false,"thread":2,"db":"inventory","table":"customers","query":null},"op":"d","ts_ms":1556102900269}
    null

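    two more change events now appear in the topic, the update and the delete, followed by a null-valued tombstone record that Debezium writes after a delete so that log compaction can eventually drop the key.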

    update:

    {
      "before": {
        "id": 1004,
        "first_name": "Anne",
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org"
      },
      "after": {
        "id": 1004,
        "first_name": "1234",
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org"
      },
      "source": {
        "version": "0.9.4.Final",
        "connector": "mysql",
        "name": "127.0.0.1",
        "server_id": 223344,
        "ts_sec": 1556102881,
        "gtid": null,
        "file": "mysql-bin.000012",
        "pos": 364,
        "row": 0,
        "snapshot": false,
        "thread": 2,
        "db": "inventory",
        "table": "customers",
        "query": null
      },
      "op": "u",
      "ts_ms": 1556102881385
    }

    delete:

    {
      "before": {
        "id": 1004,
        "first_name": "1234",
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org"
      },
      "after": null,
      "source": {
        "version": "0.9.4.Final",
        "connector": "mysql",
        "name": "127.0.0.1",
        "server_id": 223344,
        "ts_sec": 1556102900,
        "gtid": null,
        "file": "mysql-bin.000012",
        "pos": 725,
        "row": 0,
        "snapshot": false,
        "thread": 2,
        "db": "inventory",
        "table": "customers",
        "query": null
      },
      "op": "d",
      "ts_ms": 1556102900269
    }
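
    For update events, comparing before and after tells us exactly which columns changed. A minimal sketch, using the update record above trimmed to the fields it reads; changed_columns is a hypothetical helper name.

```python
def changed_columns(event):
    """Return {column: (old, new)} for an update event, or {} for any other event."""
    before, after = event.get("before"), event.get("after")
    if event.get("op") != "u" or before is None or after is None:
        return {}
    return {
        column: (before.get(column), after[column])
        for column in after
        if before.get(column) != after[column]
    }


update_event = {
    "before": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar",
               "email": "annek@noanswer.org"},
    "after": {"id": 1004, "first_name": "1234", "last_name": "Kretchmar",
              "email": "annek@noanswer.org"},
    "op": "u",
    "ts_ms": 1556102881385,
}

print(changed_columns(update_event))
```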

    if we stop the mysql connector or restart the kafka broker, the data is still there in kafka, because the change events are stored in kafka topics and the connector offsets are persisted in the offset topic (connect-offsets) we configured in connect-distributed.properties

    6. add the hdfs connector plugin to kafka connect distributed 

    cd $KAFKA_HOME/connect                           # goto kafka connect plugin folder
    wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-hdfs/versions/5.2.1/confluentinc-kafka-connect-hdfs-5.2.1.zip   #download the hdfs plugin 
    unzip confluentinc-kafka-connect-hdfs-5.2.1.zip  # unzip the hdfs plugin to the connect plugin folder
    rm -f confluentinc-kafka-connect-hdfs-5.2.1.zip

    7. restart the kafka connect distributed server to load the hdfs plugin

    if it runs in a terminal, we can just press ctrl + c to stop it; if it runs in background mode, we can get the process id using lsof -i:8083 and then kill {process id just queried}, as listed below.

    lenmom@M1701:~/workspace$ lsof -i:8083    
    COMMAND  PID   USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
    java    8257 lenmom  314u  IPv6 2015241      0t0  TCP localhost:8083 (LISTEN)
    lenmom@M1701:~/workspace$ kill 8257

    and then restart kafka connect distributed using the command:

    sh $KAFKA_HOME/bin/connect-distributed.sh   $KAFKA_HOME/config/connect-distributed.properties 

    then we can see in the terminal output that the hdfs connect plugin has been loaded

    [2019-04-24 19:29:03,939] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/lenmom/workspace/software/kafka_2.11-2.1.0/connect/confluentinc-kafka-connect-hdfs-5.2.1/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:243)
    [2019-04-24 19:29:03,939] INFO Added plugin 'io.confluent.connect.hdfs.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:172)
    [2019-04-24 19:29:03,941] INFO Added plugin 'io.confluent.connect.hdfs.HdfsSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:172)
    [2019-04-24 19:29:03,941] INFO Added plugin 'io.confluent.connect.storage.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:172)
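
    When the worker runs in the background, the same check can be made against its log output. The sketch below assumes the log format shown above; loaded_plugins is a hypothetical helper, and the sample text is copied from those lines.

```python
import re

# Matches lines such as: INFO Added plugin 'io.confluent.connect.hdfs.HdfsSinkConnector' (...)
PLUGIN_LINE = re.compile(r"INFO Added plugin '([^']+)'")


def loaded_plugins(log_text):
    """Return the plugin class names announced in a Connect worker log."""
    return PLUGIN_LINE.findall(log_text)


log_text = """\
[2019-04-24 19:29:03,939] INFO Added plugin 'io.confluent.connect.hdfs.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:172)
[2019-04-24 19:29:03,941] INFO Added plugin 'io.confluent.connect.hdfs.HdfsSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:172)
[2019-04-24 19:29:03,941] INFO Added plugin 'io.confluent.connect.storage.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:172)
"""

plugins = loaded_plugins(log_text)
print("io.confluent.connect.hdfs.HdfsSinkConnector" in plugins)
```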
  • Original post: https://www.cnblogs.com/lenmom/p/10763589.html