zoukankan      html  css  js  c++  java
  • flink写入mysql的两种方式

    方式一 通过JDBCOutputFormat

    在flink中没有现成的用来写入MySQL的sink,但是flink提供了一个类,JDBCOutputFormat,通过这个类,如果你提供了jdbc的driver,则可以当做sink使用。

    JDBCOutputFormat其实是flink的batch api,但也可以用来作为stream的api使用,社区也推荐通过这种方式来进行。

    JDBCOutputFormat用起来很简单,只需要一个prepared statement,driver和database connection,就可以开始使用了。

    1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    2  .setDrivername("com.mysql.jdbc.Driver")
    3  .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
    4  .setQuery(query)
    5  .finish();

    如下的sql语句可以作为prepared statement:

    String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)";

    对应的表的结构:

    1 CREATE TABLE cases
    2 (
    3  caseid VARCHAR(255),
    4  tracehash VARCHAR(255)
    5 );

    但有一点要明确,JDBCOutputFormat只能处理Row,而Row是对prepared statement的参数的一个包装类。这意味着我们需要将流中的case转换为row,通过map就能做的。

    1 DataStream<Case> cases = ...
    2 
    3   DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
    4    Row row = new Row(2); // our prepared statement has 2 parameters
    5    row.setField(0, aCase.getId()); //first parameter is case ID
    6    row.setField(1, aCase.getTraceHash()); //second paramater is tracehash
    7    return row;
    8   });

    这样,我们就能添加sink了:

    1 rows.writeUsingOutputFormat(jdbcOutput);

    这样,你就可以将数据写入mysql了。

    但是在你在流上附加了窗口之后,可能会得到下面的报错:

    1 "Unknown column type for column %s. Best effort approach to set its value: %s."

    因为窗口处理的类型,没有明确的类型定义,如下修改之前的定义,显式的指定类型:

    1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    2  .setDrivername("com.mysql.jdbc.Driver")
    3  .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
    4  .setQuery(query)
    5  .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) //set the types
    6  .finish();

    JDBCOutputFormat has a batchInterval, which you can specify on the JDBCOutputFormatBuilder. If, however, I specify a batch interval of 5000, I would potentially never write anything to the database, or wait a very long time until anything was written.

    JDBCOutputFormat 还有一个很有用的参数,batchInterval,见名知意,就是多少数据提交一次,尽量高效率的向数据库提交数据。当然还有比如timeout等其他参数,可以探索。

    方式二 通过自定义sink提交

    我们通过继承RichSinkFunction<IN>来实现自定义sink:

     1 public class RichCaseSink extends RichSinkFunction<Case> {
     2 
     3   private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
     4       + "VALUES (?, ?) "
     5       + "ON CONFLICT (caseid) DO UPDATE SET "
     6       + "  tracehash=?";
     7 
     8   private PreparedStatement statement;
     9 
    10 
    11   @Override
    12   public void invoke(Case aCase) throws Exception {
    13 
    14     statement.setString(1, aCase.getId());
    15     statement.setString(2, aCase.getTraceHash());
    16     statement.setString(3, aCase.getTraceHash());
    17     statement.addBatch();
    18     statement.executeBatch();
    19   }
    20 
    21   @Override
    22   public void open(Configuration parameters) throws Exception {
    23     Class.forName("com.mysql.jdbc.Driver");
    24     Connection connection =
    25         DriverManager.getConnection("jdbc:mysql://localhost:5432/casedb?user=signavio&password=signavio");
    26 
    27     statement = connection.prepareStatement(UPSERT_CASE);
    28   }
    29 
    30 }

    这样,就可以在流上添加sink 了:

    1 DataStream<Case> cases = ...
    2 cases.addSink(new RichCaseSink());

    当然,上面的实现很简略,没有给出批量提交或者超时提交,这个都可以很容易的添加,比如close()中关闭连接。

    但是上面的实现中,最大的问题还是没有跟flink的状态管理相结合,这个才是重头戏。

    方式二 加强版的自定义sink

    在checkpoint的时候保存数据,继承接口CheckpointedFunction :

     1 @Override
     2 public void snapshotState(FunctionSnapshotContext context) throws Exception {
     3   long checkpointId = context.getCheckpointId();
     4   List<Case> cases = pendingCasesPerCheckpoint.get(checkpointId);
     5   if(cases == null){
     6     cases = new ArrayList<>();
     7     pendingCasesPerCheckpoint.put(checkpointId, cases);
     8   }
     9   cases.addAll(pendingCases);
    10   pendingCases.clear();
    11 }

    在消息到达的时候不插入数据,只是留存数据:

    1 @Override
    2 public void invoke(Case aCase) throws Exception {
    3   pendingCases.add(aCase);
    4 }

    这样,通过继承CheckpointListener,我们就能在某个checkpoint完成的时候插入数据:

     1 @Override
     2 public void notifyCheckpointComplete(long checkpointId) throws Exception {
     3 
     4  Iterator<Map.Entry<Long, List<Case>>> pendingCheckpointsIt =
     5    pendingCasesPerCheckpoint.entrySet().iterator();
     6 
     7  while (pendingCheckpointsIt.hasNext()) {
     8 
     9   Map.Entry<Long, List<Case>> entry = pendingCheckpointsIt.next();
    10   Long pastCheckpointId = entry.getKey();
    11   List<Case> pendingCases = entry.getValue();
    12 
    13   if (pastCheckpointId <= checkpointId) {
    14 
    15    for (Case pendingCase : pendingCases) {
    16     statement.setString(1, pendingCase.getId());
    17     statement.setString(2, pendingCase.getTraceHash());
    18     statement.setString(3, pendingCase.getTraceHash());
    19     statement.addBatch();
    20    }
    21    pendingCheckpointsIt.remove();
    22   }
    23  }
    24  statement.executeBatch();
    25 
    26 }

    前提,是需要设置checkpoint,比如:

    ExecutionEnvironment env = ...
    env.enableCheckpointing(10000L);

    这样,每隔10s,当一个checkpoint做成功,就会插入一次数据。

    当然,上面的代码验证可用,但不建议在生产环境使用,生产环境需要考虑更多的问题。

  • 相关阅读:
    CentOS 7 如何设置默认启动方式为命令行模式
    Virtual Box配置CentOS7网络
    序列化后成对象转map,再添加到dataList
    centos7 ping: www.baidu.com: Name or service not known
    协议1
    idea查看接口方法实现
    centos关闭防火墙
    myeclipse配置svn
    eas固定ip避免多次申请许可
    jvm配置
  • 原文地址:https://www.cnblogs.com/029zz010buct/p/10486104.html
Copyright © 2011-2022 走看看