A Look at fetchSize in JDBC Statements

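    When using MySQL's JDBC driver, a single query with a very large result set can fail with java.lang.OutOfMemoryError: Java heap space. By default the database server sends the whole result set to the Java side in one go, and the driver keeps all of it in memory.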

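    To read a large amount of data with one SQL statement through MySQL JDBC without running the JVM out of memory, use one of the following approaches:
        1. Configure the statement as shown below: a forward-only result set with a fetch size of Integer.MIN_VALUE. The driver then receives the results as a stream. It takes only part of the data from the server at a time until every row has been processed, so the JVM does not run out of memory.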

            setResultSetType(ResultSet.TYPE_FORWARD_ONLY);
            setFetchSize(Integer.MIN_VALUE);

        2. Call the statement's enableStreamingResults method. Internally, enableStreamingResults simply wraps approach 1.

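        3. Set the connection property useCursorFetch=true (supported since the 5.0 driver), open the statement as TYPE_FORWARD_ONLY, and set a positive fetch size. The driver then uses a server-side cursor and retrieves fetch_size rows from the server at a time.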

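    So the OOM problem can be avoided with code like this:

        PreparedStatement ps = con.prepareStatement("select * from bigTable",
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ps.setFetchSize(Integer.MIN_VALUE); // Connector/J's signal to stream rows one by one
        ps.setFetchDirection(ResultSet.FETCH_FORWARD); // a streaming result set can only move forward

    While such a streaming result set is open, all of its rows must be read (or the result set closed) before the same connection can issue another query.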
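    For comparison, approach 3 from the list above (a server-side cursor enabled by useCursorFetch=true) might look roughly like the sketch below. It assumes MySQL Connector/J is on the classpath. The class and method names are hypothetical, and the user and password values you pass in are placeholders. Unlike approach 1, the fetch size must be positive, because it is the number of rows requested from the server-side cursor each time.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

// Hypothetical helper illustrating approach 3: a server-side cursor in MySQL Connector/J.
class MySqlCursorFetchDemo {

    // Connection properties that enable cursor-based fetching (useCursorFetch=true).
    static Properties cursorFetchProperties(String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        props.setProperty("useCursorFetch", "true");
        return props;
    }

    // Iterates a query with a forward-only statement and a positive fetch size,
    // so rows arrive from the server cursor in batches of fetchSize.
    static long countRows(Connection conn, String sql, int fetchSize) throws SQLException {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("cursor fetch needs a positive fetch size");
        }
        try (Statement stmt = conn.createStatement(
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            stmt.setFetchSize(fetchSize);
            long count = 0;
            try (ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    count++; // handle each row here instead of collecting it
                }
            }
            return count;
        }
    }
}
```

    To use it, open the connection with DriverManager.getConnection(url, MySqlCursorFetchDemo.cursorFetchProperties(user, password)), where the URL is a placeholder for your server. Then pass the connection to countRows.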

    -------------------------------------------------------------------------------------------------

    Both the Statement and ResultSet interfaces have a setFetchSize method:

        void setFetchSize(int rows) throws SQLException

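    From the API documentation:

    The Statement interface describes it as follows:

    Gives the JDBC driver a hint as to the number of rows that should be fetched from the database when more rows are needed for ResultSet objects generated by this Statement. The number of rows specified affects only result sets created using this statement. If the value specified is zero, the hint is ignored. The default value is zero.

    The ResultSet interface describes it as follows:

    Sets the number of rows that should be fetched from the database when more rows are needed for this ResultSet object. If the fetch size specified is zero, the JDBC driver ignores the value and is free to make its own best guess as to what the fetch size should be. The default value is set by the Statement object that created the result set. The fetch size may be changed at any time.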

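    Excerpt 1, found online:

    By default the driver collects all the results for the query at once. This can be inconvenient for large data sets, so the JDBC driver provides a means of basing a ResultSet on a database cursor and only fetching a small number of rows. A small number of rows are cached on the client side of the connection. When they are exhausted, the next block of rows is retrieved by repositioning the cursor.

    Excerpt 2:

    setFetchSize is designed mainly to reduce the number of network round trips. If a ResultSet fetched only one row from the server at a time, accessing it would incur a lot of overhead. setFetchSize specifies how many rows the ResultSet retrieves from the server in one go when rs.next() is called. Subsequent rs.next() calls can then read rows directly from memory without a network round trip, which improves efficiency. Some JDBC drivers may ignore this setting, and setting it too large also increases memory usage.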
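    The trade-off in the second excerpt can be put into numbers with a back-of-the-envelope model. The model assumes each batch of fetchSize rows costs exactly one network round trip, and that the client holds only the current batch in memory. Real drivers add protocol overhead, so treat this as an idealization. The class name is hypothetical.

```java
// Idealized cost model for fetching totalRows rows in batches of fetchSize.
class FetchCostModel {

    // Number of round trips needed: ceil(totalRows / fetchSize).
    static long roundTrips(long totalRows, int fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive in this model");
        }
        return (totalRows + fetchSize - 1) / fetchSize;
    }

    // Peak number of rows buffered on the client: at most one batch.
    static long peakBufferedRows(long totalRows, int fetchSize) {
        return Math.min(totalRows, fetchSize);
    }
}
```

    Take 1,000,000 rows as an example. roundTrips gives 1,000,000 with fetchSize = 1 and 1,000 with fetchSize = 1000. Over the same range, peakBufferedRows grows from 1 to 1,000. Larger batches buy fewer round trips with more client memory, which is the tension the excerpt describes.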

    ------------------------------------------------------------------------------------------------------------

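    Source code analysis:

    fetchSize

    The PostgreSQL JDBC driver (pgjdbc) serves as the example here, mainly because its source code is public and its naming is fairly clean. When I looked at the Oracle JDBC driver earlier, no source was available. The decompiled code was full of variables named var1, var2 and so on, which made it very hard to read.

    By default, pgjdbc pulls the entire result set in one go, during executeQuery. For queries that return a lot of data, this easily causes an OOM. In such cases you need to set fetchSize. Executing the query then returns only the first batch of rows, and each further batch is fetched once next() has consumed the previous one.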

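    However, this comes with several requirements:

    • The server must use the V3 protocol, i.e. PostgreSQL 7.4 or later.
    • The connection's autoCommit must be false. With autoCommit on, the cursor is closed as soon as the query completes, so nothing further can be fetched. In addition, the ResultSet must be of type ResultSet.TYPE_FORWARD_ONLY, which is the default, so it cannot be scrolled backwards.
    • The query must be a single statement, not several statements joined with semicolons.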
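    The second requirement corresponds to the check pgjdbc performs at the top of executeInternal, quoted later in this post. That check requires a positive fetchSize, a non-scrollable result set, autoCommit off, and a result set that is not held over commits. The sketch below restates the check as a standalone predicate. The class and method names are hypothetical, and the protocol-version and single-statement requirements are not modeled.

```java
import java.sql.ResultSet;

// Hypothetical restatement of the condition pgjdbc checks before using a cursor.
class PgCursorEligibility {

    static boolean usesCursorFetch(int fetchSize, int resultSetType,
                                   boolean autoCommit, int holdability) {
        boolean scrollable = resultSetType != ResultSet.TYPE_FORWARD_ONLY;
        boolean holdable = holdability == ResultSet.HOLD_CURSORS_OVER_COMMIT;
        // Every condition must hold; otherwise the driver fetches the whole result at once.
        return fetchSize > 0 && !scrollable && !autoCommit && !holdable;
    }
}
```

    With fetchSize 50, autoCommit off and the default forward-only type, the predicate is true. Leaving autoCommit on makes it false. The driver then simply does not request a cursor, and no error is raised.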

    Example code

        @Test
        public void testReadTimeout() throws SQLException {
            // https://jdbc.postgresql.org/documentation/head/query.html
            String sql = "select * from demo_table";
            try (Connection connection = dataSource.getConnection()) {
                connection.setAutoCommit(false); // NOTE: must be false, otherwise fetchSize has no effect
                try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
                    pstmt.setFetchSize(50);
                    System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout());
                    System.out.println("ps.getFetchSize():" + pstmt.getFetchSize());
                    System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection());
                    System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize());

                    // NOTE: once executeQuery returns, the statement has executed;
                    // only the first fetchSize rows have been pulled to the client
                    try (ResultSet rs = pstmt.executeQuery()) {
                        int col = rs.getMetaData().getColumnCount();
                        System.out.println("============================");
                        while (rs.next()) {
                            for (int i = 1; i <= col; i++) {
                                System.out.print(rs.getObject(i));
                            }
                            System.out.println("");
                        }
                        System.out.println("============================");
                    }
                }
                connection.commit(); // end the transaction opened by autoCommit=false
            }
        }

    Source walkthrough

    postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java

        /*
         * A Prepared SQL query is executed and its ResultSet is returned
         *
         * @return a ResultSet that contains the data produced by the * query - never null
         *
         * @exception SQLException if a database access error occurs
         */
        public java.sql.ResultSet executeQuery() throws SQLException {
          if (!executeWithFlags(0)) {
            throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
          }

          if (result.getNext() != null) {
            throw new PSQLException(GT.tr("Multiple ResultSets were returned by the query."),
                PSQLState.TOO_MANY_RESULTS);
          }

          return result.getResultSet();
        }
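
    executeQuery first calls executeWithFlags. The source makes this call directly inside the if condition, which is not a recommended style, because a call buried in a condition is easy to overlook.

    executeWithFlags
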
        public boolean executeWithFlags(int flags) throws SQLException {
          try {
            checkClosed();

            if (connection.getPreferQueryMode() == PreferQueryMode.SIMPLE) {
              flags |= QueryExecutor.QUERY_EXECUTE_AS_SIMPLE;
            }

            execute(preparedQuery, preparedParameters, flags);

            return (result != null && result.getResultSet() != null);
          } finally {
            defaultTimeZone = null;
          }
        }

        protected final void execute(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
            throws SQLException {
          try {
            executeInternal(cachedQuery, queryParameters, flags);
          } catch (SQLException e) {
            // Don't retry composite queries as it might get partially executed
            if (cachedQuery.query.getSubqueries() != null
                || !connection.getQueryExecutor().willHealOnRetry(e)) {
              throw e;
            }
            cachedQuery.query.close();
            // Execute the query one more time
            executeInternal(cachedQuery, queryParameters, flags);
          }
        }
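
    executeWithFlags in turn calls execute, which calls executeInternal. If willHealOnRetry reports that the failure can be healed, execute retries executeInternal once.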

    executeInternal

    postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java

        private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
            throws SQLException {
          closeForNextExecution();

          // Enable cursor-based resultset if possible.
          if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit()
              && !wantsHoldableResultSet()) {
            flags |= QueryExecutor.QUERY_FORWARD_CURSOR;
          }

          if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
            flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;

            // If the no results flag is set (from executeUpdate)
            // clear it so we get the generated keys results.
            //
            if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) {
              flags &= ~(QueryExecutor.QUERY_NO_RESULTS);
            }
          }

          if (isOneShotQuery(cachedQuery)) {
            flags |= QueryExecutor.QUERY_ONESHOT;
          }
          // Only use named statements after we hit the threshold. Note that only
          // named statements can be transferred in binary format.

          if (connection.getAutoCommit()) {
            flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
          }

          // updateable result sets do not yet support binary updates
          if (concurrency != ResultSet.CONCUR_READ_ONLY) {
            flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER;
          }

          Query queryToExecute = cachedQuery.query;

          if (queryToExecute.isEmpty()) {
            flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
          }

          if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers
              && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
            // Simple 'Q' execution does not need to know parameter types
            // When binaryTransfer is forced, then we need to know resulting parameter and column types,
            // thus sending a describe request.
            int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;
            StatementResultHandler handler2 = new StatementResultHandler();
            connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0,
                flags2);
            ResultWrapper result2 = handler2.getResults();
            if (result2 != null) {
              result2.getResultSet().close();
            }
          }

          StatementResultHandler handler = new StatementResultHandler();
          result = null;
          try {
            startTimer();
            connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
                fetchSize, flags);
          } finally {
            killTimerTask();
          }
          result = firstUnclosedResult = handler.getResults();

          if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
            generatedKeys = result;
            result = result.getNext();

            if (wantsGeneratedKeysOnce) {
              wantsGeneratedKeysOnce = false;
            }
          }

        }

    The key part is this call:

            connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
                fetchSize, flags);

    fetchSize is passed down so that only that many rows are pulled per batch. Eventually sendExecute and processResults are called to actually fetch the data:

    postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java

        private void sendExecute(SimpleQuery query, Portal portal, int limit) throws IOException {
          //
          // Send Execute.
          //

          if (logger.logDebug()) {
            logger.debug(" FE=> Execute(portal=" + portal + ",limit=" + limit + ")");
          }

          byte[] encodedPortalName = (portal == null ? null : portal.getEncodedPortalName());
          int encodedSize = (encodedPortalName == null ? 0 : encodedPortalName.length);

          // Total size = 4 (size field) + 1 + N (source portal) + 4 (max rows)
          pgStream.sendChar('E'); // Execute
          pgStream.sendInteger4(4 + 1 + encodedSize + 4); // message size
          if (encodedPortalName != null) {
            pgStream.send(encodedPortalName); // portal name
          }
          pgStream.sendChar(0); // portal name terminator
          pgStream.sendInteger4(limit); // row limit

          pendingExecuteQueue.add(new ExecuteRequest(query, portal, false));
        }

        protected void processResults(ResultHandler handler, int flags) throws IOException {
          boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
          boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;

          List<byte[][]> tuples = null;

          int c;
          boolean endQuery = false;

          // At the end of a command execution we have the CommandComplete
          // message to tell us we're done, but with a describeOnly command
          // we have no real flag to let us know we're done. We've got to
          // look for the next RowDescription or NoData message and return
          // from there.
          boolean doneAfterRowDescNoData = false;

          while (!endQuery) {
            c = pgStream.receiveChar();
            switch (c) {
              case 'A': // Asynchronous Notify
                receiveAsyncNotify();
                break;

              case '1': // Parse Complete (response to Parse)
                pgStream.receiveInteger4(); // len, discarded

                SimpleQuery parsedQuery = pendingParseQueue.removeFirst();
                String parsedStatementName = parsedQuery.getStatementName();
                //...
            }
          }
        }
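    sendExecute writes a PostgreSQL Execute message whose last field is the row limit, and that field is where fetchSize ends up on the wire. The sketch below assembles the same bytes with a DataOutputStream so the layout is easy to inspect:

    • Byte1('E').
    • An Int32 length that counts itself but not the type byte.
    • The null-terminated portal name.
    • An Int32 maximum row count, where 0 means no limit.

    The class name is hypothetical, and the portal name is encoded as UTF-8 here for simplicity.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder mirroring the Execute message written by sendExecute.
class ExecuteMessage {

    static byte[] encode(String portalName, int rowLimit) {
        byte[] portal = portalName == null ? new byte[0]
                : portalName.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte('E');                      // message type: Execute
            out.writeInt(4 + portal.length + 1 + 4); // length: itself + name + terminator + limit
            out.write(portal);                       // portal name (empty = unnamed portal)
            out.writeByte(0);                        // portal name terminator
            out.writeInt(rowLimit);                  // max rows to return; 0 = no limit
        } catch (IOException e) {
            throw new IllegalStateException(e);      // not expected for an in-memory stream
        }
        return bytes.toByteArray();                  // DataOutputStream writes big-endian
    }
}
```

    With fetchSize = 50 the limit field is 50. If the portal still has rows left when the limit is reached, the server answers with PortalSuspended instead of CommandComplete. Another Execute for the same portal then retrieves the next batch.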

    next

    postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java

        public boolean next() throws SQLException {
          checkClosed();
          if (onInsertRow) {
            throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."),
                PSQLState.INVALID_CURSOR_STATE);
          }
          if (current_row + 1 >= rows.size()) {
            if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) {
              current_row = rows.size();
              this_row = null;
              rowBuffer = null;
              return false; // End of the resultset.
            }

            // Ask for some more data.
            row_offset += rows.size(); // We are discarding some data.

            int fetchRows = fetchSize;
            if (maxRows != 0) {
              if (fetchRows == 0 || row_offset + fetchRows > maxRows) {
                // Fetch would exceed maxRows, limit it.
                fetchRows = maxRows - row_offset;
              }
            }
            // Execute the fetch and update this resultset.
            connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);
            current_row = 0;

            // Test the new rows array.
            if (rows.isEmpty()) {
              this_row = null;
              rowBuffer = null;
              return false;
            }
          } else {
            current_row++;
          }
          initRowBuffer();
          return true;
        }
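
    next() first checks whether current_row + 1 is still less than rows.size(). If it is, next() simply increments current_row. Otherwise the current batch of fetchSize rows has been consumed. next() then either reports the end of the result set (no open cursor, or maxRows reached) or fetches the next batch and resets current_row to 0. The fetch itself is this call:

            connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);

    It pulls the next batch of up to fetchRows rows.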
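    The buffering logic of next() can be modeled outside the driver. The sketch below keeps the current batch in a list and serves rows from it. It asks a fetcher for the next batch only when the current one is exhausted, mirroring rows, current_row and row_offset in PgResultSet. BatchFetcher and BatchedCursor are hypothetical names, and maxRows handling is omitted. As a simplification, a batch shorter than fetchSize is treated as "cursor closed". pgjdbc instead tracks whether the server-side cursor is still open.

```java
import java.util.List;

// Hypothetical source of rows, standing in for the server-side cursor (portal).
interface BatchFetcher<T> {
    // Returns up to maxRows rows starting at offset; fewer rows means no more data.
    List<T> fetch(long offset, int maxRows);
}

// Simplified model of PgResultSet.next(): serve rows from the current batch and
// fetch the next batch only when the current one is used up.
class BatchedCursor<T> {
    private final BatchFetcher<T> fetcher;
    private final int fetchSize;
    private List<T> rows;          // current batch (like PgResultSet.rows)
    private int currentRow = -1;   // index within the batch (like current_row)
    private long rowOffset = 0;    // rows consumed in earlier batches (like row_offset)
    private boolean cursorOpen;    // false once the source returned a short batch
    private int fetchCount;        // number of fetch calls, for illustration

    BatchedCursor(BatchFetcher<T> fetcher, int fetchSize) {
        this.fetcher = fetcher;
        this.fetchSize = fetchSize;
        this.rows = fetcher.fetch(0, fetchSize); // first batch arrives with the query
        this.fetchCount = 1;
        this.cursorOpen = rows.size() == fetchSize;
    }

    boolean next() {
        if (currentRow + 1 < rows.size()) {
            currentRow++;                  // still inside the current batch
            return true;
        }
        if (!cursorOpen) {
            currentRow = rows.size();      // end of the result set
            return false;
        }
        rowOffset += rows.size();          // discard the consumed batch
        rows = fetcher.fetch(rowOffset, fetchSize);
        fetchCount++;
        cursorOpen = rows.size() == fetchSize;
        currentRow = 0;
        return !rows.isEmpty();
    }

    T get() { return rows.get(currentRow); }

    int fetchCount() { return fetchCount; }
}
```

    Feeding it ten rows with fetchSize = 3 yields all ten rows from four fetch calls, in batches of 3, 3, 3 and 1. Only one batch is held at a time.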

    initRowBuffer

        private void initRowBuffer() {
          this_row = rows.get(current_row);
          // We only need a copy of the current row if we're going to
          // modify it via an updatable resultset.
          if (resultsetconcurrency == ResultSet.CONCUR_UPDATABLE) {
            rowBuffer = new byte[this_row.length][];
            System.arraycopy(this_row, 0, rowBuffer, 0, this_row.length);
          } else {
            rowBuffer = null;
          }
        }
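
    After next() moves the cursor, initRowBuffer points this_row at the row about to be consumed. Only for a CONCUR_UPDATABLE result set does it also copy that row into rowBuffer, so updates can be staged without touching the fetched data.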
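    The copy in initRowBuffer is shallow. System.arraycopy on a byte[][] copies only the references to each column's byte[]. The copy can therefore have whole column values swapped out while this_row stays untouched, assuming an update replaces a column's byte[] rather than editing its bytes in place. A minimal illustration with plain arrays follows; the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo of the shallow row copy made in initRowBuffer().
class RowBufferCopyDemo {

    static byte[][] copyRow(byte[][] thisRow) {
        byte[][] rowBuffer = new byte[thisRow.length][];
        System.arraycopy(thisRow, 0, rowBuffer, 0, thisRow.length); // copies references only
        return rowBuffer;
    }

    public static void main(String[] args) {
        byte[][] thisRow = {
            "42".getBytes(StandardCharsets.UTF_8),
            "alice".getBytes(StandardCharsets.UTF_8)
        };
        byte[][] rowBuffer = copyRow(thisRow);
        rowBuffer[1] = "bob".getBytes(StandardCharsets.UTF_8); // replace a whole column value
        System.out.println(new String(thisRow[1], StandardCharsets.UTF_8));   // alice
        System.out.println(new String(rowBuffer[1], StandardCharsets.UTF_8)); // bob
        System.out.println(thisRow[0] == rowBuffer[0]); // true: untouched columns are shared
    }
}
```

    Mutating a shared byte[] in place (for example rowBuffer[0][0] = ...) would change this_row too. That is why the assumption above about replacing whole column values matters.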

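    Summary

    For queries that return large amounts of data, setting fetchSize is essential. Otherwise pulling the full result set can easily cause an OOM. When using fetchSize, though, rows must be processed while iterating the ResultSet rather than collected in full and processed afterwards. Collecting everything first would bring the memory problem right back.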

Original article: https://www.cnblogs.com/fnlingnzb-learner/p/10245971.html