在使用MySQL的JDBC时,如果查询结果集过大,使用一次查询,可能会出现Java.lang.OutOfMemoryError: Java heap space问题,因为DB服务器端一次将查询到的结果集全部发送到Java端保存在内存中而造成OOM。
MySQL JDBC需要一条SQL从数据库读取大量数据,而不发生JVM OOM,可以采用以下方法之一:
1、当statement设置以下属性时,采用的是流数据接收方式,每次只从服务器接收部份数据,直到所有数据处理完毕,不会发生JVM OOM。
1 setResultSetType(ResultSet.TYPE_FORWARD_ONLY); 2 setFetchSize(Integer.MIN_VALUE);
2、调用statement的enableStreamingResults方法,实际上enableStreamingResults方法内部封装的就是第1种方式。
3、设置连接属性useCursorFetch=true (5.0版驱动开始支持),statement以TYPE_FORWARD_ONLY打开,再设置fetch size参数,表示采用服务器端游标,每次从服务器取fetch_size条数据。
故采用如下方式就可以解决OOM问题:
1 ps = (PreparedStatement) con.prepareStatement("select * from bigTable", 2 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); 3 ps.setFetchSize(Integer.MIN_VALUE); 4 ps.setFetchDirection(ResultSet.FETCH_REVERSE);
-------------------------------------------------------------------------------------------------
在Statement和ResultSet接口中都有setFetchSize方法
1 void setFetchSize(int rows) throws SQLException
查看API文档
Statement接口中是这样解释的:
为JDBC 驱动程序提供一个提示,它提示此Statement
生成的ResultSet
对象需要更多行时应该从数据库获取的行数。指定的行数仅影响使用此语句创建的结果集合。如果指定的值为 0,则忽略该提示。默认值为 0。
ResultSet中是这样解释的:
为 JDBC 驱动程序设置此ResultSet
对象需要更多行时应该从数据库获取的行数。如果指定的获取大小为零,则 JDBC 驱动程序忽略该值,随意对获取大小作出它自己的最佳猜测。默认值由创建结果集的Statement
对象设置。获取大小可以在任何时间更改。
网上有下面这样的一段摘录1:
缺省时,驱动程序一次从查询里获取所有的结果。这样可能对于大的数据集来说是不方便的, 因此 JDBC 驱动提供了一个用于设置从一个数据库游标抽取若干行的 ResultSet 的方法。在连接的客户端这边缓冲了一小部分数据行,并且在用尽之后, 则通过重定位游标检索下一个数据行块。
摘录2:
setFetchSize 最主要是为了减少网络交互次数设计的。访问ResultSet时,如果它每次只从服务器上取一行数据,则会产生大量的开销。setFetchSize的意 思是当调用rs.next时,ResultSet会一次性从服务器上取得多少行数据回来,这样在下次rs.next时,它可以直接从内存中获取出数据而不 需要网络交互,提高了效率。 这个设置可能会被某些JDBC驱动忽略的,而且设置过大也会造成内存的上升。
------------------------------------------------------------------------------------------------------------
源码分析:
fetchSize
这里以postgres jdbc driver为例,主要是因为postgres的jdbc driver有公开源码,而且命名比较规范。之前看oracle jdbc,由于没有源码,反编译出来一大堆var1,var2等的变量命名,非常晦涩。默认情况下pgjdbc driver会一次性拉取所有结果集,也就是在executeQuery的时候。对于大数据量的查询来说,非常容易造成OOM。这种场景就需要设置fetchSize,执行query的时候先返回第一批数据,之后next完一批数据之后再去拉取下一批。
但是这个有几个要求:
- 数据库必须使用V3协议,即pg7.4+
- connection的autoCommit必须为false,因为开启autoCommit的话,查询完成cursor会被关闭,那么下次就不能再fetch了。另外ResultSet必须是ResultSet.TYPE_FORWARD_ONLY类型,这个是默认的。也就是说无法向后滚动。
- 查询语句必须是单条,不能是用分号组成的多条查询
实例代码
1 @Test 2 public void testReadTimeout() throws SQLException { 3 Connection connection = dataSource.getConnection(); 4 //https://jdbc.postgresql.org/documentation/head/query.html 5 connection.setAutoCommit(false); //NOTE 为了设置fetchSize,必须设置为false 6 7 String sql = "select * from demo_table"; 8 PreparedStatement pstmt; 9 try { 10 pstmt = (PreparedStatement)connection.prepareStatement(sql); 11 pstmt.setFetchSize(50); 12 System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout()); 13 System.out.println("ps.getFetchSize():" + pstmt.getFetchSize()); 14 System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection()); 15 System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize()); 16 17 ResultSet rs = pstmt.executeQuery(); 18 //NOTE 这里返回了就代表statement执行完成,默认返回fetchSize的数据 19 int col = rs.getMetaData().getColumnCount(); 20 System.out.println("============================"); 21 while (rs.next()) { 22 for (int i = 1; i <= col; i++) { 23 System.out.print(rs.getObject(i)); 24 } 25 System.out.println(""); 26 } 27 System.out.println("============================"); 28 } catch (SQLException e) { 29 e.printStackTrace(); 30 } finally { 31 //close resources 32 } 33 }
源码解析
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java
1 /* 2 * A Prepared SQL query is executed and its ResultSet is returned 3 * 4 * @return a ResultSet that contains the data produced by the * query - never null 5 * 6 * @exception SQLException if a database access error occurs 7 */ 8 public java.sql.ResultSet executeQuery() throws SQLException { 9 if (!executeWithFlags(0)) { 10 throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA); 11 } 12 13 if (result.getNext() != null) { 14 throw new PSQLException(GT.tr("Multiple ResultSets were returned by the query."), 15 PSQLState.TOO_MANY_RESULTS); 16 } 17 18 return result.getResultSet(); 19 } 20 executeQuery首先调用executeWithFlags方法,源码里头直接写在if里头的,这个不是推荐的方式,因为放在if比较容易忽略。 21 executeWithFlags 22 public boolean executeWithFlags(int flags) throws SQLException { 23 try { 24 checkClosed(); 25 26 if (connection.getPreferQueryMode() == PreferQueryMode.SIMPLE) { 27 flags |= QueryExecutor.QUERY_EXECUTE_AS_SIMPLE; 28 } 29 30 execute(preparedQuery, preparedParameters, flags); 31 32 return (result != null && result.getResultSet() != null); 33 } finally { 34 defaultTimeZone = null; 35 } 36 } 37 38 protected final void execute(CachedQuery cachedQuery, ParameterList queryParameters, int flags) 39 throws SQLException { 40 try { 41 executeInternal(cachedQuery, queryParameters, flags); 42 } catch (SQLException e) { 43 // Don't retry composite queries as it might get partially executed 44 if (cachedQuery.query.getSubqueries() != null 45 || !connection.getQueryExecutor().willHealOnRetry(e)) { 46 throw e; 47 } 48 cachedQuery.query.close(); 49 // Execute the query one more time 50 executeInternal(cachedQuery, queryParameters, flags); 51 } 52 }
这里又调用execute方法,在调用executeInternal
executeInternal
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java
1 private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags) 2 throws SQLException { 3 closeForNextExecution(); 4 5 // Enable cursor-based resultset if possible. 6 if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit() 7 && !wantsHoldableResultSet()) { 8 flags |= QueryExecutor.QUERY_FORWARD_CURSOR; 9 } 10 11 if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) { 12 flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS; 13 14 // If the no results flag is set (from executeUpdate) 15 // clear it so we get the generated keys results. 16 // 17 if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) { 18 flags &= ~(QueryExecutor.QUERY_NO_RESULTS); 19 } 20 } 21 22 if (isOneShotQuery(cachedQuery)) { 23 flags |= QueryExecutor.QUERY_ONESHOT; 24 } 25 // Only use named statements after we hit the threshold. Note that only 26 // named statements can be transferred in binary format. 27 28 if (connection.getAutoCommit()) { 29 flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN; 30 } 31 32 // updateable result sets do not yet support binary updates 33 if (concurrency != ResultSet.CONCUR_READ_ONLY) { 34 flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER; 35 } 36 37 Query queryToExecute = cachedQuery.query; 38 39 if (queryToExecute.isEmpty()) { 40 flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN; 41 } 42 43 if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers 44 && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) { 45 // Simple 'Q' execution does not need to know parameter types 46 // When binaryTransfer is forced, then we need to know resulting parameter and column types, 47 // thus sending a describe request. 48 int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY; 49 StatementResultHandler handler2 = new StatementResultHandler(); 50 connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0, 51 flags2); 52 ResultWrapper result2 = handler2.getResults(); 53 if (result2 != null) { 54 result2.getResultSet().close(); 55 } 56 } 57 58 StatementResultHandler handler = new StatementResultHandler(); 59 result = null; 60 try { 61 startTimer(); 62 connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows, 63 fetchSize, flags); 64 } finally { 65 killTimerTask(); 66 } 67 result = firstUnclosedResult = handler.getResults(); 68 69 if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) { 70 generatedKeys = result; 71 result = result.getNext(); 72 73 if (wantsGeneratedKeysOnce) { 74 wantsGeneratedKeysOnce = false; 75 } 76 } 77 78 } 79 主要看这段 80 connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows, 81 fetchSize, flags); 82 通过把fetchSize传递进去,拉取指定大小的result 83 最后调用sendExecute以及processResults方法来拉取数据 84 postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java 85 86 private void sendExecute(SimpleQuery query, Portal portal, int limit) throws IOException { 87 // 88 // Send Execute. 89 // 90 91 if (logger.logDebug()) { 92 logger.debug(" FE=> Execute(portal=" + portal + ",limit=" + limit + ")"); 93 } 94 95 byte[] encodedPortalName = (portal == null ? null : portal.getEncodedPortalName()); 96 int encodedSize = (encodedPortalName == null ? 0 : encodedPortalName.length); 97 98 // Total size = 4 (size field) + 1 + N (source portal) + 4 (max rows) 99 pgStream.sendChar('E'); // Execute 100 pgStream.sendInteger4(4 + 1 + encodedSize + 4); // message size 101 if (encodedPortalName != null) { 102 pgStream.send(encodedPortalName); // portal name 103 } 104 pgStream.sendChar(0); // portal name terminator 105 pgStream.sendInteger4(limit); // row limit 106 107 pendingExecuteQueue.add(new ExecuteRequest(query, portal, false)); 108 } 109 110 protected void processResults(ResultHandler handler, int flags) throws IOException { 111 boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0; 112 boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0; 113 114 List<byte[][]> tuples = null; 115 116 int c; 117 boolean endQuery = false; 118 119 // At the end of a command execution we have the CommandComplete 120 // message to tell us we're done, but with a describeOnly command 121 // we have no real flag to let us know we're done. We've got to 122 // look for the next RowDescription or NoData message and return 123 // from there. 124 boolean doneAfterRowDescNoData = false; 125 126 while (!endQuery) { 127 c = pgStream.receiveChar(); 128 switch (c) { 129 case 'A': // Asynchronous Notify 130 receiveAsyncNotify(); 131 break; 132 133 case '1': // Parse Complete (response to Parse) 134 pgStream.receiveInteger4(); // len, discarded 135 136 SimpleQuery parsedQuery = pendingParseQueue.removeFirst(); 137 String parsedStatementName = parsedQuery.getStatementName(); 138 //... 139 } 140 } 141 } 142 next 143 postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java 144 145 public boolean next() throws SQLException { 146 checkClosed(); 147 if (onInsertRow) { 148 throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."), 149 PSQLState.INVALID_CURSOR_STATE); 150 } 151 if (current_row + 1 >= rows.size()) { 152 if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) { 153 current_row = rows.size(); 154 this_row = null; 155 rowBuffer = null; 156 return false; // End of the resultset. 157 } 158 159 // Ask for some more data. 160 row_offset += rows.size(); // We are discarding some data. 161 162 int fetchRows = fetchSize; 163 if (maxRows != 0) { 164 if (fetchRows == 0 || row_offset + fetchRows > maxRows) { 165 // Fetch would exceed maxRows, limit it. 166 fetchRows = maxRows - row_offset; 167 } 168 } 169 // Execute the fetch and update this resultset. 170 connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows); 171 current_row = 0; 172 173 // Test the new rows array. 174 if (rows.isEmpty()) { 175 this_row = null; 176 rowBuffer = null; 177 return false; 178 } 179 } else { 180 current_row++; 181 } 182 initRowBuffer(); 183 return true; 184 }
next方法可以看到,首先判断current_row + 1是否小于rows.size(),小于的话,那就current_row++;否则表示这一批fetchSize的数据被消费完了,需要判断是否结束或者拉取下一批数据,之后更新current_row
1 connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);
这个方法拉取fetchRows条数的下一批数据
- initRowBuffer
1 private void initRowBuffer() { 2 this_row = rows.get(current_row); 3 // We only need a copy of the current row if we're going to 4 // modify it via an updatable resultset. 5 if (resultsetconcurrency == ResultSet.CONCUR_UPDATABLE) { 6 rowBuffer = new byte[this_row.length][]; 7 System.arraycopy(this_row, 0, rowBuffer, 0, this_row.length); 8 } else { 9 rowBuffer = null; 10 } 11 }
这就是next移动之后,把要消费的这行数据放到rowBuffer里头。
小结
对于查询数据量大的场景下,非常有必要设置fetchSize,否则全量拉取很容易OOM,但是使用fetchSize的时候,要求数据能够在遍历resultSet的时候及时处理,而不是收集完所有数据返回回去再去处理。