zoukankan      html  css  js  c++  java
  • JAVA整合FlinkCDC 监控数据库表变化

    环境要求:JDK 版本至少为 8

    Maven 依赖(pom.xml)

      <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>flink-connector-mysql-cdc</artifactId>
                <version>1.1.1</version>
            </dependency>

    SqlDwdDeserializationSchema.java

    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;
    
    import java.util.List;
    
    /**
     * Converts a Flink CDC (Debezium) change record into a flat fastjson {@link JSONObject}.
     *
     * <p>Keys written to every emitted object:
     * <ul>
     *   <li>{@code sqlJson} - column name to value map of the affected row: the "after" image for
     *       inserts and updates, the "before" image for deletes, empty if neither is present</li>
     *   <li>{@code database}, {@code table} - read from the record's "source" struct</li>
     *   <li>{@code operate_ms} - the "ts_ms" value of the "source" struct</li>
     *   <li>{@code operate_type} - "insert", "update", "delete", or "" when it cannot be determined</li>
     * </ul>
     * When the record has a struct key with at least one field, {@code pk_filed} (name of the first
     * key field) and {@code pk_value} (its value) are added as well.
     *
     * @author .
     */
    public class SqlDwdDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {

        private static final long serialVersionUID = -3168848963265670603L;

        /**
         * Translates one change record into a JSON object and hands it to {@code out}.
         *
         * @param record the change record produced by the CDC source
         * @param out    collector that receives the resulting JSON object
         * @throws Exception if the record does not have the expected before/after/source layout
         */
        @Override
        public void deserialize(SourceRecord record, Collector<JSONObject> out) throws Exception {
            Struct dataRecord = (Struct) record.value();
            if (dataRecord == null) {
                // A record without a payload (Debezium documents such records as delete
                // tombstones) carries no row data; skip it instead of failing with an NPE.
                return;
            }

            Struct afterStruct = dataRecord.getStruct("after");
            Struct beforeStruct = dataRecord.getStruct("before");

            // Column data of the affected row.
            JSONObject sqlJson = new JSONObject();

            // Operation type, derived from which row images are present.
            String operateType = "";

            if (afterStruct != null && beforeStruct != null) {
                System.out.println("这是修改数据");
                operateType = "update";
                sqlJson = toJson(afterStruct);
            } else if (afterStruct != null) {
                System.out.println("这是新增数据");
                operateType = "insert";
                sqlJson = toJson(afterStruct);
            } else if (beforeStruct != null) {
                System.out.println("这是删除数据");
                operateType = "delete";
                sqlJson = toJson(beforeStruct);
            } else {
                System.out.println("error>>>>>>>>>你执行了啥?");
            }

            JSONObject operateJson = new JSONObject();
            operateJson.put("sqlJson", sqlJson);

            Struct source = dataRecord.getStruct("source");

            // Name of the database the change happened in.
            String database = source.getString("db");

            // Name of the table the change happened in.
            String table = source.getString("table");

            // Timestamp of the operation (the original author notes the unit is milliseconds).
            Object operateMs = source.get("ts_ms");

            operateJson.put("database", database);
            operateJson.put("table", table);
            operateJson.put("operate_ms", operateMs);
            operateJson.put("operate_type", operateType);

            String topic = record.topic();
            System.out.println("topic = " + topic);

            // Primary key: only the first key field is reported; extend this if composite
            // keys are needed. The key is checked first because a record may carry no key
            // at all (presumably the case for tables without a primary key).
            Object key = record.key();
            if (key instanceof Struct) {
                Struct pk = (Struct) key;
                List<Field> pkFieldList = pk.schema().fields();
                if (pkFieldList != null && !pkFieldList.isEmpty()) {
                    Field field = pkFieldList.get(0);
                    // NOTE: the misspelled key "pk_filed" is kept so existing consumers keep working.
                    operateJson.put("pk_filed", field.name());
                    // The value was computed but never emitted before; expose it under a new key.
                    operateJson.put("pk_value", pk.get(field.name()));
                }
            }

            out.collect(operateJson);
        }

        /**
         * Copies every field of {@code row} into a new JSON object keyed by field name.
         */
        private static JSONObject toJson(Struct row) {
            JSONObject json = new JSONObject();
            for (Field field : row.schema().fields()) {
                String fieldName = field.name();
                json.put(fieldName, row.get(fieldName));
            }
            return json;
        }

        /**
         * Describes the produced element type to Flink.
         *
         * @return type information for {@link JSONObject}
         */
        @Override
        public TypeInformation<JSONObject> getProducedType() {
            // Same static factory the previous BasicTypeInfo.of(...) call resolved to
            // (it is declared on TypeInformation and inherited by BasicTypeInfo).
            return TypeInformation.of(JSONObject.class);
        }
    }

    Java 代码:启动类 MySqlSourceExample.java

    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    /**
     * Minimal runnable example: builds a MySQL CDC source that uses
     * {@code SqlDwdDeserializationSchema} and prints every emitted JSON object.
     *
     * <p>All connection settings below are placeholders and must be replaced.
     */
    public class MySqlSourceExample {

        /**
         * Builds the source, wires it into a streaming job and runs the job.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if job execution fails
         */
        public static void main(String[] args) throws Exception {
            SourceFunction<JSONObject> mySqlSource = MySQLSource.<JSONObject>builder()
                    .hostname("yourHostname")
                    .port(3306) // placeholder: replace with your MySQL port
                    .databaseList("yourDatabaseName") // set captured database
                    .tableList("yourDatabaseName.yourTableName") // set captured table
                    .username("yourUsername")
                    .password("yourPassword")
                    .deserializer(new SqlDwdDeserializationSchema()) // converts SourceRecord to JSONObject
                    .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // enable checkpointing with an interval of 3000 (milliseconds per the Flink API)
            env.enableCheckpointing(3000);

            // print every change event to stdout
            env.addSource(mySqlSource).print();

            env.execute("Print MySQL Snapshot + Binlog");
        }
    }
    -----------------------有任何问题可以在评论区评论,也可以私信我,我看到的话会进行回复,欢迎大家指教------------------------ (蓝奏云官网有些地址失效了,需要把请求地址lanzous改成lanzoux才可以)
  • 相关阅读:
    如何卸载VS 2017之前版本比如VS 2013、VS2015、 VS vNext?
    在SQL Server中如何进行UPDATE TOP .....ORDER BY?
    EntityFramework 6.x和EntityFramework Core插入数据探讨
    2017-2018:时间戳
    http协议进阶(六)代理
    认清性能问题
    <转>安全测试思维导图
    RESTful API浅谈
    http协议进阶(五)连接管理
    聊聊软件测试的职业规划
  • 原文地址:https://www.cnblogs.com/pxblog/p/15711924.html
Copyright © 2011-2022 走看看