<!-- Build-wide properties for this example project. -->
<properties>
<!-- Character encoding Maven uses when reading the project's source files. -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Single place to pin the Flink release; referenced as ${flink.version} by the org.apache.flink dependencies. -->
<flink.version>1.11.1</flink.version>
</properties>
<!--
  Dependencies of the Flink CDC SQL example.
  All org.apache.flink artifacts share ${flink.version}; the "_2.11" suffix on some
  artifact ids is the Scala binary version those artifacts are built against.
-->
<dependencies>
<!-- Table API common classes. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Core Flink Java API. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink client classes (Scala 2.11 build). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- DataStream API; the example class uses StreamExecutionEnvironment. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Bridge between the Table API and the DataStream API; the example class imports StreamTableEnvironment from it. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Table API for Java; the example class imports TableResult from org.apache.flink.table.api. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Blink planner; the example class selects it with useBlinkPlanner(). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!--
  Test classes of the Blink planner. The example class imports
  org.apache.flink.table.planner.factories.TestValuesTableFactory, which ships in this test-jar.
  No <scope> is set, so the default (compile) scope applies.
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
</dependency>
<!-- Flink CDC: MySQL change-data-capture source, selected in SQL with 'connector' = 'mysql-cdc'. -->
<!-- Its version is pinned here directly and is not tied to ${flink.version}. -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
public class FlinkCDCSQLTest {
public static void main(String[] args) throws Exception {
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
// 数据源表
String sourceDDL =
"CREATE TABLE mysql_binlog (
" +
" id INT NOT NULL,
" +
" name STRING,
" +
" description STRING
" +
") WITH (
" +
" 'connector' = 'mysql-cdc',
" +
" 'hostname' = 'localhost',
" +
" 'port' = '3306',
" +
" 'username' = 'flinkuser',
" +
" 'password' = 'flinkpassword',
" +
" 'database-name' = 'db_inventory_cdc',
" +
" 'table-name' = 'tb_products_cdc'
" +
")";
// 输出目标表
String sinkDDL =
"CREATE TABLE tb_sink (
" +
" name STRING,
" +
" countSum BIGINT,
" +
" PRIMARY KEY (name) NOT ENFORCED
" +
") WITH (
" +
" 'connector' = 'print'
" +
")";
// 简单的聚合处理
String transformSQL =
"INSERT INTO tb_sink " +
"SELECT name, COUNT(1) " +
"FROM mysql_binlog " +
"GROUP BY name";
tableEnv.executeSql(sourceDDL);
tableEnv.executeSql(sinkDDL);
TableResult result = tableEnv.executeSql(transformSQL);
// 等待flink-cdc完成快照
waitForSnapshotStarted("tb_sink");
result.print();
result.getJobClient().get().cancel().get();
}
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(100);
}
}
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
}