  • Flink Table & SQL canal usage example

    The project source code for this article is on GitHub: https://github.com/felixzh2020/felixzh-learning-flink/tree/master/canal

    Version information

    Product                 Version
    Flink                   1.11.1
    flink-cdc-connectors    1.1.0
    Java                    1.8.0_231
    MySQL                   5.7.16

    Maven dependencies

    • pom.xml dependency section
      <properties>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          <flink.version>1.11.1</flink.version>
      </properties>
      
      <dependencies>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-common</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-java</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-clients_2.11</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-streaming-java_2.11</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-api-java-bridge_2.11</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-api-java</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-planner-blink_2.11</artifactId>
              <version>${flink.version}</version>
          </dependency>
          <!-- the planner test-jar provides TestValuesTableFactory, referenced by the
               optional snapshot-wait helpers in the example below -->
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-planner-blink_2.11</artifactId>
              <version>${flink.version}</version>
              <type>test-jar</type>
          </dependency>
          <!-- Flink-CDC -->
          <dependency>
              <groupId>com.alibaba.ververica</groupId>
              <artifactId>flink-connector-mysql-cdc</artifactId>
              <version>1.1.0</version>
          </dependency>
      
      </dependencies>

      MySQL replication configuration and data preparation

      • Stop the MySQL service
      • On the MySQL node whose changes will be captured, add the following configuration (see the MySQL reference documentation):
        [mysqld]
        # ...existing configuration above...
        # settings added for binlog-based capture
        server-id = 12345
        log-bin = mysql-bin
        # must be ROW
        binlog_format = ROW
        # must be FULL (introduced in MySQL 5.6)
        binlog_row_image  = FULL
        expire_logs_days  = 10
      • Start the MySQL service
      • The following command shows the binlog-related variables:
        SHOW VARIABLES LIKE '%binlog%';
      • Create the test database, table, and initial data:
        CREATE DATABASE db_inventory_cdc;
        USE db_inventory_cdc;
        
        CREATE TABLE tb_products_cdc(
        	id INT PRIMARY KEY AUTO_INCREMENT,
        	name VARCHAR(64),
        	description VARCHAR(128)
        );
        
        INSERT INTO tb_products_cdc
        VALUES 
        	(DEFAULT, 'zhangsan', 'aaa'),
        	(DEFAULT, 'lisi', 'bbb'),
        	(DEFAULT, 'wangwu', 'ccc');
      • Create the user used for replication and grant it the required privileges (see the MySQL reference documentation):
        -- create the user that will read the binlog
        CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
        -- grant the replication-related privileges
        GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
      • Once the user has been created and granted, log in to MySQL as that user; the following commands show the replication-related status (a further verification sketch follows in the next bullet):
        SHOW MASTER STATUS;
        SHOW SLAVE STATUS;
        SHOW BINARY LOGS;
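      • As an optional sanity check (a minimal sketch; names such as flinkuser come from the steps above), confirm the privileges and the binlog settings the CDC connector relies on:
        -- run as an administrative user
        SHOW GRANTS FOR 'flinkuser';             -- should list the privileges granted above
        SHOW VARIABLES LIKE 'binlog_format';     -- expected: ROW
        SHOW VARIABLES LIKE 'binlog_row_image';  -- expected: FULL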

      Using Flink CDC

        • Via the Flink SQL CLI (a sketch is given after the Java example below)
        • Programmatically, which makes it easier to package and submit a jar; example below:
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.TableResult;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.planner.factories.TestValuesTableFactory;
      public class FlinkCDCSQLTest {
      
          public static void main(String[] args) throws Exception {
              EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                      .useBlinkPlanner()
                      .inStreamingMode()
                      .build();
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
              StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
              
              // source table: reads the MySQL binlog via the mysql-cdc connector
              String sourceDDL =
                      "CREATE TABLE mysql_binlog (\n" +
                      " id INT NOT NULL,\n" +
                      " name STRING,\n" +
                      " description STRING\n" +
                      ") WITH (\n" +
                      " 'connector' = 'mysql-cdc',\n" +
                      " 'hostname' = 'localhost',\n" +
                      " 'port' = '3306',\n" +
                      " 'username' = 'flinkuser',\n" +
                      " 'password' = 'flinkpassword',\n" +
                      " 'database-name' = 'db_inventory_cdc',\n" +
                      " 'table-name' = 'tb_products_cdc'\n" +
                      ")";
              // sink table: the 'print' connector writes every changelog row to stdout
              String sinkDDL =
                      "CREATE TABLE tb_sink (\n" +
                      " name STRING,\n" +
                      " countSum BIGINT,\n" +
                      " PRIMARY KEY (name) NOT ENFORCED\n" +
                      ") WITH (\n" +
                      " 'connector' = 'print'\n" +
                      ")";
              // a simple aggregation over the change stream
              String transformSQL =
                      "INSERT INTO tb_sink " +
                      "SELECT name, COUNT(1) " +
                      "FROM mysql_binlog " +
                      "GROUP BY name";
                      
              tableEnv.executeSql(sourceDDL);
              tableEnv.executeSql(sinkDDL);
              // submit the continuous INSERT job
              TableResult result = tableEnv.executeSql(transformSQL);
              result.print();

              // Keep the application alive so the streaming job keeps consuming the binlog
              // and the 'print' sink keeps emitting updates. The snapshot-wait helpers below
              // are only meaningful with the test-only 'values' connector, so they are not
              // called here; with the 'print' sink they would loop forever.
              Thread.sleep(Long.MAX_VALUE);
          }
      
          // Test-style helper: polls until the named sink has received at least one row,
          // i.e. until the flink-cdc snapshot phase has started producing data. It relies on
          // TestValuesTableFactory and therefore only works for sinks declared with the
          // test-only 'values' connector (provided by the planner test-jar in the pom).
          private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
              while (sinkSize(sinkName) == 0) {
                  Thread.sleep(100);
              }
          }

          // number of rows the 'values' test sink has collected so far; 0 before the job starts
          private static int sinkSize(String sinkName) {
              synchronized (TestValuesTableFactory.class) {
                  try {
                      return TestValuesTableFactory.getRawResults(sinkName).size();
                  } catch (IllegalArgumentException e) {
                      // the job has not started yet, so the sink is not registered
                      return 0;
                  }
              }
          }
      
      }
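
      For the SQL CLI route mentioned above, the same DDL and query can be typed into sql-client.sh. The following is only a sketch: it assumes the mysql-cdc connector (for example the flink-sql-connector-mysql-cdc 1.1.0 fat jar, or an equivalent) has already been placed on the SQL client's classpath, e.g. under Flink's lib/ directory.

      -- inside the Flink SQL CLI
      CREATE TABLE mysql_binlog (
        id INT NOT NULL,
        name STRING,
        description STRING
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'localhost',
        'port' = '3306',
        'username' = 'flinkuser',
        'password' = 'flinkpassword',
        'database-name' = 'db_inventory_cdc',
        'table-name' = 'tb_products_cdc'
      );

      -- the same aggregation as in the Java example; the result updates as the binlog changes
      SELECT name, COUNT(1) AS countSum FROM mysql_binlog GROUP BY name;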

      A simple test

        • For a simple test, start modifying the data in the MySQL table:
          -- test statements; compare each one against the Flink application's output
          
          INSERT INTO tb_products_cdc VALUES (DEFAULT, 'lisi', 'ddd');
          
          DELETE FROM tb_products_cdc WHERE id=4;
          
          UPDATE tb_products_cdc SET name='wangwu' WHERE id=2;
        • After each statement, check how the Flink job's printed result changes (a hand-derived expectation is sketched below)
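          What to expect, derived by hand from the three initial rows (ids 1-3) and the statements above, assuming the job started from a fresh snapshot (the print sink prefixes rows with change flags such as +I/-U/+U/-D):

          -- after the initial snapshot:            zhangsan=1, lisi=1, wangwu=1
          -- after INSERT (DEFAULT, 'lisi', 'ddd'): lisi 1 -> 2
          -- after DELETE WHERE id=4:               lisi 2 -> 1
          -- after UPDATE id=2 SET name='wangwu':   lisi 1 -> 0 (group retracted), wangwu 1 -> 2
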
  • Original post: https://www.cnblogs.com/felixzh/p/14086046.html