Statement statement = connection.createStatement(); final ResultSet resultSet = statement.executeQuery(String sql);
上面两句是普通的查询过程。下面分析一下驱动程序是如何进行查询和数据返回的。本质上,驱动程序通过Connection建立的TCP连接,将sql语句发送到MySQL服务器,然后接收MySQL服务器的返回结果,并解析成为ResultSet对象;重点就是这个发送和获取返回值的过程。
/**
 * Creates a statement configured with the requested result set type and concurrency.
 *
 * <p>The statement is a {@code StatementImpl} constructed from
 * {@code getLoadBalanceSafeProxy()} and this connection's {@code database} field.
 *
 * @param resultSetType        value forwarded to {@code StatementImpl.setResultSetType}
 * @param resultSetConcurrency value forwarded to {@code StatementImpl.setResultSetConcurrency}
 * @return the newly created statement
 * @throws SQLException if {@code checkClosed()} or statement construction fails
 */
public java.sql.Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
    // Validate the connection state before allocating anything.
    checkClosed();

    final StatementImpl statement = new StatementImpl(getLoadBalanceSafeProxy(), this.database);
    statement.setResultSetType(resultSetType);
    statement.setResultSetConcurrency(resultSetConcurrency);

    return statement;
}
在StatementImpl类当中的
public java.sql.ResultSet executeQuery(String sql) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { MySQLConnection locallyScopedConn = this.connection; this.retrieveGeneratedKeys = false; resetCancelledState(); checkNullOrEmptyQuery(sql); boolean doStreaming = createStreamingResultSet(); // Adjust net_write_timeout to a higher value if we're // streaming result sets. More often than not, someone runs into // an issue where they blow net_write_timeout when using this // feature, and if they're willing to hold a result set open // for 30 seconds or more, one more round-trip isn't going to hurt // // This is reset by RowDataDynamic.close(). if (doStreaming && this.connection.getNetTimeoutForStreamingResults() > 0) { executeSimpleNonQuery(locallyScopedConn, "SET net_write_timeout=" + this.connection.getNetTimeoutForStreamingResults()); } if (this.doEscapeProcessing) { Object escapedSqlResult = EscapeProcessor.escapeSQL(sql, locallyScopedConn.serverSupportsConvertFn(), this.connection); if (escapedSqlResult instanceof String) { sql = (String) escapedSqlResult; } else { sql = ((EscapeProcessorResult) escapedSqlResult).escapedSql; } } char firstStatementChar = StringUtils.firstNonWsCharUc(sql, findStartOfStatement(sql)); if (sql.charAt(0) == '/') { if (sql.startsWith(PING_MARKER)) { doPingInstead(); return this.results; } } checkForDml(sql, firstStatementChar); if (!locallyScopedConn.getHoldResultsOpenOverStatementClose()) { if (this.results != null) { this.results.realClose(false); } if (this.generatedKeysResults != null) { this.generatedKeysResults.realClose(false); } closeAllOpenResults(); } CachedResultSetMetaData cachedMetaData = null; // If there isn't a limit clause in the SQL // then limit the number of rows to return in // an efficient manner. Only do this if // setMaxRows() hasn't been used on any Statements // generated from the current Connection (saves // a query, and network traffic). 
if (useServerFetch()) { this.results = createResultSetUsingServerFetch(sql); return this.results; } CancelTask timeoutTask = null; String oldCatalog = null; try { if (locallyScopedConn.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConn.versionMeetsMinimum(5, 0, 0)) { timeoutTask = new CancelTask(this); locallyScopedConn.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis); } if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) { oldCatalog = locallyScopedConn.getCatalog(); locallyScopedConn.setCatalog(this.currentCatalog); } // // Check if we have cached metadata for this query... // Field[] cachedFields = null; if (locallyScopedConn.getCacheResultSetMetadata()) { cachedMetaData = locallyScopedConn.getCachedMetaData(sql); if (cachedMetaData != null) { cachedFields = cachedMetaData.fields; } } if (locallyScopedConn.useMaxRows()) { // We need to execute this all together // So synchronize on the Connection's mutex (because // even queries going through there synchronize // on the connection if (StringUtils.indexOfIgnoreCase(sql, "LIMIT") != -1) { //$NON-NLS-1$ this.results = locallyScopedConn.execSQL(this, sql, this.maxRows, null, this.resultSetType, this.resultSetConcurrency, doStreaming, this.currentCatalog, cachedFields); } else { if (this.maxRows <= 0) { executeSimpleNonQuery(locallyScopedConn, "SET SQL_SELECT_LIMIT=DEFAULT"); } else { executeSimpleNonQuery(locallyScopedConn, "SET SQL_SELECT_LIMIT=" + this.maxRows); } statementBegins(); this.results = locallyScopedConn.execSQL(this, sql, -1, null, this.resultSetType, this.resultSetConcurrency, doStreaming, this.currentCatalog, cachedFields); if (oldCatalog != null) { locallyScopedConn.setCatalog(oldCatalog); } } } else { statementBegins(); this.results = locallyScopedConn.execSQL(this, sql, -1, null, this.resultSetType, this.resultSetConcurrency, doStreaming, this.currentCatalog, cachedFields); } if (timeoutTask != null) { if (timeoutTask.caughtWhileCancelling != null) 
{ throw timeoutTask.caughtWhileCancelling; } timeoutTask.cancel(); locallyScopedConn.getCancelTimer().purge(); timeoutTask = null; } synchronized (this.cancelTimeoutMutex) { if (this.wasCancelled) { SQLException cause = null; if (this.wasCancelledByTimeout) { cause = new MySQLTimeoutException(); } else { cause = new MySQLStatementCancelledException(); } resetCancelledState(); throw cause; } } } finally { this.statementExecuting.set(false); if (timeoutTask != null) { timeoutTask.cancel(); locallyScopedConn.getCancelTimer().purge(); } if (oldCatalog != null) { locallyScopedConn.setCatalog(oldCatalog); } } this.lastInsertId = this.results.getUpdateID(); if (cachedMetaData != null) { locallyScopedConn.initializeResultsMetadataFromCache(sql, cachedMetaData, this.results); } else { if (this.connection.getCacheResultSetMetadata()) { locallyScopedConn.initializeResultsMetadataFromCache(sql, null /* will be created */, this.results); } } return this.results; } }
// Excerpt from executeQuery above: the statement hands the SQL text to the connection's execSQL.
// NOTE(review): nine arguments are passed here, while the execSQL listed below declares a tenth
// parameter (boolean isBatch) - presumably a nine-argument overload forwards to it; confirm
// against ConnectionImpl.
this.results = locallyScopedConn.execSQL(this, sql, -1, null, this.resultSetType, this.resultSetConcurrency, doStreaming, this.currentCatalog, cachedFields);
// locallyScopedConn is the statement's own connection, captured at the top of executeQuery.
MySQLConnection locallyScopedConn = this.connection;
在ConnectionImpl类当中
public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata, boolean isBatch) throws SQLException { synchronized (getConnectionMutex()) { // // Fall-back if the master is back online if we've // issued queriesBeforeRetryMaster queries since // we failed over // long queryStartTime = 0; int endOfQueryPacketPosition = 0; if (packet != null) { endOfQueryPacketPosition = packet.getPosition(); } if (getGatherPerformanceMetrics()) { queryStartTime = System.currentTimeMillis(); } this.lastQueryFinishedTime = 0; // we're busy! if ((getHighAvailability()) && (this.autoCommit || getAutoReconnectForPools()) && this.needsPing && !isBatch) { try { pingInternal(false, 0); this.needsPing = false; } catch (Exception Ex) { createNewIO(true); } } try { if (packet == null) { String encoding = null; if (getUseUnicode()) { encoding = getEncoding(); } return this.io.sqlQueryDirect(callingStatement, sql, encoding, null, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, cachedMetadata); } return this.io.sqlQueryDirect(callingStatement, null, null, packet, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, cachedMetadata); } catch (java.sql.SQLException sqlE) { // don't clobber SQL exceptions if (getDumpQueriesOnException()) { String extractedSql = extractSqlFromPacket(sql, packet, endOfQueryPacketPosition); StringBuffer messageBuf = new StringBuffer(extractedSql .length() + 32); messageBuf .append(" Query being executed when exception was thrown: "); messageBuf.append(extractedSql); messageBuf.append(" "); sqlE = appendMessageToException(sqlE, messageBuf.toString(), getExceptionInterceptor()); } if ((getHighAvailability())) { this.needsPing = true; } else { String sqlState = sqlE.getSQLState(); if ((sqlState != null) && sqlState .equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) { 
cleanup(sqlE); } } throw sqlE; } catch (Exception ex) { if (getHighAvailability()) { this.needsPing = true; } else if (ex instanceof IOException) { cleanup(ex); } SQLException sqlEx = SQLError.createSQLException( Messages.getString("Connection.UnexpectedException"), SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor()); sqlEx.initCause(ex); throw sqlEx; } finally { if (getMaintainTimeStats()) { this.lastQueryFinishedTime = System.currentTimeMillis(); } if (getGatherPerformanceMetrics()) { long queryTime = System.currentTimeMillis() - queryStartTime; registerQueryExecutionTime(queryTime); } } } }
在MySqlIO类当中的
final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception { this.statementExecutionDepth++; try { if (this.statementInterceptors != null) { ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPre(query, callingStatement, false); if (interceptedResults != null) { return interceptedResults; } } long queryStartTime = 0; long queryEndTime = 0; String statementComment = this.connection.getStatementComment(); if (this.connection.getIncludeThreadNamesAsStatementComment()) { statementComment = (statementComment != null ? statementComment + ", " : "") + "java thread: " + Thread.currentThread().getName(); } if (query != null) { // We don't know exactly how many bytes we're going to get // from the query. Since we're dealing with Unicode, the // max is 2, so pad it (2 * query) + space for headers int packLength = HEADER_LENGTH + 1 + (query.length() * 2) + 2; byte[] commentAsBytes = null; if (statementComment != null) { commentAsBytes = StringUtils.getBytes(statementComment, null, characterEncoding, this.connection .getServerCharacterEncoding(), this.connection.parserKnowsUnicode(), getExceptionInterceptor()); packLength += commentAsBytes.length; packLength += 6; // for /*[space] [space]*/ } if (this.sendPacket == null) { this.sendPacket = new Buffer(packLength); } else { this.sendPacket.clear(); } this.sendPacket.writeByte((byte) MysqlDefs.QUERY); if (commentAsBytes != null) { this.sendPacket.writeBytesNoNull(Constants.SLASH_STAR_SPACE_AS_BYTES); this.sendPacket.writeBytesNoNull(commentAsBytes); this.sendPacket.writeBytesNoNull(Constants.SPACE_STAR_SLASH_SPACE_AS_BYTES); } if (characterEncoding != null) { if (this.platformDbCharsetMatches) { this.sendPacket.writeStringNoNull(query, characterEncoding, 
this.connection.getServerCharacterEncoding(), this.connection.parserKnowsUnicode(), this.connection); } else { if (StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) { //$NON-NLS-1$ this.sendPacket.writeBytesNoNull(StringUtils.getBytes(query)); } else { this.sendPacket.writeStringNoNull(query, characterEncoding, this.connection.getServerCharacterEncoding(), this.connection.parserKnowsUnicode(), this.connection); } } } else { this.sendPacket.writeStringNoNull(query); } queryPacket = this.sendPacket; } byte[] queryBuf = null; int oldPacketPosition = 0; if (needToGrabQueryFromPacket) { queryBuf = queryPacket.getByteBuffer(); // save the packet position oldPacketPosition = queryPacket.getPosition(); queryStartTime = getCurrentTimeNanosOrMillis(); } if (this.autoGenerateTestcaseScript) { String testcaseQuery = null; if (query != null) { if (statementComment != null) { testcaseQuery = "/* " + statementComment + " */ " + query; } else { testcaseQuery = query; } } else { testcaseQuery = StringUtils.toString(queryBuf, 5, (oldPacketPosition - 5)); } StringBuffer debugBuf = new StringBuffer(testcaseQuery.length() + 32); this.connection.generateConnectionCommentBlock(debugBuf); debugBuf.append(testcaseQuery); debugBuf.append(';'); this.connection.dumpTestcaseQuery(debugBuf.toString()); } // Send query command and sql query string Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket, false, null, 0); long fetchBeginTime = 0; long fetchEndTime = 0; String profileQueryToLog = null; boolean queryWasSlow = false; if (this.profileSql || this.logSlowQueries) { queryEndTime = getCurrentTimeNanosOrMillis(); boolean shouldExtractQuery = false; if (this.profileSql) { shouldExtractQuery = true; } else if (this.logSlowQueries) { long queryTime = queryEndTime - queryStartTime; boolean logSlow = false; if (!this.useAutoSlowLog) { logSlow = queryTime > this.connection.getSlowQueryThresholdMillis(); } else { logSlow = this.connection.isAbonormallyLongQuery(queryTime); 
this.connection.reportQueryTime(queryTime); } if (logSlow) { shouldExtractQuery = true; queryWasSlow = true; } } if (shouldExtractQuery) { // Extract the actual query from the network packet boolean truncated = false; int extractPosition = oldPacketPosition; if (oldPacketPosition > this.connection.getMaxQuerySizeToLog()) { extractPosition = this.connection.getMaxQuerySizeToLog() + 5; truncated = true; } profileQueryToLog = StringUtils.toString(queryBuf, 5, (extractPosition - 5)); if (truncated) { profileQueryToLog += Messages.getString("MysqlIO.25"); //$NON-NLS-1$ } } fetchBeginTime = queryEndTime; } ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket, false, -1L, cachedMetadata); if (queryWasSlow && !this.serverQueryWasSlow /* don't log slow queries twice */) { StringBuffer mesgBuf = new StringBuffer(48 + profileQueryToLog.length()); mesgBuf.append(Messages.getString("MysqlIO.SlowQuery", new Object[] {String.valueOf(this.useAutoSlowLog ? " 95% of all queries " : this.slowQueryThreshold), queryTimingUnits, Long.valueOf(queryEndTime - queryStartTime)})); mesgBuf.append(profileQueryToLog); ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection); eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(), //$NON-NLS-1$ (callingStatement != null) ? 
callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (int) (queryEndTime - queryStartTime), queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), mesgBuf.toString())); if (this.connection.getExplainSlowQueries()) { if (oldPacketPosition < MAX_QUERY_SIZE_TO_EXPLAIN) { explainSlowQuery(queryPacket.getBytes(5, (oldPacketPosition - 5)), profileQueryToLog); } else { this.connection.getLog().logWarn(Messages.getString( "MysqlIO.28") //$NON-NLS-1$ +MAX_QUERY_SIZE_TO_EXPLAIN + Messages.getString("MysqlIO.29")); //$NON-NLS-1$ } } } if (this.logSlowQueries) { ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection); if (this.queryBadIndexUsed && this.profileSql) { eventSink.consumeEvent(new ProfilerEvent( ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$ this.connection.getId(), (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages.getString("MysqlIO.33") //$NON-NLS-1$ +profileQueryToLog)); } if (this.queryNoIndexUsed && this.profileSql) { eventSink.consumeEvent(new ProfilerEvent( ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$ this.connection.getId(), (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages.getString("MysqlIO.35") //$NON-NLS-1$ +profileQueryToLog)); } if (this.serverQueryWasSlow && this.profileSql) { eventSink.consumeEvent(new ProfilerEvent( ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$ this.connection.getId(), (callingStatement != null) ? 
callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), Messages.getString("MysqlIO.ServerSlowQuery") //$NON-NLS-1$ +profileQueryToLog)); } } if (this.profileSql) { fetchEndTime = getCurrentTimeNanosOrMillis(); ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection); eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_QUERY, "", catalog, this.connection.getId(), //$NON-NLS-1$ (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), profileQueryToLog)); eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_FETCH, "", catalog, this.connection.getId(), //$NON-NLS-1$ (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (fetchEndTime - fetchBeginTime), this.queryTimingUnits, null, LogUtils.findCallingClassAndMethod(new Throwable()), null)); } if (this.hadWarnings) { scanForAndThrowDataTruncation(); } if (this.statementInterceptors != null) { ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPost( query, callingStatement, rs, false, null); if (interceptedResults != null) { rs = interceptedResults; } } return rs; } catch (SQLException sqlEx) { if (this.statementInterceptors != null) { invokeStatementInterceptorsPost( query, callingStatement, null, false, sqlEx); // we don't do anything with the result set in this case } if (callingStatement != null) { synchronized (callingStatement.cancelTimeoutMutex) { if (callingStatement.wasCancelled) { SQLException cause = null; if (callingStatement.wasCancelledByTimeout) { cause = new MySQLTimeoutException(); } else { cause = new MySQLStatementCancelledException(); } 
callingStatement.resetCancelledState(); throw cause; } } } throw sqlEx; } finally { this.statementExecutionDepth--; } }
重点看到其中的
// Send the QUERY command together with the packet that holds the SQL text.
// skipCheck is false, so sendCommand reads the server's reply through checkErrorPacket and
// returns it; timeoutMillis is 0, so the socket timeout is left untouched.
Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket,
false, null, 0);
final Buffer sendCommand(int command, String extraData, Buffer queryPacket, boolean skipCheck, String extraDataCharEncoding, int timeoutMillis) throws SQLException { this.commandCount++; // // We cache these locally, per-command, as the checks // for them are in very 'hot' sections of the I/O code // and we save 10-15% in overall performance by doing this... // this.enablePacketDebug = this.connection.getEnablePacketDebug(); this.readPacketSequence = 0; int oldTimeout = 0; if (timeoutMillis != 0) { try { oldTimeout = this.mysqlConnection.getSoTimeout(); this.mysqlConnection.setSoTimeout(timeoutMillis); } catch (SocketException e) { throw SQLError.createCommunicationsException(this.connection, lastPacketSentTimeMs, lastPacketReceivedTimeMs, e, getExceptionInterceptor()); } } try { checkForOutstandingStreamingData(); // Clear serverStatus...this value is guarded by an // external mutex, as you can only ever be processing // one command at a time this.oldServerStatus = this.serverStatus; this.serverStatus = 0; this.hadWarnings = false; this.warningCount = 0; this.queryNoIndexUsed = false; this.queryBadIndexUsed = false; this.serverQueryWasSlow = false; // // Compressed input stream needs cleared at beginning // of each command execution... // if (this.useCompression) { int bytesLeft = this.mysqlInput.available(); if (bytesLeft > 0) { this.mysqlInput.skip(bytesLeft); } } try { clearInputStream(); // // PreparedStatements construct their own packets, // for efficiency's sake. // // If this is a generic query, we need to re-use // the sending packet. // if (queryPacket == null) { int packLength = HEADER_LENGTH + COMP_HEADER_LENGTH + 1 + ((extraData != null) ? 
extraData.length() : 0) + 2; if (this.sendPacket == null) { this.sendPacket = new Buffer(packLength); } this.packetSequence = -1; this.compressedPacketSequence = -1; this.readPacketSequence = 0; this.checkPacketSequence = true; this.sendPacket.clear(); this.sendPacket.writeByte((byte) command); if ((command == MysqlDefs.INIT_DB) || (command == MysqlDefs.CREATE_DB) || (command == MysqlDefs.DROP_DB) || (command == MysqlDefs.QUERY) || (command == MysqlDefs.COM_PREPARE)) { if (extraDataCharEncoding == null) { this.sendPacket.writeStringNoNull(extraData); } else { this.sendPacket.writeStringNoNull(extraData, extraDataCharEncoding, this.connection.getServerCharacterEncoding(), this.connection.parserKnowsUnicode(), this.connection); } } else if (command == MysqlDefs.PROCESS_KILL) { long id = Long.parseLong(extraData); this.sendPacket.writeLong(id); } send(this.sendPacket, this.sendPacket.getPosition()); } else { this.packetSequence = -1; this.compressedPacketSequence = -1; send(queryPacket, queryPacket.getPosition()); // packet passed by PreparedStatement } } catch (SQLException sqlEx) { // don't wrap SQLExceptions throw sqlEx; } catch (Exception ex) { throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ex, getExceptionInterceptor()); } Buffer returnPacket = null; if (!skipCheck) { if ((command == MysqlDefs.COM_EXECUTE) || (command == MysqlDefs.COM_RESET_STMT)) { this.readPacketSequence = 0; this.packetSequenceReset = true; } returnPacket = checkErrorPacket(command); } return returnPacket; } catch (IOException ioEx) { throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ioEx, getExceptionInterceptor()); } finally { if (timeoutMillis != 0) { try { this.mysqlConnection.setSoTimeout(oldTimeout); } catch (SocketException e) { throw SQLError.createCommunicationsException(this.connection, lastPacketSentTimeMs, lastPacketReceivedTimeMs, e, 
getExceptionInterceptor()); } } } }
在上述方法中完成了sql数据的发送以及结果的获取,二者都是通过Socket的OutputStream和InputStream完成的。
private final void send(Buffer packet, int packetLen) throws SQLException { try { if (this.maxAllowedPacket > 0 && packetLen > this.maxAllowedPacket) { throw new PacketTooBigException(packetLen, this.maxAllowedPacket); } if ((this.serverMajorVersion >= 4) && (packetLen - HEADER_LENGTH >= this.maxThreeBytes || (this.useCompression && packetLen - HEADER_LENGTH >= this.maxThreeBytes - COMP_HEADER_LENGTH) )) { sendSplitPackets(packet, packetLen); } else { this.packetSequence++; Buffer packetToSend = packet; packetToSend.setPosition(0); packetToSend.writeLongInt(packetLen - HEADER_LENGTH); packetToSend.writeByte(this.packetSequence); if (this.useCompression) { this.compressedPacketSequence++; int originalPacketLen = packetLen; packetToSend = compressPacket(packetToSend, 0, packetLen); packetLen = packetToSend.getPosition(); if (this.traceProtocol) { StringBuffer traceMessageBuf = new StringBuffer(); traceMessageBuf.append(Messages.getString("MysqlIO.57")); //$NON-NLS-1$ traceMessageBuf.append(getPacketDumpToLog( packetToSend, packetLen)); traceMessageBuf.append(Messages.getString("MysqlIO.58")); //$NON-NLS-1$ traceMessageBuf.append(getPacketDumpToLog(packet, originalPacketLen)); this.connection.getLog().logTrace(traceMessageBuf.toString()); } } else { if (this.traceProtocol) { StringBuffer traceMessageBuf = new StringBuffer(); traceMessageBuf.append(Messages.getString("MysqlIO.59")); //$NON-NLS-1$ traceMessageBuf.append("host: '"); traceMessageBuf.append(host); traceMessageBuf.append("' threadId: '"); traceMessageBuf.append(threadId); traceMessageBuf.append("' "); traceMessageBuf.append(packetToSend.dump(packetLen)); this.connection.getLog().logTrace(traceMessageBuf.toString()); } } this.mysqlOutput.write(packetToSend.getByteBuffer(), 0, packetLen); this.mysqlOutput.flush(); } if (this.enablePacketDebug) { enqueuePacketForDebugging(true, false, packetLen + 5, this.packetHeaderBuf, packet); } // // Don't hold on to large packets // if (packet == this.sharedSendPacket) { 
reclaimLargeSharedSendPacket(); } if (this.connection.getMaintainTimeStats()) { this.lastPacketSentTimeMs = System.currentTimeMillis(); } } catch (IOException ioEx) { throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ioEx, getExceptionInterceptor()); } }
private Buffer checkErrorPacket(int command) throws SQLException { //int statusCode = 0; Buffer resultPacket = null; this.serverStatus = 0; try { // Check return value, if we get a java.io.EOFException, // the server has gone away. We'll pass it on up the // exception chain and let someone higher up decide // what to do (barf, reconnect, etc). resultPacket = reuseAndReadPacket(this.reusablePacket); } catch (SQLException sqlEx) { // Don't wrap SQL Exceptions throw sqlEx; } catch (Exception fallThru) { throw SQLError.createCommunicationsException(this.connection, this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, fallThru, getExceptionInterceptor()); } checkErrorPacket(resultPacket); return resultPacket; }
--------------------------------------------------------
在JDBC驱动将sql请求数据发送给MySQL服务器的过程中,可以配置是否采用压缩,压缩采用的是java.util.zip当中的Deflater类,其文档说明如下:
java.util.zip
类 Deflater
此类使用流行的 ZLIB 压缩程序库为通用压缩提供支持。ZLIB 压缩程序库最初是作为 PNG 图形标准的一部分开发的,不受专利的保护。有关该规范的完整描述,请参见 java.util.zip 包描述。
协议格式:
// Packet header, part 1: the payload length, i.e. the total packet length minus HEADER_LENGTH.
packetToSend.writeLongInt(packetLen - HEADER_LENGTH);
// Packet header, part 2: this packet's sequence number.
packetToSend.writeByte(this.packetSequence);
上面两行写入的是每个数据包的包头:先写入包体长度(总长度减去HEADER_LENGTH),再写入包序号。MySQL服务器依靠包头中的长度来处理TCP粘包的问题,从而将不同的sql请求分离开来。
我们来看一下JDBC驱动程序中的Connection的相关类图
java.sql.Connection---->com.mysql.jdbc.Connection---->com.mysql.jdbc.MySQLConnection---->com.mysql.jdbc.ConnectionImpl---->JDBC4Connection