zoukankan      html  css  js  c++  java
  • JAVA整合FlinkCDC 监控数据库表变化

    JDK 版本至少为 8

    maven 依赖(示例代码还用到了 fastjson 和 Flink 流处理运行时,如项目中尚未引入,需另行添加对应依赖)

      <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>flink-connector-mysql-cdc</artifactId>
                <version>1.1.1</version>
            </dependency>

    SqlDwdDeserializationSchema.java

    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;
    
    import java.util.List;
    
    /**
     * Deserializes Debezium change records produced by Flink CDC into fastjson objects.
     *
     * <p>Each emitted object contains:
     * <ul>
     *   <li>{@code sqlJson} - the row's columns: the "after" image for insert/update,
     *       the "before" image for delete, empty when neither image is present</li>
     *   <li>{@code database}, {@code table} - the "db" and "table" values of the "source" struct</li>
     *   <li>{@code operate_ms} - the "ts_ms" value of the "source" struct</li>
     *   <li>{@code operate_type} - "insert", "update", "delete", or "" when neither image is present</li>
     *   <li>{@code pk_filed}, {@code pk_value} - name and value of the first key field; both are
     *       absent when the record carries no key</li>
     * </ul>
     */
    public class SqlDwdDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {

        private static final long serialVersionUID = -3168848963265670603L;

        /**
         * Converts one change record into a JSON object and hands it to the collector.
         *
         * @param record the change record; its value is expected to be a {@link Struct} with
         *               "before", "after" and "source" fields
         * @param out    collector that receives the resulting JSON object
         * @throws Exception if the record does not have the expected structure
         */
        @Override
        public void deserialize(SourceRecord record, Collector<JSONObject> out) throws Exception {
            Struct dataRecord = (Struct) record.value();
            if (dataRecord == null) {
                // A record without a value has no row data to describe; skip it instead of
                // failing with a NullPointerException further down.
                return;
            }

            Struct afterStruct = dataRecord.getStruct("after");
            Struct beforeStruct = dataRecord.getStruct("before");

            // Operation type and the row image whose columns are exported.
            String operateType;
            Struct rowImage;
            if (afterStruct != null && beforeStruct != null) {
                System.out.println("这是修改数据");
                operateType = "update";
                rowImage = afterStruct;
            } else if (afterStruct != null) {
                System.out.println("这是新增数据");
                operateType = "insert";
                rowImage = afterStruct;
            } else if (beforeStruct != null) {
                System.out.println("这是删除数据");
                operateType = "delete";
                rowImage = beforeStruct;
            } else {
                System.out.println("error>>>>>>>>>你执行了啥?");
                operateType = "";
                rowImage = null;
            }

            JSONObject operateJson = new JSONObject();
            operateJson.put("sqlJson", toJson(rowImage));

            Struct source = dataRecord.getStruct("source");
            // Name of the database the change happened in.
            operateJson.put("database", source.getString("db"));
            // Name of the table the change happened in.
            operateJson.put("table", source.getString("table"));
            // Timestamp of the change as reported by the source (milliseconds).
            operateJson.put("operate_ms", source.get("ts_ms"));
            operateJson.put("operate_type", operateType);

            System.out.println("topic = " + record.topic());

            // Primary key: only the first key field is exported; extend this if the table
            // has a composite key. The key is null for tables without a primary key, so it
            // must be checked before use.
            Struct pk = (Struct) record.key();
            if (pk != null) {
                List<Field> pkFieldList = pk.schema().fields();
                if (pkFieldList != null && !pkFieldList.isEmpty()) {
                    String pkName = pkFieldList.get(0).name();
                    // "pk_filed" (sic) is kept as-is so existing consumers keep working.
                    operateJson.put("pk_filed", pkName);
                    operateJson.put("pk_value", pk.get(pkName));
                }
            }

            out.collect(operateJson);
        }

        /**
         * Copies every field of the given struct into a new JSON object, keyed by field name.
         *
         * @param row the row image to copy; may be {@code null}
         * @return a JSON object holding the row's fields, empty when {@code row} is {@code null}
         */
        private static JSONObject toJson(Struct row) {
            JSONObject json = new JSONObject();
            if (row != null) {
                for (Field field : row.schema().fields()) {
                    String fieldName = field.name();
                    json.put(fieldName, row.get(fieldName));
                }
            }
            return json;
        }

        /**
         * @return the type information Flink uses for the emitted {@link JSONObject} values
         */
        @Override
        public TypeInformation<JSONObject> getProducedType() {
            // BasicTypeInfo.of(...) resolved to this inherited static method anyway;
            // calling it on the declaring class makes that explicit.
            return TypeInformation.of(JSONObject.class);
        }
    }

    MySqlSourceExample.java(启动示例,请将 yourHostname、yourPort 等占位符替换为实际值)

    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    /**
     * @author*/
    public class MySqlSourceExample {
        public static void main(String[] args) throws Exception {
            SourceFunction<JSONObject> mySqlSource = MySQLSource.<JSONObject>builder()
                    .hostname("yourHostname")
                    .port(yourPort)
                    .databaseList("yourDatabaseName") // set captured database
                    .tableList("yourDatabaseName.yourTableName") // set captured table
                    .username("yourUsername")
                    .password("yourPassword")
                    .deserializer(new SqlDwdDeserializationSchema()) // converts SourceRecord to JSON String
                    .build();
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // enable checkpoint
            env.enableCheckpointing(3000);
    
            env.addSource(mySqlSource).print();
    
            env.execute("Print MySQL Snapshot + Binlog");
        }
    }
    -----------------------有任何问题可以在评论区评论,也可以私信我,我看到的话会进行回复,欢迎大家指教------------------------ (蓝奏云官网有些地址失效了,需要把请求地址lanzous改成lanzoux才可以)
  • 相关阅读:
    servlet的之前与之后的基本使用
    java HashMap插入重复Key值问题
    ConcurrentHashMap底层实现原理(JDK1.7 & 1.8)
    spring cloud实现热加载
    spring cloud各个组件以及概念的解释和基本使用
    深入理解java 虚拟机 jvm高级特性与最佳实践目录
    【leetcode】1、两数之和
    【Java 基础领域】二维数组创建内存图
    【Java EE领域】com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'salary' in 'fi
    【JavaEE领域】com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'mp.employee' doesn't exi
  • 原文地址:https://www.cnblogs.com/pxblog/p/15711924.html
Copyright © 2011-2022 走看看