    Kafka Connect REST Interface

    Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. When executed in distributed mode, the REST API will be the primary interface to the cluster. You can make requests to any cluster member; the REST API automatically forwards requests if required.

    Although you can use the standalone mode just by submitting a connector on the command line, it also runs the REST interface. This is useful for getting status information, adding and removing connectors without stopping the process, and more.

    Currently the top-level resources are connector and connector-plugins. The sub-resources for connector list configuration settings and tasks, and the sub-resource for connector-plugins provides configuration validation and recommendations.

    Note that if you try to modify, update, or delete a resource under connector, which may require the request to be forwarded to the leader, Connect will return status code 409 while the worker group rebalance is in progress, since the leader may change during the rebalance.
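
    As a quick illustration — any worker can serve a request, and a 409 during a rebalance is safe to retry — here is a minimal Python sketch using the third-party requests library. The worker URL, port, and retry policy are assumptions for the example, not part of the API:

    import time
    import requests

    # Any worker in the cluster can serve REST requests; it forwards them to the
    # leader when necessary. Host and port (default 8083) are assumed here.
    BASE_URL = "http://localhost:8083"

    def connect_request(method, path, retries=5, **kwargs):
        """Send a request, retrying on 409 (worker group rebalance in progress)."""
        for attempt in range(retries):
            resp = requests.request(method, BASE_URL + path, **kwargs)
            if resp.status_code != 409:
                return resp
            time.sleep(2 ** attempt)  # back off while the group rebalances
        return resp

    # Example: list the active connectors (GET /connectors).
    print(connect_request("GET", "/connectors").json())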

    Content Types

    Currently the REST API only supports application/json as both the request and response entity content type. Your requests should specify the expected content type of the response via the HTTP Accept header:

    Accept: application/json
    

    and should specify the content type of the request entity (if one is included) via the Content-Type header:

    Content-Type: application/json
    
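    With the requests library from the sketch above, both headers can be set explicitly; requests also sets Content-Type: application/json automatically when the json= keyword is used. A minimal example, again assuming a local worker:

    import requests

    headers = {"Accept": "application/json", "Content-Type": "application/json"}
    resp = requests.get("http://localhost:8083/connectors", headers=headers)
    print(resp.headers.get("Content-Type"))  # expect application/json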

    Statuses & Errors

    The REST API will return standards-compliant HTTP statuses. Clients should check the HTTP status, especially before attempting to parse and use response entities. Currently the API does not use redirects (statuses in the 300 range), but these codes are reserved for future use, so clients should be prepared to handle them.

    When possible, all endpoints will use a standard error message format for all errors (status codes in the 400 or 500 range). For example, a request entity that omits a required field may generate the following response:

    HTTP/1.1 422 Unprocessable Entity
    Content-Type: application/json
    
    {
        "error_code": 422,
        "message": "config may not be empty"
    }
    
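    A client can rely on this envelope when reporting failures. A minimal sketch (assuming the same local worker as above) that checks the status before parsing:

    import requests

    resp = requests.get("http://localhost:8083/connectors/no-such-connector/status")
    if resp.status_code >= 400:
        # All 4xx/5xx responses should carry the standard error envelope.
        err = resp.json()
        print(f"request failed ({err['error_code']}): {err['message']}")
    else:
        print(resp.json())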

    Connectors

    GET /connectors

    Get a list of active connectors.

    Response JSON Object:
     
    • connectors (array) -- List of connector names

    Example request:

    GET /connectors HTTP/1.1
    Host: connect.example.com
    Accept: application/json
    

    Example response:

    HTTP/1.1 200 OK
    Content-Type: application/json
    
    ["my-jdbc-source", "my-hdfs-sink"]
    
    POST /connectors

    Create a new connector, returning the current connector info if successful. Returns 409 (Conflict) if a rebalance is in progress.

    Request JSON Object:
     
    • name (string) -- Name of the connector to create
    • config (map) -- Configuration parameters for the connector. All values should be strings.
    Response JSON Object:
     
    • name (string) -- Name of the created connector
    • config (map) -- Configuration parameters for the connector.
    • tasks (array) -- List of active tasks generated by the connector
    • tasks[i].connector (string) -- The name of the connector the task belongs to
    • tasks[i].task (int) -- Task ID within the connector.

    Example request:

    POST /connectors HTTP/1.1
    Host: connect.example.com
    Content-Type: application/json
    Accept: application/json
    
    {
        "name": "hdfs-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
            "tasks.max": "10",
            "topics": "test-topic",
            "hdfs.url": "hdfs://fakehost:9000",
            "hadoop.conf.dir": "/opt/hadoop/conf",
            "hadoop.home": "/opt/hadoop",
            "flush.size": "100",
            "rotate.interval.ms": "1000"
        }
    }
    

    Example response:

    HTTP/1.1 201 Created
    Content-Type: application/json
    
    {
        "name": "hdfs-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
            "tasks.max": "10",
            "topics": "test-topic",
            "hdfs.url": "hdfs://fakehost:9000",
            "hadoop.conf.dir": "/opt/hadoop/conf",
            "hadoop.home": "/opt/hadoop",
            "flush.size": "100",
            "rotate.interval.ms": "1000"
        },
        "tasks": [
            { "connector": "hdfs-sink-connector", "task": 1 },
            { "connector": "hdfs-sink-connector", "task": 2 },
            { "connector": "hdfs-sink-connector", "task": 3 }
        ]
    }
    
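    The same request can be issued programmatically. A sketch, assuming a local worker and reusing the fictitious HDFS settings from the example above:

    import requests

    payload = {
        "name": "hdfs-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
            "tasks.max": "10",  # note: all config values are strings
            "topics": "test-topic",
            "hdfs.url": "hdfs://fakehost:9000",
        },
    }
    resp = requests.post("http://localhost:8083/connectors", json=payload)
    if resp.status_code == 201:
        print("created:", resp.json()["name"])
    elif resp.status_code == 409:
        print("rebalance in progress, retry later")
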
    GET /connectors/(string:name)

    Get information about the connector.

    Response JSON Object:
     
    • name (string) -- Name of the created connector
    • config (map) -- Configuration parameters for the connector.
    • tasks (array) -- List of active tasks generated by the connector
    • tasks[i].connector (string) -- The name of the connector the task belongs to
    • tasks[i].task (int) -- Task ID within the connector.

    Example request:

    GET /connectors/hdfs-sink-connector HTTP/1.1
    Host: connect.example.com
    Accept: application/json
    

    Example response:

    HTTP/1.1 200 OK
    Content-Type: application/json
    
    {
        "name": "hdfs-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
            "tasks.max": "10",
            "topics": "test-topic",
            "hdfs.url": "hdfs://fakehost:9000",
            "hadoop.conf.dir": "/opt/hadoop/conf",
            "hadoop.home": "/opt/hadoop",
            "flush.size": "100",
            "rotate.interval.ms": "1000"
        },
        "tasks": [
            { "connector": "hdfs-sink-connector", "task": 1 },
            { "connector": "hdfs-sink-connector", "task": 2 },
            { "connector": "hdfs-sink-connector", "task": 3 }
        ]
    }
    
    GET /connectors/(string:name)/config

    Get the configuration for the connector.

    Response JSON Object:
     
    • config (map) -- Configuration parameters for the connector.

    Example request:

    GET /connectors/hdfs-sink-connector/config HTTP/1.1
    Host: connect.example.com
    Accept: application/json
    

    Example response:

    HTTP/1.1 200 OK
    Content-Type: application/json
    
    {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    }
    
    PUT /connectors/(string:name)/config

    Create a new connector using the given configuration, or update the configuration for an existing connector. Returns information about the connector after the change has been made. Returns 409 (Conflict) if a rebalance is in progress.

    Request JSON Object:
     
    • config (map) -- Configuration parameters for the connector. All values should be strings.
    Response JSON Object:
     
    • name (string) -- Name of the created connector
    • config (map) -- Configuration parameters for the connector.
    • tasks (array) -- List of active tasks generated by the connector
    • tasks[i].connector (string) -- The name of the connector the task belongs to
    • tasks[i].task (int) -- Task ID within the connector.

    Example request:

    PUT /connectors/hdfs-sink-connector/config HTTP/1.1
    Host: connect.example.com
    Accept: application/json
    
    {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    }
    

    Example response:

    HTTP/1.1 201 Created
    Content-Type: application/json
    
    {
        "name": "hdfs-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
            "tasks.max": "10",
            "topics": "test-topic",
            "hdfs.url": "hdfs://fakehost:9000",
            "hadoop.conf.dir": "/opt/hadoop/conf",
            "hadoop.home": "/opt/hadoop",
            "flush.size": "100",
            "rotate.interval.ms": "1000"
        },
        "tasks": [
            { "connector": "hdfs-sink-connector", "task": 1 },
            { "connector": "hdfs-sink-connector", "task": 2 },
            { "connector": "hdfs-sink-connector", "task": 3 }
        ]
    }
    

    Note that in this example the return status indicates that the connector was Created. In the case of a configuration update the status would have been 200 OK.
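
    This makes PUT of the config a convenient idempotent "create or update" call. A short sketch that distinguishes the two outcomes by status code, under the same local-worker assumption:

    import requests

    config = {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
    }
    url = "http://localhost:8083/connectors/hdfs-sink-connector/config"
    resp = requests.put(url, json=config)
    # 201 Created => new connector; 200 OK => existing configuration updated.
    print("created" if resp.status_code == 201 else "updated")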

    GET /connectors/(string:name)/status

    Get current status of the connector, including whether it is running, failed or paused, which worker it is assigned to, error information if it has failed, and the state of all its tasks.

    Response JSON Object:
     
    • name (string) -- The name of the connector.
    • connector (map) -- The map containing connector status.
    • tasks[i] (map) -- The map containing the task status.

    Example request:

    GET /connectors/hdfs-sink-connector/status HTTP/1.1
    Host: connect.example.com
    

    Example response:

    HTTP/1.1 200 OK
    
    {
        "name": "hdfs-sink-connector",
        "connector": {
            "state": "RUNNING",
            "worker_id": "fakehost:8083"
        },
        "tasks":
        [
            {
                "id": 0,
                "state": "RUNNING",
                "worker_id": "fakehost:8083"
            },
            {
                "id": 1,
                "state": "FAILED",
                "worker_id": "fakehost:8083",
                "trace": "org.apache.kafka.common.errors.RecordTooLargeException
    "
            }
        ]
    }
    
    POST /connectors/(string:name)/restart

    Restart the connector and its tasks. Returns 409 (Conflict) if a rebalance is in progress.

    Example request:

    POST /connectors/hdfs-sink-connector/restart HTTP/1.1
    Host: connect.example.com
    

    Example response:

    HTTP/1.1 200 OK
    
    PUT /connectors/(string:name)/pause

    Pause the connector and its tasks, which stops message processing until the connector is resumed. This call is asynchronous and the tasks will not all transition to the PAUSED state at the same time.

    Example request:

    PUT /connectors/hdfs-sink-connector/pause HTTP/1.1
    Host: connect.example.com
    

    Example response:

    HTTP/1.1 202 Accepted
    
    PUT /connectors/(string:name)/resume

    Resume a paused connector, or do nothing if the connector is not paused. This call is asynchronous and the tasks will not all transition to the RUNNING state at the same time.

    Example request:

    PUT /connectors/hdfs-sink-connector/resume HTTP/1.1
    Host: connect.example.com
    

    Example response:

    HTTP/1.1 202 Accepted
    
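    Because pause and resume are asynchronous, a client that needs a hard guarantee should poll the status endpoint until every task reports the target state. A minimal sketch, assuming the local worker and connector name used above:

    import time
    import requests

    base = "http://localhost:8083/connectors/hdfs-sink-connector"

    requests.put(base + "/pause")  # 202 Accepted: the pause happens asynchronously
    while any(t["state"] != "PAUSED"
              for t in requests.get(base + "/status").json()["tasks"]):
        time.sleep(1)  # poll until every task has transitioned
    print("all tasks paused")
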
    DELETE /connectors/(string:name)

    Delete a connector, halting all tasks and deleting its configuration. Returns 409 (Conflict) if a rebalance is in progress.

    Example request:

    DELETE /connectors/hdfs-sink-connector HTTP/1.1
    Host: connect.example.com
    

    Example response:

    HTTP/1.1 204 No Content
    

    Tasks

    GET /connectors/(string:name)/tasks

    Get a list of tasks currently running for the connector.

    Response JSON Object:
     
    • tasks (array) -- List of active task configs that have been created by the connector
    • tasks[i].id (map) -- The ID of the task
    • tasks[i].id.connector (string) -- The name of the connector the task belongs to
    • tasks[i].id.task (int) -- Task ID within the connector.
    • tasks[i].config (map) -- Configuration parameters for the task

    Example request:

    GET /connectors/hdfs-sink-connector/tasks HTTP/1.1
    Host: connect.example.com
    

    Example response:

    HTTP/1.1 200 OK
    
    [
        {
            "id": { "connector": "hdfs-sink-connector", "task": 0 },
            "config": {
                "task.class": "io.confluent.connect.hdfs.HdfsSinkTask",
                "topics": "test-topic",
                "hdfs.url": "hdfs://fakehost:9000",
                "hadoop.conf.dir": "/opt/hadoop/conf",
                "hadoop.home": "/opt/hadoop",
                "flush.size": "100",
                "rotate.interval.ms": "1000"
            }
        },
        {
            "id": { "connector": "hdfs-sink-connector", "task": 1 },
            "config": {
                "task.class": "io.confluent.connect.hdfs.HdfsSinkTask",
                "topics": "test-topic",
                "hdfs.url": "hdfs://fakehost:9000",
                "hadoop.conf.dir": "/opt/hadoop/conf",
                "hadoop.home": "/opt/hadoop",
                "flush.size": "100",
                "rotate.interval.ms": "1000"
            }
        }
    ]
    
    GET /connectors/(string:name)/tasks/(int:taskid)/status

    Get a task's status.

    Example request:

    GET /connectors/hdfs-sink-connector/tasks/1/status HTTP/1.1
    Host: connect.example.com
    

    Example response:

    HTTP/1.1 200 OK
    
    {"state":"RUNNING","id":1,"worker_id":"192.168.86.101:8083"}
    
    POST /connectors/(string:name)/tasks/(int:taskid)/restart

    Restart an individual task.

    Example request:

    POST /connectors/hdfs-sink-connector/tasks/1/restart HTTP/1.1
    Host: connect.example.com
    

    Example response:

    HTTP/1.1 200 OK
    
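    Combining the status and restart endpoints gives a common maintenance loop: find FAILED tasks and restart only those. A sketch, with the connector name and worker URL assumed as before:

    import requests

    base = "http://localhost:8083/connectors/hdfs-sink-connector"

    status = requests.get(base + "/status").json()
    for task in status["tasks"]:
        if task["state"] == "FAILED":
            # Restart just the individual failed task, not the whole connector.
            resp = requests.post(f"{base}/tasks/{task['id']}/restart")
            print(f"restarted task {task['id']}: {resp.status_code}")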

    Connector Plugins

    GET /connector-plugins/

    Return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means it is possible to see inconsistent results, especially during a rolling upgrade if you add new connector jars.

    Response JSON Object:
     
    • class (string) -- The connector class name.

    Example request:

    GET /connector-plugins/ HTTP/1.1
    Host: connect.example.com
    

    Example response:

    HTTP/1.1 200 OK
    
    [
        {
            "class": "io.confluent.connect.hdfs.HdfsSinkConnector"
        },
        {
            "class": "io.confluent.connect.jdbc.JdbcSourceConnector"
        }
    ]
    
    PUT /connector-plugins/(string:name)/config/validate

    Validate the provided configuration values against the configuration definition. This API performs per-config validation, returning suggested values and error messages during validation.

    Request JSON Object:
     
    • config (map) -- Configuration parameters for the connector. All values should be strings.
    Response JSON Object:
     
    • name (string) -- The class name of the connector plugin.
    • error_count (int) -- The total number of errors encountered during configuration validation.
    • groups (array) -- The list of groups used in configuration definitions.
    • configs[i].definition (map) -- The definition for a config in the connector plugin, which includes the name, type, importance, etc.
    • configs[i].value (map) -- The current value for a config, which includes the name, value, recommended values, etc.

    Example request:

    PUT /connector-plugins/FileStreamSinkConnector/config/validate HTTP/1.1
    Host: connect.example.com
    Accept: application/json
    
    {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "test-topic"
    }
    

    Example response:

    HTTP/1.1 200 OK

    {
        "name": "FileStreamSinkConnector",
        "error_count": 1,
        "groups": [
            "Common"
        ],
        "configs": [
            {
                "definition": {
                    "name": "topics",
                    "type": "LIST",
                    "required": false,
                    "default_value": "",
                    "importance": "HIGH",
                    "documentation": "",
                    "group": "Common",
                    "width": "LONG",
                    "display_name": "Topics",
                    "dependents": [],
                    "order": 4
                },
                "value": {
                    "name": "topics",
                    "value": "test-topic",
                    "recommended_values": [],
                    "errors": [],
                    "visible": true
                }
            },
            {
                "definition": {
                    "name": "file",
                    "type": "STRING",
                    "required": true,
                    "default_value": "",
                    "importance": "HIGH",
                    "documentation": "Destination filename.",
                    "group": null,
                    "width": "NONE",
                    "display_name": "file",
                    "dependents": [],
                    "order": -1
                },
                "value": {
                    "name": "file",
                    "value": null,
                    "recommended_values": [],
                    "errors": [
                        "Missing required configuration "file" which has no default value."
                    ],
                    "visible": true
                }
            },
            {
                "definition": {
                    "name": "name",
                    "type": "STRING",
                    "required": true,
                    "default_value": "",
                    "importance": "HIGH",
                    "documentation": "Globally unique name to use for this connector.",
                    "group": "Common",
                    "width": "MEDIUM",
                    "display_name": "Connector name",
                    "dependents": [],
                    "order": 1
                },
                "value": {
                    "name": "name",
                    "value": "test",
                    "recommended_values": [],
                    "errors": [],
                    "visible": true
                }
            },
            {
                "definition": {
                    "name": "tasks.max",
                    "type": "INT",
                    "required": false,
                    "default_value": "1",
                    "importance": "HIGH",
                    "documentation": "Maximum number of tasks to use for this connector.",
                    "group": "Common",
                    "width": "SHORT",
                    "display_name": "Tasks max",
                    "dependents": [],
                    "order": 3
                },
                "value": {
                    "name": "tasks.max",
                    "value": "1",
                    "recommended_values": [],
                    "errors": [],
                    "visible": true
                }
            },
            {
                "definition": {
                    "name": "connector.class",
                    "type": "STRING",
                    "required": true,
                    "default_value": "",
                    "importance": "HIGH",
                    "documentation": "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name,  or use "FileStreamSink" or "FileStreamSinkConnector" to make the configuration a bit shorter",
                    "group": "Common",
                    "width": "LONG",
                    "display_name": "Connector class",
                    "dependents": [],
                    "order": 2
                },
                "value": {
                    "name": "connector.class",
                    "value": "org.apache.kafka.connect.file.FileStreamSinkConnector",
                    "recommended_values": [],
                    "errors": [],
                    "visible": true
                }
            }
        ]
    }
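
    A client can use this endpoint to pre-flight a configuration before creating the connector. A sketch that submits a candidate config and prints any per-field errors, assuming the local worker used throughout:

    import requests

    candidate = {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "test-topic",
    }
    url = "http://localhost:8083/connector-plugins/FileStreamSinkConnector/config/validate"
    result = requests.put(url, json=candidate).json()

    if result["error_count"] > 0:
        for cfg in result["configs"]:
            for error in cfg["value"]["errors"]:
                print(f"{cfg['value']['name']}: {error}")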


    Reference:
    https://docs.confluent.io/current/connect/references/restapi.html