# Flink Stream API CDC demo

I've been digging into flink-cdc recently. The Table API CDC connector works per table, which raises a question: if many tables in the same database need to be captured in real time (say, 100 tables), does that put pressure on MySQL? And if the MySQL data volume is large, does it also strain the disk and network of the MySQL host?

Anyone familiar with the binlog knows it is not split by database or table: even if you only need one table's changes, you still have to read and parse the entire binlog. With many CDC jobs each tailing the same binlog, the resource cost multiplies accordingly.

That suggests another approach: run a single job that parses the binlog of all the required tables into JSON and publishes it to Kafka, shifting the load from MySQL onto Kafka. A load that MySQL can bear is hardly a load for Kafka at all (although at that point this is essentially the same as deploying Canal or Debezium directly).

## Official example

The flink-cdc project wiki gives the following Stream API example; I've added the imports below, assuming the 1.x com.alibaba.ververica package layout.

Link: https://github.com/ververica/flink-cdc-connectors/wiki#usage-for-datastream-api

    import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
    import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class MySqlBinlogSourceExample {
      public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
          .hostname("localhost")
          .port(3306)
          .databaseList("inventory") // monitor all tables under the inventory database
          .username("flinkuser")
          .password("flinkpw")
          .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
          .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
          .addSource(sourceFunction)
          .print().setParallelism(1); // use parallelism 1 for the sink to keep message ordering

        env.execute();
      }
    }

All we need to do is replace StringDebeziumDeserializationSchema with our own deserializer and write a Kafka sink.

## Parsing

For parsing the Debezium-format binlog events, two DeserializationSchema implementations are provided out of the box: StringDebeziumDeserializationSchema and RowDataDebeziumDeserializeSchema. StringDebeziumDeserializationSchema simply emits the toString() of Debezium's SourceRecord, while RowDataDebeziumDeserializeSchema requires the table schema to be defined up front, which does not fit our multi-table requirement.
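
For reference, the contract our custom deserializer must implement looks roughly like this (a sketch based on the flink-cdc-connectors 1.x API; later versions moved the packages):

    import java.io.Serializable;

    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.source.SourceRecord;

    // Sketch of the DebeziumDeserializationSchema contract (flink-cdc-connectors 1.x).
    // ResultTypeQueryable contributes getProducedType(); Serializable lets Flink ship the schema to tasks.
    public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
        // Called once per Debezium change event; emit zero or more records downstream.
        void deserialize(SourceRecord record, Collector<T> out) throws Exception;
    }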

So I parse Debezium's SourceRecord myself, convert the result to JSON, and add extra metadata such as host, port, database, and table (useful when a logical table is sharded across databases or instances):

    import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import com.google.gson.JsonObject;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;

    /**
     * Deserializes Debezium-format binlog events into JSON strings,
     * enriched with host/port/db/table metadata.
     */
    public class CommonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

        private final String host;
        private final int port;

        public CommonStringDebeziumDeserializationSchema(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public void deserialize(SourceRecord record, Collector<String> out) {
            JsonObject jsonObject = new JsonObject();

            jsonObject.addProperty("host", host);
            jsonObject.addProperty("port", port);
            jsonObject.addProperty("file", (String) record.sourceOffset().get("file"));
            jsonObject.addProperty("pos", (Long) record.sourceOffset().get("pos"));
            jsonObject.addProperty("ts_sec", (Long) record.sourceOffset().get("ts_sec"));
            // the value schema name looks like "<server>.<db>.<table>.Envelope"
            String[] name = record.valueSchema().name().split("\\.");
            jsonObject.addProperty("db", name[1]);
            jsonObject.addProperty("table", name[2]);
            Struct value = (Struct) record.value();
            String operatorType = value.getString("op");
            jsonObject.addProperty("operator_type", operatorType);
            // c: create, u: update, d: delete, r: read (snapshot)
            // insert, update and read all carry an "after" image
            if (!"d".equals(operatorType)) {
                Struct after = value.getStruct("after");
                jsonObject.add("after", parseRecord(after));
            }
            // update and delete carry a "before" image
            if ("u".equals(operatorType) || "d".equals(operatorType)) {
                Struct before = value.getStruct("before");
                jsonObject.add("before", parseRecord(before));
            }
            jsonObject.addProperty("parse_time", System.currentTimeMillis() / 1000);

            out.collect(jsonObject.toString());
        }

        private JsonObject parseRecord(Struct row) {
            JsonObject jo = new JsonObject();
            for (Field field : row.schema().fields()) {
                switch (field.schema().type()) {
                    case INT8:
                        jo.addProperty(field.name(), row.getInt8(field.name()));
                        break;
                    case INT16:
                        jo.addProperty(field.name(), row.getInt16(field.name()));
                        break;
                    case INT32:
                        // MySQL INT columns arrive as INT32; without this case they would be silently dropped
                        jo.addProperty(field.name(), row.getInt32(field.name()));
                        break;
                    case INT64:
                        jo.addProperty(field.name(), row.getInt64(field.name()));
                        break;
                    case FLOAT32:
                        jo.addProperty(field.name(), row.getFloat32(field.name()));
                        break;
                    case FLOAT64:
                        jo.addProperty(field.name(), row.getFloat64(field.name()));
                        break;
                    case BYTES:
                        // ignore byte columns in the JSON output
                        break;
                    case STRING:
                        jo.addProperty(field.name(), row.getString(field.name()));
                        break;
                    default:
                        // other types are skipped here
                }
            }

            return jo;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

The core methods are deserialize, which unpacks the change event, and parseRecord, which converts the row's columns into JSON.
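
Plugging the custom schema into the source changes only one line of the official example. A sketch (the host and port are passed twice: once for the connector, once so the deserializer can stamp them into each record):

    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("localhost")
        .port(3306)
        .databaseList("venn") // capture all tables under the venn database
        .username("flinkuser")
        .password("flinkpw")
        .deserializer(new CommonStringDebeziumDeserializationSchema("localhost", 3306))
        .build();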

The parsed output looks like this:

Insert:

    sql : insert into user_log(user_id, item_id, category_id, behavior, ts) values('venn1', 'item_1', 'category_1', 'read', now());

    {"host":"localhost","port":3306,"file":"binlog.000002","pos":13781,"ts_sec":null,"db":"venn","table":"user_log","operator_type":"c",
    "after":{"id":16,"user_id":"venn1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619358456000},"parse_time":1619360320}

Update:

    sql : update user_log set user_id = 'zhangsan1' where id = 10;
    
    {"host":"localhost","port":3306,"file":"binlog.000002","pos":14205,"ts_sec":1619360393,"db":"venn","table":"user_log","operator_type":"u",
    "after":{"id":10,"user_id":"zhangsan1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},
    "before":{"id":10,"user_id":"venn1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},"parse_time":1619360394}

Delete:

    sql : delete from user_log where id = 10;

    {"host":"localhost","port":3306,"file":"binlog.000002","pos":14598,"ts_sec":1619360441,"db":"venn","table":"user_log","operator_type":"d",
    "before":{"id":10,"user_id":"zhangsan1","item_id":"item_1","category_id":"category_1","behavior":"read","ts":1619342074000},"parse_time":1619360441}

Note: operator_type is c for create (insert), u for update, d for delete, and r for read (the initial snapshot).
before holds the row image before the change; after holds the row after an insert or update.

## Sink

Since there may be many tables to parse, I wrote a dedicated sink that routes each table's records to its own Kafka topic. Only the invoke method is shown here; a sketch of the full class follows the snippet:

    @Override
    public void invoke(String element, Context context) {

        JsonObject jsonObject = parser.parse(element).getAsJsonObject();
        String db = jsonObject.get("db").getAsString();
        String table = jsonObject.get("table").getAsString();
        // one topic per table; missing topics are created automatically only if the
        // broker allows it (auto.create.topics.enable)
        String topic = db + "_" + table;
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, element);
        kafkaProducer.send(record);
    }
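
For context, here is a minimal sketch of the enclosing class, assuming a plain Kafka producer. The class name CdcKafkaSink and the producer settings are placeholders of mine; the kafkaProducer and parser fields match the snippet above:

    import java.util.Properties;

    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical enclosing class for the invoke() above: routes each record
    // to a per-table topic through a plain KafkaProducer.
    public class CdcKafkaSink extends RichSinkFunction<String> {

        private final Properties producerConfig; // bootstrap.servers, key/value serializers, ...
        private transient KafkaProducer<String, String> kafkaProducer;
        private transient JsonParser parser;

        public CdcKafkaSink(Properties producerConfig) {
            this.producerConfig = producerConfig;
        }

        @Override
        public void open(Configuration parameters) {
            // KafkaProducer is not serializable, so build it on the task manager, not in the constructor
            kafkaProducer = new KafkaProducer<>(producerConfig);
            parser = new JsonParser();
        }

        @Override
        public void invoke(String element, Context context) {
            JsonObject jsonObject = parser.parse(element).getAsJsonObject();
            String topic = jsonObject.get("db").getAsString() + "_" + jsonObject.get("table").getAsString();
            kafkaProducer.send(new ProducerRecord<>(topic, element));
        }

        @Override
        public void close() {
            if (kafkaProducer != null) {
                kafkaProducer.close();
            }
        }
    }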

If you do not need to split the data across topics, Flink's built-in FlinkKafkaProducer can be used directly.
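
A sketch of that simpler path, assuming the universal flink-connector-kafka dependency (the topic name and broker address are placeholders):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    // Send every table's JSON to a single topic with Flink's built-in producer.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    env.addSource(sourceFunction)
       .addSink(new FlinkKafkaProducer<>("binlog_json", new SimpleStringSchema(), props));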

One problem I ran into: against MySQL 8 the connector fails because it cannot retrieve the server's public key, and the allowPublicKeyRetrieval parameter cannot be added to the JDBC URL that Debezium builds:

    Caused by: org.apache.kafka.connect.errors.ConnectException: Error reading MySQL variables: Public Key Retrieval is not allowed
        at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:342)
        at io.debezium.connector.mysql.MySqlJdbcContext.readMySqlSystemVariables(MySqlJdbcContext.java:321)
        at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:79)
        at io.debezium.connector.mysql.MySqlTaskContext.<init>(MySqlTaskContext.java:52)
        at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:350)
        at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:143)
        at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
        at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
        at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.sql.SQLNonTransientConnectionException: Public Key Retrieval is not allowed
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:836)
        at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:456)
        at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
        at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:197)
        at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:230)
        at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:871)
        at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:866)
        at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:412)
        at io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:328)
        ... 11 more

I opened an issue to see what the maintainers suggest; failing that, the source can be patched to add the parameter: https://github.com/ververica/flink-cdc-connectors/issues/173 (a common server-side workaround is to switch the connecting account to the mysql_native_password authentication plugin, which avoids public key retrieval altogether).

The complete code is on GitHub: https://github.com/springMoon/flink-rookie (see MySqlBinlogSourceExample).

Follow the Flink菜鸟 WeChat official account for occasional posts on Flink development.
