zoukankan      html  css  js  c++  java
  • 一文讲清楚FusionInsight MRS CDL如何使用

    摘要:CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。

    本文分享自华为云社区《华为FusionInsight MRS CDL使用指南》,作者:晋红轻。

    说明

    CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。

    CDL服务包含了两个重要的角色:CDLConnector和CDLService。CDLConnector是具体执行数据抓取任务的实例,CDLService是负责管理和创建任务的实例。

    本此实践介绍以mysql作为数据源进行数据抓取

    前提条件

    • MRS集群已安装CDL服务。
    • MySQL数据库需要开启mysql的bin log功能(默认情况下是开启的)。

    查看MySQL是否开启bin log:

    使用工具或者命令行连接MySQL数据库(本示例使用navicat工具连接),执行show variables like 'log_%'命令查看。

    例如在navicat工具选择"File > New Query"新建查询,输入如下SQL命令,单击"Run"在结果中"log_bin"显示为"ON"则表示开启成功。

    show variables like 'log_%'

    工具准备

    现在cdl只能使用rest api的方式进行命令提交,所以需要提前安装工具进行调试。本文使用VSCode工具。

    完成之后安装rest client插件:

    完成之后创建一个cdl.http的文件进行编辑:

    创建CDL任务

    CDL任务创建的流程图如下所示:

    说明:需要先创建一个MySQL link, 在创建一个Kafka link, 然后再创建一个CDL同步任务并启动。

    MySQL link部分rest请求代码

    @hostname = 172.16.9.113
    @port = 21495
    @host = {{hostname}}:{{port}}
    @bootstrap = "172.16.9.113:21007"
    @bootstrap_normal = "172.16.9.113:21005"
    @mysql_host = "172.16.2.118"
    @mysql_port = "3306"
    @mysql_database = "hudi"
    @mysql_user = "root"
    @mysql_password = "Huawei@123"
    
    ### get links
    get https://{{host}}/api/v1/cdl/link
    
    ### mysql link validate
    
    post https://{{host}}/api/v1/cdl/link?validate=true
    content-type: application/json
    
    {
    "name": "MySQL_link", //link名,全局唯一,不能重复
    "description":"MySQL connection", //link描述
    "link-type":"mysql", //link的类型
    "enabled":"true",
    "link-config-values":  {
    "inputs": [
            { "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip
            { "name": "port", "value": {{mysql_port}} },//数据库监听的端口
            { "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名
            { "name": "user", "value": {{mysql_user}} }, //用户
            { "name": "password","value": {{mysql_password}} } ,//密码
            { "name":"schema", "value": {{mysql_database}}}//同数据库名
            ]
        }
    }
    
    ### mysql link create
    
    post https://{{host}}/api/v1/cdl/link
    content-type: application/json
    
    {
    "name": "MySQL_link", //link名,全局唯一,不能重复
    "description":"MySQL connection", //link描述
    "link-type":"mysql", //link的类型
    "enabled":"true",
    "link-config-values":  {
    "inputs": [
            { "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip
            { "name": "port", "value": {{mysql_port}} },//数据库监听的端口
            { "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名
            { "name": "user", "value": {{mysql_user}} }, //用户
            { "name": "password","value": {{mysql_password}} } ,//密码
            { "name":"schema", "value": {{mysql_database}}}//同数据库名
            ]
        }
    }
    
    ### mysql link update
    
    put https://{{host}}/api/v1/cdl/link/MySQL_link
    content-type: application/json
    
    {
    "name": "MySQL_link", //link名,全局唯一,不能重复
    "description":"MySQL connection", //link描述
    "link-type":"mysql", //link的类型
    "enabled":"true",
    "link-config-values":  {
    "inputs": [
            { "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip
            { "name": "port", "value": {{mysql_port}} },//数据库监听的端口
            { "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名
            { "name": "user", "value": {{mysql_user}} }, //用户
            { "name": "password","value": {{mysql_password}} } ,//密码
            { "name":"schema", "value": {{mysql_database}}}//同数据库名
            ]
        }
    }

    Kafka link部分rest请求代码

    ### get links
    get https://{{host}}/api/v1/cdl/link
    
    ### kafka link validate
    
    post https://{{host}}/api/v1/cdl/link?validate=true
    content-type: application/json
    
    {
    "name": "kafka_link",
    "description":"test kafka link",
    "link-type":"kafka",
    "enabled":"true",
    "link-config-values":  {
    "inputs": [
            { "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
            { "name": "sasl.kerberos.service.name", "value": "kafka" },
            { "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT
            ]
        }
    }
    
    ### kafka link create
    
    post https://{{host}}/api/v1/cdl/link
    content-type: application/json
    
    {
    "name": "kafka_link",
    "description":"test kafka link",
    "link-type":"kafka",
    "enabled":"true",
    "link-config-values":  {
    "inputs": [
            { "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
            { "name": "sasl.kerberos.service.name", "value": "kafka" },
            { "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT
            ]
        }
    }
    
    ### kafka link update
    
    put https://{{host}}/api/v1/cdl/link/kafka_link
    content-type: application/json
    
    {
    "name": "kafka_link",
    "description":"test kafka link",
    "link-type":"kafka",
    "enabled":"true",
    "link-config-values":  {
    "inputs": [
            { "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
            { "name": "sasl.kerberos.service.name", "value": "kafka" },
            { "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT
            ]
        }
    }

    CDL任务命令部分rest请求代码

    @hostname = 172.16.9.113
    @port = 21495
    @host = {{hostname}}:{{port}}
    @bootstrap = "172.16.9.113:21007"
    @bootstrap_normal = "172.16.9.113:21005"
    @mysql_host = "172.16.2.118"
    @mysql_port = "3306"
    @mysql_database = "hudi"
    @mysql_user = "root"
    @mysql_password = "Huawei@123"
    
    ### create job
    post https://{{host}}/api/v1/cdl/job
    content-type: application/json
    
    {
        "job_type": "CDL_JOB", //job类型,目前只支持CDL_JOB这一种
        "name": "mysql_to_kafka", //job名称
        "description":"mysql_to_kafka", //job描述
        "from-link-name": "MySQL_link",  //数据源Link
        "to-link-name": "kafka_link", //目标源Link
        "from-config-values": {
            "inputs": [
                {"name" : "connector.class", "value" : "com.huawei.cdc.connect.mysql.MysqlSourceConnector"},
                {"name" : "schema", "value" : "hudi"},
                {"name" : "db.name.alias", "value" : "hudi"},
                {"name" : "whitelist", "value" : "hudisource"},
                {"name" : "tables", "value" : "hudisource"},
                {"name" : "tasks.max", "value" : "10"},
                {"name" : "mode", "value" : "insert,update,delete"},
                {"name" : "parse.dml.data", "value" : "true"},
                {"name" : "schema.auto.creation", "value" : "false"},
                {"name" : "errors.tolerance", "value" : "all"},
                {"name" : "multiple.topic.partitions.enable", "value" : "false"},
                {"name" : "topic.table.mapping", "value" : "[
                        {"topicName":"huditableout", "tableName":"hudisource"}
                    ]"
                },
                  {"name" : "producer.override.security.protocol", "value" : "SASL_PLAINTEXT"},//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT
                {"name" : "consumer.override.security.protocol", "value" : "SASL_PLAINTEXT"}//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT
            ]
        },
        "to-config-values": {"inputs": []},
        "job-config-values": {
            "inputs": [
                {"name" : "global.topic", "value" : "demo"}
            ]
        }
    }
    
    ### get all job
    get https://{{host}}/api/v1/cdl/job
    ### submit job
    put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/start
    ### get job status
    get https://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka
    ### stop job
    put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop
    ### delete job
    DELETE https://{{host}}/api/v1/cdl/job/mysql_to_kafka

    场景验证

    生产库MySQL原始数据如下:

    提交CDL任务之后

    增加操作: insert into hudi.hudisource values (11,“蒋语堂”,38,“女”,“图”,“播放器”,28732);

    对应kafka消息体:

    更改操作: UPDATE hudi.hudisource SET uname=‘Anne Marie333’ WHERE uid=11;

    对应kafka消息体:

    删除操作:delete from hudi.hudisource where uid=11;

    对应kafka消息体:

     

    点击关注,第一时间了解华为云新鲜技术~

  • 相关阅读:
    IE下JS文件失效问题总结
    什么是RFC?
    CHROME对CSS的解析
    php_network_getaddresses: getaddrinfo failed
    Fedora10下配置Apache和虚拟主机
    Apache的Charset设置
    网页设计中的面包屑路径
    利用JS实现的根据经纬度计算地球上两点之间的距离
    【OpenCV学习】子矩阵操作
    【OpenCV学习】ROI区域
  • 原文地址:https://www.cnblogs.com/huaweiyun/p/15355464.html
Copyright © 2011-2022 走看看