方式一 通过JDBCOutputFormat
在flink中没有现成的用来写入MySQL的sink,但是flink提供了一个类,JDBCOutputFormat,通过这个类,如果你提供了jdbc的driver,则可以当做sink使用。
JDBCOutputFormat其实是flink的batch api,但也可以用来作为stream的api使用,社区也推荐通过这种方式来进行。
JDBCOutputFormat用起来很简单,只需要一个prepared statement,driver和database connection,就可以开始使用了。
1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() 2 .setDrivername("com.mysql.jdbc.Driver") 3 .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx") 4 .setQuery(query) 5 .finish();
如下的sql语句可以作为prepared statement:
String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)";
对应的表的结构:
1 CREATE TABLE cases 2 ( 3 caseid VARCHAR(255), 4 tracehash VARCHAR(255) 5 );
但有一点要明确,JDBCOutputFormat只能处理Row,而Row是对prepared statement的参数的一个包装类。这意味着我们需要将流中的case转换为row,通过map就能做的。
1 DataStream<Case> cases = ... 2 3 DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> { 4 Row row = new Row(2); // our prepared statement has 2 parameters 5 row.setField(0, aCase.getId()); //first parameter is case ID 6 row.setField(1, aCase.getTraceHash()); //second paramater is tracehash 7 return row; 8 });
这样,我们就能添加sink了:
1 rows.writeUsingOutputFormat(jdbcOutput);
这样,你就可以将数据写入mysql了。
但是在你在流上附加了窗口之后,可能会得到下面的报错:
1 "Unknown column type for column %s. Best effort approach to set its value: %s."
因为窗口处理的类型,没有明确的类型定义,如下修改之前的定义,显式的指定类型:
1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() 2 .setDrivername("com.mysql.jdbc.Driver") 3 .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx") 4 .setQuery(query) 5 .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) //set the types 6 .finish();
JDBCOutputFormat
has a batchInterval
, which you can specify on the JDBCOutputFormatBuilder
. If, however, I specify a batch interval of 5000, I would potentially never write anything to the database, or wait a very long time until anything was written.
JDBCOutputFormat
还有一个很有用的参数,batchInterval,见名知意,就是多少数据提交一次,尽量高效率的向数据库提交数据。当然还有比如timeout等其他参数,可以探索。
方式二 通过自定义sink提交
我们通过继承RichSinkFunction<IN>来实现自定义sink:
1 public class RichCaseSink extends RichSinkFunction<Case> { 2 3 private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) " 4 + "VALUES (?, ?) " 5 + "ON CONFLICT (caseid) DO UPDATE SET " 6 + " tracehash=?"; 7 8 private PreparedStatement statement; 9 10 11 @Override 12 public void invoke(Case aCase) throws Exception { 13 14 statement.setString(1, aCase.getId()); 15 statement.setString(2, aCase.getTraceHash()); 16 statement.setString(3, aCase.getTraceHash()); 17 statement.addBatch(); 18 statement.executeBatch(); 19 } 20 21 @Override 22 public void open(Configuration parameters) throws Exception { 23 Class.forName("com.mysql.jdbc.Driver"); 24 Connection connection = 25 DriverManager.getConnection("jdbc:mysql://localhost:5432/casedb?user=signavio&password=signavio"); 26 27 statement = connection.prepareStatement(UPSERT_CASE); 28 } 29 30 }
这样,就可以在流上添加sink 了:
1 DataStream<Case> cases = ... 2 cases.addSink(new RichCaseSink());
当然,上面的实现很简略,没有给出批量提交或者超时提交,这个都可以很容易的添加,比如close()中关闭连接。
但是上面的实现中,最大的问题还是没有跟flink的状态管理相结合,这个才是重头戏。
方式二 加强版的自定义sink
在checkpoint的时候保存数据,继承接口CheckpointedFunction
:
1 @Override 2 public void snapshotState(FunctionSnapshotContext context) throws Exception { 3 long checkpointId = context.getCheckpointId(); 4 List<Case> cases = pendingCasesPerCheckpoint.get(checkpointId); 5 if(cases == null){ 6 cases = new ArrayList<>(); 7 pendingCasesPerCheckpoint.put(checkpointId, cases); 8 } 9 cases.addAll(pendingCases); 10 pendingCases.clear(); 11 }
在消息到达的时候不插入数据,只是留存数据:
1 @Override 2 public void invoke(Case aCase) throws Exception { 3 pendingCases.add(aCase); 4 }
这样,通过继承CheckpointListener,我们就能在某个checkpoint完成的时候插入数据:
1 @Override 2 public void notifyCheckpointComplete(long checkpointId) throws Exception { 3 4 Iterator<Map.Entry<Long, List<Case>>> pendingCheckpointsIt = 5 pendingCasesPerCheckpoint.entrySet().iterator(); 6 7 while (pendingCheckpointsIt.hasNext()) { 8 9 Map.Entry<Long, List<Case>> entry = pendingCheckpointsIt.next(); 10 Long pastCheckpointId = entry.getKey(); 11 List<Case> pendingCases = entry.getValue(); 12 13 if (pastCheckpointId <= checkpointId) { 14 15 for (Case pendingCase : pendingCases) { 16 statement.setString(1, pendingCase.getId()); 17 statement.setString(2, pendingCase.getTraceHash()); 18 statement.setString(3, pendingCase.getTraceHash()); 19 statement.addBatch(); 20 } 21 pendingCheckpointsIt.remove(); 22 } 23 } 24 statement.executeBatch(); 25 26 }
前提,是需要设置checkpoint,比如:
ExecutionEnvironment env = ...
env.enableCheckpointing(10000L);
这样,每隔10s,当一个checkpoint做成功,就会插入一次数据。
当然,上面的代码验证可用,但不建议在生产环境使用,生产环境需要考虑更多的问题。