  • [Stream Processing] MySQL/PG/Oracle + Kafka + Flink (CDC capture): deployment and real-time computation

    This article covers the deployment and computation layers of a real-time data warehouse.

    The article is organized into three parts:

    • Data extraction
    • [E] Relational databases (MySQL/PG/Oracle) + Debezium + Kafka Connect
    • Data computation
    • [T] Flink
    • Data storage
    • [L] Loading into relational or columnar stores (ClickHouse/HBase)

    Note: two Alibaba articles are worth reading for reference:
    Flink JDBC Connector: best practices for integrating Flink with databases
    Real-time data synchronization based on Flink SQL CDC
    Debezium monitoring MySQL
    Debezium monitoring Oracle
    Debezium on GitHub
    Oracle deployment reference documentation

    Environment requirements

    Software requirements:

    1. Kafka cluster: this walkthrough uses the Kafka that ships with CDH 5.14
    2. Databases: MySQL 8.x / PG 10.x / Oracle 11g, all running in Docker (MySQL with row-based binlog enabled, Oracle with archive logging enabled)
    3. Compute engine: Flink 1.13
    4. Kafka Connect plugins:
      debezium-connector-mysql-1.4.0.Final-plugin.tar.gz
      debezium-connector-postgres-1.4.0.Final-plugin.tar.gz
      debezium-connector-oracle-1.4.0.Final-plugin.tar.gz

    Note: the steps below must be performed on all three Kafka brokers.

    Kafka config directory: /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist
    Kafka bin directory: /opt/cloudera/parcels/KAFKA/lib/kafka

    1. Data extraction

    1.1 Kafka Connect deployment (identical for MySQL/PG/Oracle)

    • Download [debezium-connector-mysql-1.4.0.Final-plugin.tar.gz] and extract it; any directory will do.
    • I extracted it to /opt/cloudera/parcels/KAFKA/lib/kafka/kafka_connect
    • Copy the jars from the debezium-connector-mysql directory into ${KAFKA_HOME}/libs
    • Put the MySQL/PG JDBC driver into libs as well [mysql-connector-java-8.0.21.jar]
    • For Oracle, download the client and copy its jars into ${KAFKA_HOME}/libs
      (download link)
    • Edit the properties file under ${KAFKA_HOME}/config or, on CDH, [/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist]
    • Standalone deployment: edit [connect-standalone.properties]
    • Distributed deployment: edit [connect-distributed.properties]
    • Uncomment the plugin.path setting and point it at the plugin directory
    • If several database connectors (MySQL/PG/Oracle) are deployed, separate their plugin directories with commas in plugin.path
    • Start Kafka Connect, setting the environment variable first:
    export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-log4j.properties # without this, Kafka Connect errors out later
    ./bin/connect-distributed.sh ../../etc/kafka/conf.dist/connect-distributed.properties
    
    • Submit the MySQL connector to monitor the MySQL database. (The # annotations below are explanatory only; JSON has no comment syntax, so strip them before actually submitting.)
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.58.172:8083/connectors/ -d '
    { 
    "name" : "debezium-mysql",
    "config":{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "10.20.60.44", #MySQL host address
    "database.port": "3306", #MySQL port
    "database.user": "yaowentao", #MySQL user
    "database.password": "Sx202101", #password for that user
    "database.server.id" :"1739",
    "database.server.name": "Mysql", #logical name of the MySQL server, e.g. Mysql
    "database.history.kafka.bootstrap.servers": "192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092", #Kafka cluster address
    "database.history.kafka.topic": "dbhistory.mydb", #Kafka topic for the schema history
    "database.whitelist": "mydb", 
    #"table.whitelist":"mydb.orders",
    "include.schema.changes" : "true" ,
    "decimal.handling.mode": "string", #emit decimals as strings to avoid float precision loss
    "mode" : "incrementing",
    "incrementing.column.name" : "id",
    "database.history.skip.unparseable.ddl" : "true"
    }
    }'
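    As a sketch, the same registration can be done from Python, which sidesteps the hand-written-JSON pitfall entirely: the body is built as a dict and serialized at the last moment. The host, credentials, and names below are just this walkthrough's example values, and `submit_connector` assumes a reachable Kafka Connect REST endpoint.

    ```python
    import json
    from urllib import request

    def build_mysql_connector_payload(name, host, port, user, password,
                                      server_id, server_name, kafka_servers, db):
        """Assemble the Debezium MySQL connector registration body.

        Building it in code avoids the pitfall of inline comments in
        hand-written JSON (JSON has no comment syntax)."""
        return {
            "name": name,
            "config": {
                "connector.class": "io.debezium.connector.mysql.MySqlConnector",
                "database.hostname": host,
                "database.port": str(port),
                "database.user": user,
                "database.password": password,
                "database.server.id": str(server_id),
                "database.server.name": server_name,
                "database.history.kafka.bootstrap.servers": kafka_servers,
                "database.history.kafka.topic": "dbhistory." + db,
                "database.whitelist": db,
                "include.schema.changes": "true",
                "decimal.handling.mode": "string",
                "database.history.skip.unparseable.ddl": "true",
            },
        }

    def submit_connector(connect_url, payload):
        """POST the payload to the Kafka Connect REST API; returns the HTTP status."""
        req = request.Request(
            connect_url + "/connectors",
            data=json.dumps(payload).encode("utf-8"),
            headers={"Accept": "application/json",
                     "Content-Type": "application/json"},
        )
        with request.urlopen(req) as resp:
            return resp.status
    ```

    Calling `submit_connector("http://192.168.58.172:8083", payload)` then plays the role of the curl command above.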
    
    • Submit the Oracle connector to monitor the Oracle database. (Same caveat: strip the # annotations before submitting.)
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.58.172:8083/connectors/ -d '
    {
    "name": "debezium-oracle-yaowentao",
    "config": {
    "connector.class" : "io.debezium.connector.oracle.OracleConnector",
    "tasks.max" : "1",
    "database.server.name" : "helowin",
    "database.hostname" : "10.20.60.44",
    "database.port" : "1521",
    "database.user" : "dbzuser",
    "database.password" : "dbz",
    "database.dbname" : "helowin",
    "database.schema" : "scott",
    "database.connection.adapter": "logminer", #required on version 1.4
    "database.tablename.case.insensitive": "true",
    "table.include.list" : "scott.*", #table whitelist
    "snapshot.mode" : "initial",
    "schema.include.list" : "scott", #schema whitelist
    "database.history.kafka.bootstrap.servers" : "192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
    }
    }'
    
    • Check that the process started with jps
    • On a CDH cluster you may hit a "log file not found" error
      Fix: point log4j at the CDH config path:
    export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-log4j.properties
    


    • Inspect the connectors; this also works from a browser, but here we use the command line.
    List the registered connectors:
    curl -X GET http://192.168.58.171:8083/connectors
    (or open http://192.168.58.172:8083/connectors in a browser)
    


    Check a connector's status:
    curl -X GET http://192.168.58.171:8083/connectors/debezium-mysql/status
    


    View a connector's configuration:
    curl -X GET http://192.168.58.171:8083/connectors/debezium-mysql/config
    


    Delete a connector:
    curl -X DELETE http://192.168.58.171:8083/connectors/debezium-mysql
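    The four REST calls above differ only in their URL path, so a small helper can build them all. This is an illustrative sketch; `connector_endpoint` is a name invented here, and the worker address is this walkthrough's example.

    ```python
    from urllib import request

    CONNECT = "http://192.168.58.171:8083"   # any Connect worker in the cluster

    def connector_endpoint(base, name=None, action=None):
        """Build Kafka Connect REST URLs for the list/status/config/delete calls."""
        url = base + "/connectors"
        if name:
            url += "/" + name          # e.g. .../connectors/debezium-mysql
        if action:
            url += "/" + action        # e.g. .../connectors/debezium-mysql/status
        return url

    # Against a running cluster (requires the workers from section 1.1):
    # request.urlopen(connector_endpoint(CONNECT)).read()                        # list
    # request.urlopen(connector_endpoint(CONNECT, "debezium-mysql", "status"))   # status
    ```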
    


    • Once started, the connector creates one topic per table in the monitored database, and that topic carries only the table's DML events (insert/delete/update). All DDL changes go into a single topic named after the database.server.name value in the config. Naming convention:
    • DDL topic: serverName
    • DML topic: serverName.databaseName.tableName
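    A message on a DML topic is a Debezium "envelope" with before/after row images and an operation code. The sketch below parses a trimmed-down example of such an event (the field values are invented for illustration; real events also carry a "schema" section when schema-include is enabled, as in the Flink table definition later on).

    ```python
    import json

    # A trimmed Debezium change event as it might appear on the DML topic
    # "Mysql.mydb.orders" (values are illustrative only).
    SAMPLE_EVENT = json.dumps({
        "payload": {
            "before": None,                                # no prior image: row is new
            "after": {"order_id": 1, "order_status": 0},   # row state after the change
            "op": "c",              # c=create, u=update, d=delete, r=snapshot read
            "ts_ms": 1624000000000,
        }
    })

    def describe_change(raw):
        """Classify a Debezium event: return (operation name, after-image of the row)."""
        payload = json.loads(raw).get("payload", {})
        op_names = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot"}
        return op_names.get(payload.get("op"), "unknown"), payload.get("after")
    ```

    For the sample event, `describe_change(SAMPLE_EVENT)` classifies the change as an insert and returns the new row image.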

    2. Data computation

    2.1 Reading the change topic with Flink SQL

    -- Flink SQL statements
    DROP TABLE orders;
    CREATE TABLE orders (
    order_id INT,
    order_date STRING,
    customer_name STRING,
    price DOUBLE,
    product_id INT,
    order_status INT
    ) WITH (
    'connector' = 'kafka',
    'format' = 'debezium-json',
    'topic' = 'Mysql.mydb.orders',
    'properties.bootstrap.servers' = '192.168.58.171:9092,192.168.58.172:9092,192.168.58.177:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'debezium-json.schema-include' = 'true'
    );
    -- order_date arrives as epoch milliseconds and needs converting
    SELECT TO_TIMESTAMP_LTZ(cast(t.order_date as bigint),3) order_date_times,t.* from orders t;
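    The `TO_TIMESTAMP_LTZ(x, 3)` call interprets `x` as epoch milliseconds (the `3` is the decimal precision). The same conversion can be sanity-checked outside Flink, for example with this Python one-liner equivalent:

    ```python
    from datetime import datetime, timezone

    def epoch_millis_to_ts(ms):
        """Mirror Flink's TO_TIMESTAMP_LTZ(x, 3): treat x as epoch milliseconds."""
        return datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc)

    # epoch_millis_to_ts(0) → 1970-01-01 00:00:00 UTC
    ```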
    


    3. Data loading

    3.1 Create a sink table in Flink; rows can then be written to it directly.

    4. Appendix: Oracle database configuration (for versions after 11g, see the official docs)

    alter system set db_recovery_file_dest_size=5G; -- size as required, otherwise archiving errors out
    
    -- enable supplemental logging so row-level changes are captured
    alter database add supplemental log data (all) columns;
    
    -- create a dedicated tablespace and the dbzuser account, then grant the required privileges
    CREATE TABLESPACE LOGMINER_TBS DATAFILE '/home/oracle/app/oracle/oradata/helowin/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS ;
    
    GRANT CREATE SESSION TO dbzuser;
    GRANT SELECT ON V_$DATABASE TO dbzuser;
    GRANT FLASHBACK ANY TABLE TO dbzuser;
    GRANT SELECT ANY TABLE TO dbzuser;
    GRANT SELECT_CATALOG_ROLE TO dbzuser;
    GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
    GRANT SELECT ANY TRANSACTION TO dbzuser;
    GRANT SELECT ANY DICTIONARY TO dbzuser;
    
    GRANT CREATE TABLE TO dbzuser;
    GRANT ALTER ANY TABLE TO dbzuser;
    GRANT LOCK ANY TABLE TO dbzuser;
    GRANT CREATE SEQUENCE TO dbzuser;
    
    GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
    GRANT SELECT ON V_$LOGMNR_LOGS to dbzuser;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
    GRANT SELECT ON V_$LOGFILE TO dbzuser;
    GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;
    
    -- optional for now; the Debezium docs call for this user, though its purpose here is unclear
    CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS;
    GRANT CONNECT TO debezium;
    GRANT CREATE SESSION TO debezium;
    GRANT CREATE TABLE TO debezium;
    GRANT CREATE SEQUENCE to debezium;
    ALTER USER debezium QUOTA 100M on users;
    
    Goal: dedicated to advancing the craft; I write up my day-to-day work for future study. Email: taotao8810@hotmail.com. Please credit the source when reposting!
  • Original post: https://www.cnblogs.com/yaowentao/p/14944739.html