  • Reading the MySQL Driver: How executeQuery Runs a Query over JDBC (Connector/J 5.1.26)

    Statement statement = connection.createStatement();
    final ResultSet resultSet = statement.executeQuery(sql);
    

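    The two lines above are an ordinary query. The rest of this post walks through how the driver executes the query and returns the data. In essence, the driver sends the SQL statement to the MySQL server over the TCP connection established by the Connection, receives the server's response, and parses it into a ResultSet object. The focus here is on that send-and-receive process.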
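
To make "sending the SQL statement over TCP" concrete, here is a minimal sketch of how a query is framed on the wire in the MySQL client/server protocol: a 3-byte little-endian payload length, a 1-byte sequence number, then the command byte (0x03 for a text query) followed by the SQL bytes. This is the same header that `send()` fills in later with `writeLongInt(packetLen - HEADER_LENGTH)` and `writeByte(this.packetSequence)`. The class `ComQueryFraming` and its method are hypothetical names for illustration, not part of Connector/J, and UTF-8 is assumed as the encoding.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not a Connector/J class): frames SQL as a single COM_QUERY packet. */
final class ComQueryFraming {
    static final int COM_QUERY = 0x03; // command byte for a text-protocol query

    static byte[] frame(String sql, int sequenceId) {
        byte[] body = sql.getBytes(StandardCharsets.UTF_8); // assumption: UTF-8 connection encoding
        int payloadLength = 1 + body.length;                // command byte + SQL text
        if (payloadLength >= 0xFFFFFF) {
            // Larger payloads must be split across several packets; not handled in this sketch.
            throw new IllegalArgumentException("payload too large for a single packet");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(4 + payloadLength);
        // 3-byte little-endian payload length
        out.write(payloadLength & 0xFF);
        out.write((payloadLength >>> 8) & 0xFF);
        out.write((payloadLength >>> 16) & 0xFF);
        // 1-byte sequence number (0 for the first packet of a command)
        out.write(sequenceId & 0xFF);
        // payload: command byte, then the SQL text without a trailing NUL
        out.write(COM_QUERY);
        out.write(body, 0, body.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] packet = frame("SELECT 1", 0);
        StringBuilder hex = new StringBuilder();
        for (byte b : packet) {
            hex.append(String.format("%02x ", b & 0xFF));
        }
        System.out.println(hex.toString().trim());
    }
}
```

The real driver reuses one `Buffer` per connection and also handles compression and packet splitting, which this sketch leaves out.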

    public java.sql.Statement createStatement(int resultSetType,
    			int resultSetConcurrency) throws SQLException {
    		checkClosed();
    
    		StatementImpl stmt = new StatementImpl(getLoadBalanceSafeProxy(), this.database);
    		stmt.setResultSetType(resultSetType);
    		stmt.setResultSetConcurrency(resultSetConcurrency);
    
    		return stmt;
    	}
    

    In the StatementImpl class:

    public java.sql.ResultSet executeQuery(String sql)
    			throws SQLException {
    		synchronized (checkClosed().getConnectionMutex()) {
    			MySQLConnection locallyScopedConn = this.connection;
    			
    			this.retrieveGeneratedKeys = false;
    			
    			resetCancelledState();
    
    			checkNullOrEmptyQuery(sql);
    
    			boolean doStreaming = createStreamingResultSet();
    
    			// Adjust net_write_timeout to a higher value if we're
    			// streaming result sets. More often than not, someone runs into
    			// an issue where they blow net_write_timeout when using this
    			// feature, and if they're willing to hold a result set open
    			// for 30 seconds or more, one more round-trip isn't going to hurt
    			//
    			// This is reset by RowDataDynamic.close().
    
    			if (doStreaming
    					&& this.connection.getNetTimeoutForStreamingResults() > 0) {
    				executeSimpleNonQuery(locallyScopedConn, "SET net_write_timeout="
    						+ this.connection.getNetTimeoutForStreamingResults());
    			}
    
    			if (this.doEscapeProcessing) {
    				Object escapedSqlResult = EscapeProcessor.escapeSQL(sql,
    						locallyScopedConn.serverSupportsConvertFn(), this.connection);
    
    				if (escapedSqlResult instanceof String) {
    					sql = (String) escapedSqlResult;
    				} else {
    					sql = ((EscapeProcessorResult) escapedSqlResult).escapedSql;
    				}
    			}
    
    			char firstStatementChar = StringUtils.firstNonWsCharUc(sql,
    					findStartOfStatement(sql));
    
    			if (sql.charAt(0) == '/') {
    				if (sql.startsWith(PING_MARKER)) {
    					doPingInstead();
    				
    					return this.results;
    				}
    			}
    			
    			checkForDml(sql, firstStatementChar);
    
    			if (!locallyScopedConn.getHoldResultsOpenOverStatementClose()) {
    				if (this.results != null) {
    					this.results.realClose(false);
    				}
    				if (this.generatedKeysResults != null) {
    					this.generatedKeysResults.realClose(false);
    				}
    				closeAllOpenResults();
    			}
    
    			CachedResultSetMetaData cachedMetaData = null;
    
    			// If there isn't a limit clause in the SQL
    			// then limit the number of rows to return in
    			// an efficient manner. Only do this if
    			// setMaxRows() hasn't been used on any Statements
    			// generated from the current Connection (saves
    			// a query, and network traffic).
    
    			if (useServerFetch()) {
    				this.results = createResultSetUsingServerFetch(sql);
    
    				return this.results;
    			}
    
    			CancelTask timeoutTask = null;
    
    			String oldCatalog = null;
    
    			try {
    				if (locallyScopedConn.getEnableQueryTimeouts() &&
    						this.timeoutInMillis != 0
    						&& locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {
    					timeoutTask = new CancelTask(this);
    					locallyScopedConn.getCancelTimer().schedule(timeoutTask,
    							this.timeoutInMillis);
    				}
    
    				if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {
    					oldCatalog = locallyScopedConn.getCatalog();
    					locallyScopedConn.setCatalog(this.currentCatalog);
    				}
    
    				//
    				// Check if we have cached metadata for this query...
    				//
    
    				Field[] cachedFields = null;
    
    				if (locallyScopedConn.getCacheResultSetMetadata()) {
    					cachedMetaData = locallyScopedConn.getCachedMetaData(sql);
    
    					if (cachedMetaData != null) {
    						cachedFields = cachedMetaData.fields;
    					}
    				}
    
    				if (locallyScopedConn.useMaxRows()) {
    					// We need to execute this all together
    					// So synchronize on the Connection's mutex (because
    					// even queries going through there synchronize
    					// on the connection
    					if (StringUtils.indexOfIgnoreCase(sql, "LIMIT") != -1) { //$NON-NLS-1$
    						this.results = locallyScopedConn.execSQL(this, sql,
    								this.maxRows, null, this.resultSetType,
    								this.resultSetConcurrency,
    								doStreaming,
    								this.currentCatalog, cachedFields);
    					} else {
    						if (this.maxRows <= 0) {
    							executeSimpleNonQuery(locallyScopedConn,
    									"SET SQL_SELECT_LIMIT=DEFAULT");
    						} else {
    							executeSimpleNonQuery(locallyScopedConn,
    									"SET SQL_SELECT_LIMIT=" + this.maxRows);
    						}
    
    						statementBegins();
    						
    						this.results = locallyScopedConn.execSQL(this, sql, -1,
    								null, this.resultSetType,
    								this.resultSetConcurrency,
    								doStreaming,
    								this.currentCatalog, cachedFields);
    
    						if (oldCatalog != null) {
    							locallyScopedConn.setCatalog(oldCatalog);
    						}
    					}
    				} else {
    					statementBegins();
    					
    					this.results = locallyScopedConn.execSQL(this, sql, -1, null,
    							this.resultSetType, this.resultSetConcurrency,
    							doStreaming,
    							this.currentCatalog, cachedFields);
    				}
    
    				if (timeoutTask != null) {
    					if (timeoutTask.caughtWhileCancelling != null) {
    						throw timeoutTask.caughtWhileCancelling;
    					}
    
    					timeoutTask.cancel();
    					
    					locallyScopedConn.getCancelTimer().purge();
    					
    					timeoutTask = null;
    				}
    
    				synchronized (this.cancelTimeoutMutex) {
    					if (this.wasCancelled) {
    						SQLException cause = null;
    						
    						if (this.wasCancelledByTimeout) {
    							cause = new MySQLTimeoutException();
    						} else {
    							cause = new MySQLStatementCancelledException();
    						}
    						
    						resetCancelledState();
    						
    						throw cause;
    					}
    				}
    			} finally {
    				this.statementExecuting.set(false);
    				
    				if (timeoutTask != null) {
    					timeoutTask.cancel();
    					
    					locallyScopedConn.getCancelTimer().purge();
    				}
    
    				if (oldCatalog != null) {
    					locallyScopedConn.setCatalog(oldCatalog);
    				}
    			}
    
    			this.lastInsertId = this.results.getUpdateID();
    
    			if (cachedMetaData != null) {
    				locallyScopedConn.initializeResultsMetadataFromCache(sql, cachedMetaData,
    						this.results);
    			} else {
    				if (this.connection.getCacheResultSetMetadata()) {
    					locallyScopedConn.initializeResultsMetadataFromCache(sql,
    							null /* will be created */, this.results);
    				}
    			}
    
    			return this.results;
    		}
    	}
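
The timeout handling in `executeQuery` follows a common `java.util.Timer` pattern: schedule a task before the work starts, then cancel it and purge the timer in a `finally` block so that cancelled tasks do not pile up in the timer's queue. The sketch below shows only that pattern. `QueryTimeoutSketch` and `runWithTimeout` are hypothetical names, and the watchdog here merely sets a flag, which stands in for whatever the driver's `CancelTask` actually does to interrupt the statement.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration of the schedule / cancel / purge pattern; not driver code. */
final class QueryTimeoutSketch {

    /** Runs the work under a watchdog; returns true if the watchdog fired before it was disarmed. */
    static boolean runWithTimeout(Timer timer, long timeoutMillis, Runnable work) {
        final AtomicBoolean cancelled = new AtomicBoolean(false);
        TimerTask watchdog = null;
        try {
            if (timeoutMillis != 0) {
                watchdog = new TimerTask() {
                    @Override
                    public void run() {
                        cancelled.set(true); // stand-in for cancelling the running statement
                    }
                };
                timer.schedule(watchdog, timeoutMillis);
            }
            work.run();
        } finally {
            if (watchdog != null) {
                watchdog.cancel(); // disarm if it has not fired yet
                timer.purge();     // drop cancelled tasks from the timer queue
            }
        }
        return cancelled.get();
    }

    public static void main(String[] args) {
        Timer timer = new Timer("cancel-timer", true); // daemon thread
        boolean timedOut = runWithTimeout(timer, 60_000, () -> { /* a fast query */ });
        System.out.println("timed out: " + timedOut);
        timer.cancel();
    }
}
```

Doing the cleanup in `finally` mirrors the listing above, where `timeoutTask.cancel()` and `getCancelTimer().purge()` run even when `execSQL` throws.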
    

      

    	this.results = locallyScopedConn.execSQL(this, sql, -1,
    								null, this.resultSetType,
    								this.resultSetConcurrency,
    								doStreaming,
    								this.currentCatalog, cachedFields);
    
    MySQLConnection locallyScopedConn = this.connection;
    

    In the ConnectionImpl class:

    public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows,
    			Buffer packet, int resultSetType, int resultSetConcurrency,
    			boolean streamResults, String catalog,
    			Field[] cachedMetadata,
    			boolean isBatch) throws SQLException {
    		synchronized (getConnectionMutex()) {
    			//
    			// Fall-back if the master is back online if we've
    			// issued queriesBeforeRetryMaster queries since
    			// we failed over
    			//
    	
    			long queryStartTime = 0;
    	
    			int endOfQueryPacketPosition = 0;
    	
    			if (packet != null) {
    				endOfQueryPacketPosition = packet.getPosition();
    			}
    	
    			if (getGatherPerformanceMetrics()) {
    				queryStartTime = System.currentTimeMillis();
    			}
    	
    			this.lastQueryFinishedTime = 0; // we're busy!
    	
    			if ((getHighAvailability())
    					&& (this.autoCommit || getAutoReconnectForPools())
    					&& this.needsPing && !isBatch) {
    				try {
    					pingInternal(false, 0);
    	
    					this.needsPing = false;
    				} catch (Exception Ex) {
    					createNewIO(true);
    				}
    			}
    	
    			try {
    				if (packet == null) {
    					String encoding = null;
    	
    					if (getUseUnicode()) {
    						encoding = getEncoding();
    					}
    	
    					return this.io.sqlQueryDirect(callingStatement, sql,
    							encoding, null, maxRows, resultSetType,
    							resultSetConcurrency, streamResults, catalog,
    							cachedMetadata);
    				}
    	
    				return this.io.sqlQueryDirect(callingStatement, null, null,
    						packet, maxRows, resultSetType,
    						resultSetConcurrency, streamResults, catalog,
    						cachedMetadata);
    			} catch (java.sql.SQLException sqlE) {
    				// don't clobber SQL exceptions
    	
    				if (getDumpQueriesOnException()) {
    					String extractedSql = extractSqlFromPacket(sql, packet,
    							endOfQueryPacketPosition);
    					StringBuffer messageBuf = new StringBuffer(extractedSql
    							.length() + 32);
    					messageBuf
    							.append("\n\nQuery being executed when exception was thrown:\n");
    					messageBuf.append(extractedSql);
    					messageBuf.append("\n\n");
    	
    					sqlE = appendMessageToException(sqlE, messageBuf.toString(), getExceptionInterceptor());
    				}
    	
    				if ((getHighAvailability())) {
    					this.needsPing = true;
    				} else {
    					String sqlState = sqlE.getSQLState();
    	
    					if ((sqlState != null)
    							&& sqlState
    									.equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) {
    						cleanup(sqlE);
    					}
    				}
    	
    				throw sqlE;
    			} catch (Exception ex) {
    				if (getHighAvailability()) {
    					this.needsPing = true;
    				} else if (ex instanceof IOException) {
    					cleanup(ex);
    				}
    	
    				SQLException sqlEx = SQLError.createSQLException(
    						Messages.getString("Connection.UnexpectedException"),
    						SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor());
    				sqlEx.initCause(ex);
    				
    				throw sqlEx;
    			} finally {
    				if (getMaintainTimeStats()) {
    					this.lastQueryFinishedTime = System.currentTimeMillis();
    				}
    	
    	
    				if (getGatherPerformanceMetrics()) {
    					long queryTime = System.currentTimeMillis()
    							- queryStartTime;
    	
    					registerQueryExecutionTime(queryTime);
    				}
    			}
    		}
    	}
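
The two `catch` blocks in `execSQL` apply a simple rule: a `SQLException` is rethrown as it is, while any other exception is wrapped in a new `SQLException` that keeps the original as its cause. A minimal sketch of that rule follows. `ExceptionWrappingSketch` is a hypothetical name, the message text is made up, and the SQLState value "S1000" is an assumption about what `SQLError.SQL_STATE_GENERAL_ERROR` holds.

```java
import java.io.IOException;
import java.sql.SQLException;

/** Hypothetical illustration of the "don't clobber SQL exceptions" rule; not driver code. */
final class ExceptionWrappingSketch {
    // Assumption: stands in for SQLError.SQL_STATE_GENERAL_ERROR.
    static final String GENERAL_ERROR_STATE = "S1000";

    static SQLException toSqlException(Exception ex) {
        if (ex instanceof SQLException) {
            return (SQLException) ex; // pass SQL exceptions through untouched
        }
        SQLException wrapped =
                new SQLException("Unexpected exception during query", GENERAL_ERROR_STATE);
        wrapped.initCause(ex); // keep the original failure reachable via getCause()
        return wrapped;
    }

    public static void main(String[] args) {
        SQLException wrapped = toSqlException(new IOException("socket closed"));
        System.out.println(wrapped.getSQLState() + " caused by " + wrapped.getCause());
    }
}
```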
    

      

    In the MysqlIO class:

     final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query,
        		String characterEncoding, Buffer queryPacket, int maxRows,
        		int resultSetType, int resultSetConcurrency,
        		boolean streamResults, String catalog, Field[] cachedMetadata)
        throws Exception {
        	this.statementExecutionDepth++;
    
        	try {
    	    	if (this.statementInterceptors != null) {
    	    		ResultSetInternalMethods interceptedResults =
    	    			invokeStatementInterceptorsPre(query, callingStatement, false);
    
    	    		if (interceptedResults != null) {
    	    			return interceptedResults;
    	    		}
    	    	}
    
    	    	long queryStartTime = 0;
    	    	long queryEndTime = 0;
    
        		String statementComment = this.connection.getStatementComment();
        		
        		if (this.connection.getIncludeThreadNamesAsStatementComment()) {
        			statementComment = (statementComment != null ? statementComment + ", " : "") + "java thread: " + Thread.currentThread().getName();
        		}
        		
    	    	if (query != null) {
    	    		// We don't know exactly how many bytes we're going to get
    	    		// from the query. Since we're dealing with Unicode, the
    	    		// max is 2, so pad it (2 * query) + space for headers
    	    		int packLength = HEADER_LENGTH + 1 + (query.length() * 2) + 2;
    
    	    		byte[] commentAsBytes = null;
    
    	    		if (statementComment != null) {
    	    			commentAsBytes = StringUtils.getBytes(statementComment, null,
    	    					characterEncoding, this.connection
    	    					.getServerCharacterEncoding(),
    	    					this.connection.parserKnowsUnicode(), getExceptionInterceptor());
    
    	    			packLength += commentAsBytes.length;
    	    			packLength += 6; // for /*[space] [space]*/
    	    		}
    
    	    		if (this.sendPacket == null) {
    	    			this.sendPacket = new Buffer(packLength);
    	    		} else {
    	    			this.sendPacket.clear();
    	    		}
    
    	    		this.sendPacket.writeByte((byte) MysqlDefs.QUERY);
    
    	    		if (commentAsBytes != null) {
    	    			this.sendPacket.writeBytesNoNull(Constants.SLASH_STAR_SPACE_AS_BYTES);
    	    			this.sendPacket.writeBytesNoNull(commentAsBytes);
    	    			this.sendPacket.writeBytesNoNull(Constants.SPACE_STAR_SLASH_SPACE_AS_BYTES);
    	    		}
    
    	    		if (characterEncoding != null) {
    	    			if (this.platformDbCharsetMatches) {
    	    				this.sendPacket.writeStringNoNull(query, characterEncoding,
    	    						this.connection.getServerCharacterEncoding(),
    	    						this.connection.parserKnowsUnicode(),
    	    						this.connection);
    	    			} else {
    	    				if (StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) { //$NON-NLS-1$
    	    					this.sendPacket.writeBytesNoNull(StringUtils.getBytes(query));
    	    				} else {
    	    					this.sendPacket.writeStringNoNull(query,
    	    							characterEncoding,
    	    							this.connection.getServerCharacterEncoding(),
    	    							this.connection.parserKnowsUnicode(),
    	    							this.connection);
    	    				}
    	    			}
    	    		} else {
    	    			this.sendPacket.writeStringNoNull(query);
    	    		}
    
    	    		queryPacket = this.sendPacket;
    	    	}
    
    	    	byte[] queryBuf = null;
    	    	int oldPacketPosition = 0;
    
    	    	if (needToGrabQueryFromPacket) {
    	    		queryBuf = queryPacket.getByteBuffer();
    
    	    		// save the packet position
    	    		oldPacketPosition = queryPacket.getPosition();
    
    	    		queryStartTime = getCurrentTimeNanosOrMillis();
    	    	}
    	    	
    	    	if (this.autoGenerateTestcaseScript) {
    	    		String testcaseQuery = null;
    
    	    		if (query != null) {
    	    			if (statementComment != null) {
    	    				testcaseQuery = "/* " + statementComment + " */ " + query;
    	    			} else {
    	    				testcaseQuery = query;
    	    			}
    	    		} else {
    	    			testcaseQuery = StringUtils.toString(queryBuf, 5,
    	    					(oldPacketPosition - 5));
    	    		}
    
    	    		StringBuffer debugBuf = new StringBuffer(testcaseQuery.length() + 32);
    	    		this.connection.generateConnectionCommentBlock(debugBuf);
    	    		debugBuf.append(testcaseQuery);
    	    		debugBuf.append(';');
    	    		this.connection.dumpTestcaseQuery(debugBuf.toString());
    	    	}
    
    	    	// Send query command and sql query string
    	    	Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket,
    	    			false, null, 0);
    
    	    	long fetchBeginTime = 0;
    	    	long fetchEndTime = 0;
    
    	    	String profileQueryToLog = null;
    
    	    	boolean queryWasSlow = false;
    
    	    	if (this.profileSql || this.logSlowQueries) {
    	    		queryEndTime = getCurrentTimeNanosOrMillis();
    
    	    		boolean shouldExtractQuery = false;
    
    	    		if (this.profileSql) {
    	    			shouldExtractQuery = true;
    	    		} else if (this.logSlowQueries) {
    	    			long queryTime = queryEndTime - queryStartTime;
    	    			
    	    			boolean logSlow = false;
    	    			
    	    			if (!this.useAutoSlowLog) {
    	    				logSlow = queryTime > this.connection.getSlowQueryThresholdMillis();
    	    			} else {
    	    				logSlow = this.connection.isAbonormallyLongQuery(queryTime);
    	    				
    	    				this.connection.reportQueryTime(queryTime);
    	    			}
    	    			
    	    			if (logSlow) {
    	    				shouldExtractQuery = true;
    	    				queryWasSlow = true;
    	    			}
    	    		}
    
    	    		if (shouldExtractQuery) {
    	    			// Extract the actual query from the network packet
    	    			boolean truncated = false;
    
    	    			int extractPosition = oldPacketPosition;
    
    	    			if (oldPacketPosition > this.connection.getMaxQuerySizeToLog()) {
    	    				extractPosition = this.connection.getMaxQuerySizeToLog() + 5;
    	    				truncated = true;
    	    			}
    
    	    			profileQueryToLog = StringUtils.toString(queryBuf, 5,
    	    					(extractPosition - 5));
    
    	    			if (truncated) {
    	    				profileQueryToLog += Messages.getString("MysqlIO.25"); //$NON-NLS-1$
    	    			}
    	    		}
    
    	    		fetchBeginTime = queryEndTime;
    	    	}
    
    	    	ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType,
    	    			resultSetConcurrency, streamResults, catalog, resultPacket,
    	    			false, -1L, cachedMetadata);
    
    	    	if (queryWasSlow && !this.serverQueryWasSlow /* don't log slow queries twice */) {
    	    		StringBuffer mesgBuf = new StringBuffer(48 +
    	    				profileQueryToLog.length());
    
    	    		mesgBuf.append(Messages.getString("MysqlIO.SlowQuery",
    	    				new Object[] {String.valueOf(this.useAutoSlowLog ? 
    	    						" 95% of all queries " : this.slowQueryThreshold),
    	    				queryTimingUnits,
    	    				Long.valueOf(queryEndTime - queryStartTime)}));
    	    		mesgBuf.append(profileQueryToLog);
    
    	    		ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);
    
    	    		eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY,
    	    				"", catalog, this.connection.getId(), //$NON-NLS-1$
    	    				(callingStatement != null) ? callingStatement.getId() : 999,
    	    						((ResultSetImpl)rs).resultId, System.currentTimeMillis(),
    	    						(int) (queryEndTime - queryStartTime), queryTimingUnits, null,
    	    						LogUtils.findCallingClassAndMethod(new Throwable()), mesgBuf.toString()));
    
    	    		if (this.connection.getExplainSlowQueries()) {
    	    			if (oldPacketPosition < MAX_QUERY_SIZE_TO_EXPLAIN) {
    	    				explainSlowQuery(queryPacket.getBytes(5,
    	    						(oldPacketPosition - 5)), profileQueryToLog);
    	    			} else {
    	    				this.connection.getLog().logWarn(Messages.getString(
    	    						"MysqlIO.28") //$NON-NLS-1$
    	    						+MAX_QUERY_SIZE_TO_EXPLAIN +
    	    						Messages.getString("MysqlIO.29")); //$NON-NLS-1$
    	    			}
    	    		}
    	    	}
    
    	    	if (this.logSlowQueries) {
    
    	    		ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);
    
    	    		if (this.queryBadIndexUsed && this.profileSql) {
    	    			eventSink.consumeEvent(new ProfilerEvent(
    	    					ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$
    	    					this.connection.getId(),
    	    					(callingStatement != null) ? callingStatement.getId()
    	    							: 999, ((ResultSetImpl)rs).resultId,
    	    							System.currentTimeMillis(),
    	    							(queryEndTime - queryStartTime), this.queryTimingUnits,
    	    							null,
    	    							LogUtils.findCallingClassAndMethod(new Throwable()),
    	    							Messages.getString("MysqlIO.33") //$NON-NLS-1$
    	    							+profileQueryToLog));
    	    		}
    
    	    		if (this.queryNoIndexUsed && this.profileSql) {
    	    			eventSink.consumeEvent(new ProfilerEvent(
    	    					ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$
    	    					this.connection.getId(),
    	    					(callingStatement != null) ? callingStatement.getId()
    	    							: 999, ((ResultSetImpl)rs).resultId,
    	    							System.currentTimeMillis(),
    	    							(queryEndTime - queryStartTime), this.queryTimingUnits,
    	    							null,
    	    							LogUtils.findCallingClassAndMethod(new Throwable()),
    	    							Messages.getString("MysqlIO.35") //$NON-NLS-1$
    	    							+profileQueryToLog));
    	    		}
    	    		
    	    		if (this.serverQueryWasSlow && this.profileSql) {
    	    			eventSink.consumeEvent(new ProfilerEvent(
    	    					ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$
    	    					this.connection.getId(),
    	    					(callingStatement != null) ? callingStatement.getId()
    	    							: 999, ((ResultSetImpl)rs).resultId,
    	    							System.currentTimeMillis(),
    	    							(queryEndTime - queryStartTime), this.queryTimingUnits,
    	    							null,
    	    							LogUtils.findCallingClassAndMethod(new Throwable()),
    	    							Messages.getString("MysqlIO.ServerSlowQuery") //$NON-NLS-1$
    	    							+profileQueryToLog));
    	    		}
    	    	}
    
    	    	if (this.profileSql) {
    	    		fetchEndTime = getCurrentTimeNanosOrMillis();
    
    	    		ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);
    
    	    		eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_QUERY,
    	    				"", catalog, this.connection.getId(), //$NON-NLS-1$
    	    				(callingStatement != null) ? callingStatement.getId() : 999,
    	    						((ResultSetImpl)rs).resultId, System.currentTimeMillis(),
    	    						(queryEndTime - queryStartTime), this.queryTimingUnits,
    	    						null,
    	    						LogUtils.findCallingClassAndMethod(new Throwable()), profileQueryToLog));
    
    	    		eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_FETCH,
    	    				"", catalog, this.connection.getId(), //$NON-NLS-1$
    	    				(callingStatement != null) ? callingStatement.getId() : 999,
    	    						((ResultSetImpl)rs).resultId, System.currentTimeMillis(),
    	    						(fetchEndTime - fetchBeginTime), this.queryTimingUnits,
    	    						null,
    	    						LogUtils.findCallingClassAndMethod(new Throwable()), null));
    	    	}
    
    	    	if (this.hadWarnings) {
    	    		scanForAndThrowDataTruncation();
    	    	}
    
    	    	if (this.statementInterceptors != null) {
    	    		ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPost(
    	    				query, callingStatement, rs, false, null);
    
    	    		if (interceptedResults != null) {
    	    			rs = interceptedResults;
    	    		}
    	    	}
    
    	    	return rs;
        	} catch (SQLException sqlEx) {
        		if (this.statementInterceptors != null) {
    	    		invokeStatementInterceptorsPost(
    	    				query, callingStatement, null, false, sqlEx); // we don't do anything with the result set in this case
        		}
        		
        		if (callingStatement != null) {
        			synchronized (callingStatement.cancelTimeoutMutex) {
    	    			if (callingStatement.wasCancelled) {
    						SQLException cause = null;
    						
    						if (callingStatement.wasCancelledByTimeout) {
    							cause = new MySQLTimeoutException();
    						} else {
    							cause = new MySQLStatementCancelledException();
    						}
    						
    						callingStatement.resetCancelledState();
    						
    						throw cause;
    					}
        			}
        		}
        		
        		throw sqlEx;
        	} finally {
        		this.statementExecutionDepth--;
        	}
        }
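
Before writing the query, `sqlQueryDirect` sizes the send buffer with a worst-case estimate instead of encoding the string first. The arithmetic from the listing can be sketched on its own as follows. `QueryPacketSizing` is a hypothetical name, `HEADER_LENGTH = 4` is an assumption (3 length bytes plus 1 sequence byte), and the comment is encoded as UTF-8 here for simplicity.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of the buffer-size estimate; not driver code. */
final class QueryPacketSizing {
    // Assumption: 3-byte payload length + 1-byte sequence number.
    static final int HEADER_LENGTH = 4;

    static int estimate(String query, String statementComment) {
        // header + command byte + up to 2 bytes per char + 2 bytes of slack
        int packLength = HEADER_LENGTH + 1 + (query.length() * 2) + 2;
        if (statementComment != null) {
            packLength += statementComment.getBytes(StandardCharsets.UTF_8).length;
            packLength += 6; // the listing reserves 6 bytes for the comment delimiters
        }
        return packLength;
    }

    public static void main(String[] args) {
        System.out.println(estimate("SELECT 1", null));
        System.out.println(estimate("SELECT 1", "abc"));
    }
}
```

For "SELECT 1" (8 characters) the estimate is 4 + 1 + 16 + 2 = 23 bytes, and a 3-byte comment adds 3 + 6 for a total of 32.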
    

      

    The key part is this call:


    // Send query command and sql query string
    Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket,
    false, null, 0);

    final Buffer sendCommand(int command, String extraData, Buffer queryPacket,
            boolean skipCheck, String extraDataCharEncoding, int timeoutMillis)
            throws SQLException {
        	this.commandCount++;
        	
            //
            // We cache these locally, per-command, as the checks
            // for them are in very 'hot' sections of the I/O code
            // and we save 10-15% in overall performance by doing this...
            //
            this.enablePacketDebug = this.connection.getEnablePacketDebug();
            this.readPacketSequence = 0;
    
            int oldTimeout = 0;
            
            if (timeoutMillis != 0) {
            	try {
            		oldTimeout = this.mysqlConnection.getSoTimeout();
    				this.mysqlConnection.setSoTimeout(timeoutMillis);
    			} catch (SocketException e) {
    				throw SQLError.createCommunicationsException(this.connection, lastPacketSentTimeMs, 
    						lastPacketReceivedTimeMs, e, getExceptionInterceptor());
    			}
            }
            
            try {
    
                checkForOutstandingStreamingData();
    
                // Clear serverStatus...this value is guarded by an
                // external mutex, as you can only ever be processing
                // one command at a time
                this.oldServerStatus = this.serverStatus;
                this.serverStatus = 0;
                this.hadWarnings = false;
                this.warningCount = 0;
    
                this.queryNoIndexUsed = false;
                this.queryBadIndexUsed = false;
                this.serverQueryWasSlow = false;
                
                //
                // Compressed input stream needs cleared at beginning
                // of each command execution...
                //
                if (this.useCompression) {
                    int bytesLeft = this.mysqlInput.available();
    
                    if (bytesLeft > 0) {
                        this.mysqlInput.skip(bytesLeft);
                    }
                }
    
                try {
                    clearInputStream();
    
                    //
                    // PreparedStatements construct their own packets,
                    // for efficiency's sake.
                    //
                    // If this is a generic query, we need to re-use
                    // the sending packet.
                    //
                    if (queryPacket == null) {
                        int packLength = HEADER_LENGTH + COMP_HEADER_LENGTH + 1 +
                            ((extraData != null) ? extraData.length() : 0) + 2;
    
                        if (this.sendPacket == null) {
                            this.sendPacket = new Buffer(packLength);
                        }
    
                        this.packetSequence = -1;
                        this.compressedPacketSequence = -1;
                        this.readPacketSequence = 0;
                        this.checkPacketSequence = true;
                        this.sendPacket.clear();
    
                        this.sendPacket.writeByte((byte) command);
    
                        if ((command == MysqlDefs.INIT_DB) ||
                                (command == MysqlDefs.CREATE_DB) ||
                                (command == MysqlDefs.DROP_DB) ||
                                (command == MysqlDefs.QUERY) ||
                                (command == MysqlDefs.COM_PREPARE)) {
                            if (extraDataCharEncoding == null) {
                                this.sendPacket.writeStringNoNull(extraData);
                            } else {
                                this.sendPacket.writeStringNoNull(extraData,
                                    extraDataCharEncoding,
                                    this.connection.getServerCharacterEncoding(),
                                    this.connection.parserKnowsUnicode(), this.connection);
                            }
                        } else if (command == MysqlDefs.PROCESS_KILL) {
                            long id = Long.parseLong(extraData);
                            this.sendPacket.writeLong(id);
                        }
    
                        send(this.sendPacket, this.sendPacket.getPosition());
                    } else {
                        this.packetSequence = -1;
                        this.compressedPacketSequence = -1;
                        send(queryPacket, queryPacket.getPosition()); // packet passed by PreparedStatement
                    }
                } catch (SQLException sqlEx) {
                    // don't wrap SQLExceptions
                    throw sqlEx;
                } catch (Exception ex) {
                    throw SQLError.createCommunicationsException(this.connection,
                        this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ex, getExceptionInterceptor());
                }
    
                Buffer returnPacket = null;
    
                if (!skipCheck) {
                    if ((command == MysqlDefs.COM_EXECUTE) ||
                            (command == MysqlDefs.COM_RESET_STMT)) {
                        this.readPacketSequence = 0;
                        this.packetSequenceReset = true;
                    }
    
                    returnPacket = checkErrorPacket(command);
                }
    
                return returnPacket;
            } catch (IOException ioEx) {
                throw SQLError.createCommunicationsException(this.connection,
                    this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ioEx, getExceptionInterceptor());
            } finally {
            	if (timeoutMillis != 0) {
            		try {
            			this.mysqlConnection.setSoTimeout(oldTimeout);
            		} catch (SocketException e) {
        				throw SQLError.createCommunicationsException(this.connection, lastPacketSentTimeMs, 
        						lastPacketReceivedTimeMs, e, getExceptionInterceptor());
        			}
            	}
            }
        }
    

    The method above both sends the SQL data and obtains the result. Both directions go through the Socket's OutputStream and InputStream.
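
The receiving side is the mirror image of sending: read the 4-byte header from the socket's `InputStream`, decode the little-endian payload length and the sequence number, then read exactly that many payload bytes. Below is a minimal sketch under those assumptions. `PacketReaderSketch` and `Packet` are hypothetical names, a `ByteArrayInputStream` stands in for the socket stream, and multi-packet payloads and compression are ignored.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical illustration of reading one protocol packet; not driver code. */
final class PacketReaderSketch {

    static final class Packet {
        final int sequenceId;
        final byte[] payload;

        Packet(int sequenceId, byte[] payload) {
            this.sequenceId = sequenceId;
            this.payload = payload;
        }
    }

    static Packet readPacket(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte[] header = new byte[4];
        data.readFully(header); // throws EOFException if the stream ends early
        int length = (header[0] & 0xFF)
                | ((header[1] & 0xFF) << 8)
                | ((header[2] & 0xFF) << 16); // 3-byte little-endian payload length
        int sequenceId = header[3] & 0xFF;
        byte[] payload = new byte[length];
        data.readFully(payload);
        return new Packet(sequenceId, payload);
    }

    public static void main(String[] args) throws IOException {
        // Bytes shaped like a small server reply: length 7, sequence 1, 7 payload bytes.
        byte[] wire = {7, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0};
        Packet p = readPacket(new ByteArrayInputStream(wire));
        System.out.println("seq=" + p.sequenceId + " payloadBytes=" + p.payload.length);
    }
}
```

`readFully` matters here because a single `read` call on a socket stream may return fewer bytes than requested.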

    private final void send(Buffer packet, int packetLen)
            throws SQLException {
            try {
                if (this.maxAllowedPacket > 0 && packetLen > this.maxAllowedPacket) {
                    throw new PacketTooBigException(packetLen, this.maxAllowedPacket);
                }
    
                if ((this.serverMajorVersion >= 4) &&
                		(packetLen - HEADER_LENGTH >= this.maxThreeBytes ||
                		(this.useCompression && packetLen - HEADER_LENGTH >= this.maxThreeBytes - COMP_HEADER_LENGTH)
                		)) {
                    sendSplitPackets(packet, packetLen);
    
                } else {
                    this.packetSequence++;
    
                    Buffer packetToSend = packet;
                    packetToSend.setPosition(0);
                    packetToSend.writeLongInt(packetLen - HEADER_LENGTH);
                    packetToSend.writeByte(this.packetSequence);
    
                    if (this.useCompression) {
                    	this.compressedPacketSequence++;
                        int originalPacketLen = packetLen;
    
                        packetToSend = compressPacket(packetToSend, 0, packetLen);
                        packetLen = packetToSend.getPosition();
    
                        if (this.traceProtocol) {
                            StringBuffer traceMessageBuf = new StringBuffer();
    
                            traceMessageBuf.append(Messages.getString("MysqlIO.57")); //$NON-NLS-1$
                            traceMessageBuf.append(getPacketDumpToLog(
                                    packetToSend, packetLen));
                            traceMessageBuf.append(Messages.getString("MysqlIO.58")); //$NON-NLS-1$
                            traceMessageBuf.append(getPacketDumpToLog(packet,
                                    originalPacketLen));
    
                            this.connection.getLog().logTrace(traceMessageBuf.toString());
                        }
                    } else {
    
                        if (this.traceProtocol) {
                            StringBuffer traceMessageBuf = new StringBuffer();
    
                            traceMessageBuf.append(Messages.getString("MysqlIO.59")); //$NON-NLS-1$
                            traceMessageBuf.append("host: '");
                            traceMessageBuf.append(host);
                            traceMessageBuf.append("' threadId: '");
                            traceMessageBuf.append(threadId);
                            traceMessageBuf.append("'\n");
                            traceMessageBuf.append(packetToSend.dump(packetLen));
    
                            this.connection.getLog().logTrace(traceMessageBuf.toString());
                        }
                    }
    
                    this.mysqlOutput.write(packetToSend.getByteBuffer(), 0, packetLen);
                    this.mysqlOutput.flush();
                }
    
                if (this.enablePacketDebug) {
                    enqueuePacketForDebugging(true, false, packetLen + 5,
                        this.packetHeaderBuf, packet);
                }
    
                //
                // Don't hold on to large packets
                //
                if (packet == this.sharedSendPacket) {
                    reclaimLargeSharedSendPacket();
                }
                
                if (this.connection.getMaintainTimeStats()) {
    				this.lastPacketSentTimeMs = System.currentTimeMillis();
    			}
            } catch (IOException ioEx) {
                throw SQLError.createCommunicationsException(this.connection,
                    this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, ioEx, getExceptionInterceptor());
            }
        }
    

      

     private Buffer checkErrorPacket(int command) throws SQLException {
            //int statusCode = 0;
            Buffer resultPacket = null;
            this.serverStatus = 0;
    
            try {
                // Check return value, if we get a java.io.EOFException,
                // the server has gone away. We'll pass it on up the
                // exception chain and let someone higher up decide
                // what to do (barf, reconnect, etc).
                resultPacket = reuseAndReadPacket(this.reusablePacket);
            } catch (SQLException sqlEx) {
                // Don't wrap SQL Exceptions
                throw sqlEx;
            } catch (Exception fallThru) {
                throw SQLError.createCommunicationsException(this.connection,
                    this.lastPacketSentTimeMs, this.lastPacketReceivedTimeMs, fallThru, getExceptionInterceptor());
            }
    
            checkErrorPacket(resultPacket);
    
            return resultPacket;
        }
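The overload checkErrorPacket(Buffer) that does the actual inspection is not listed in this article. As a rough illustration of what such a check looks at, here is a minimal sketch that assumes the protocol 4.1 error packet layout: a 0xFF marker byte, a 2-byte little-endian error code, an optional '#' plus 5-character SQLSTATE, then the message. The class ErrorPacketSketch and its methods are hypothetical names, not the driver's API.

```java
import java.nio.charset.StandardCharsets;

/** Illustration only: hypothetical helper, not part of Connector/J. */
class ErrorPacketSketch {
    static final int ERROR_MARKER = 0xFF; // first payload byte of an error packet

    /** True if the payload (packet body without the 4-byte header) is an error packet. */
    static boolean isError(byte[] payload) {
        return payload.length > 0 && (payload[0] & 0xFF) == ERROR_MARKER;
    }

    /** Reads the 2-byte little-endian error code that follows the marker. */
    static int errorCode(byte[] payload) {
        if (!isError(payload) || payload.length < 3) {
            throw new IllegalArgumentException("not an error packet");
        }
        return (payload[1] & 0xFF) | ((payload[2] & 0xFF) << 8);
    }

    /** Returns the human-readable message, skipping the optional SQLSTATE block. */
    static String message(byte[] payload) {
        errorCode(payload); // validates the payload first
        int offset = 3;
        if (payload.length > offset && payload[offset] == '#') {
            offset += 6; // '#' marker plus 5-character SQLSTATE
        }
        offset = Math.min(offset, payload.length);
        return new String(payload, offset, payload.length - offset, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hand-built sample payload; the message text is made up for the example.
        byte[] sample = {(byte) 0xFF, 0x28, 0x04, '#', '4', '2', '0', '0', '0',
                         's', 'y', 'n', 't', 'a', 'x'};
        System.out.println(errorCode(sample) + ": " + message(sample));
    }
}
```

In the driver, a packet recognized as an error is turned into a SQLException rather than returned to the caller, which is why executeQuery surfaces server-side failures as exceptions.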

    --------------------------------------------------------

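    When the JDBC driver sends SQL request data to the MySQL server, compression can be switched on by configuration (the useCompression flag checked in send() above). The compression is done with the following class from java.util.zip:

    java.util.zip
    Class Deflater

    This class provides support for general purpose compression using the popular ZLIB compression library. The ZLIB compression library was initially developed as part of the PNG graphics standard and is not protected by patents. It is fully described in the specifications at the java.util.zip package description.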
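To make the Deflater part concrete, here is a minimal round-trip sketch using only java.util.zip. It is not the driver's compressPacket method: the class DeflaterSketch and its methods are hypothetical, and passing the original length to decompress is an assumption made for simplicity (the MySQL compressed-packet header does carry the uncompressed length, which is what makes this shape plausible).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Illustration only: hypothetical helper, not part of Connector/J. */
class DeflaterSketch {

    /** Compresses the input with the default ZLIB settings. */
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish(); // no more input will follow
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end(); // release native resources
        return out.toByteArray();
    }

    /** Restores the original bytes; originalLength must be known to the receiver. */
    static byte[] decompress(byte[] compressed, int originalLength) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        byte[] result = new byte[originalLength];
        try {
            int filled = 0;
            while (filled < originalLength && !inflater.finished()) {
                int n = inflater.inflate(result, filled, originalLength - filled);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or unsupported data");
                }
                filled += n;
            }
            if (filled != originalLength) {
                throw new IllegalArgumentException("length mismatch");
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("corrupt compressed data", e);
        } finally {
            inflater.end();
        }
        return result;
    }

    public static void main(String[] args) {
        // A repetitive statement compresses well; short ones may even grow.
        StringBuilder sql = new StringBuilder("SELECT * FROM t WHERE id IN (");
        for (int i = 0; i < 500; i++) {
            sql.append(i).append(',');
        }
        sql.append("-1)");
        byte[] raw = sql.toString().getBytes(StandardCharsets.UTF_8);
        byte[] packed = compress(raw);
        byte[] restored = decompress(packed, raw.length);
        System.out.println(raw.length + " -> " + packed.length
                + " bytes, restored " + restored.length);
    }
}
```

Compression trades CPU time for bandwidth, so it tends to pay off for large statements or slow links and much less for short queries on a local network.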

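    Protocol format:

    packetToSend.writeLongInt(packetLen - HEADER_LENGTH);
    packetToSend.writeByte(this.packetSequence);

    Every packet starts with a header: the payload length as a 3-byte integer, followed by a 1-byte sequence number. TCP is a byte stream with no message boundaries (the so-called "sticky packet" problem), so this length prefix is what lets the MySQL server separate one SQL request from the next.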
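The effect of the length prefix can be sketched in a few lines. The example below is a simplified illustration, not the driver's code: PacketFramingSketch, frame and unframe are hypothetical names, the payloads are raw SQL text (real command packets also begin with a command byte, omitted here), and the bytes are kept in memory instead of going through a socket.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: hypothetical helper, not part of Connector/J. */
class PacketFramingSketch {
    static final int HEADER_LENGTH = 4; // 3-byte payload length + 1-byte sequence number

    /** Prepends the header to a payload; the length is written low byte first. */
    static byte[] frame(byte[] payload, int sequence) {
        int len = payload.length;
        if (len >= 0xFFFFFF) {
            // Larger payloads must be split across packets (sendSplitPackets in the driver).
            throw new IllegalArgumentException("payload too large for a single packet");
        }
        byte[] packet = new byte[HEADER_LENGTH + len];
        packet[0] = (byte) (len & 0xFF);
        packet[1] = (byte) ((len >>> 8) & 0xFF);
        packet[2] = (byte) ((len >>> 16) & 0xFF);
        packet[3] = (byte) sequence;
        System.arraycopy(payload, 0, packet, HEADER_LENGTH, len);
        return packet;
    }

    /** Splits a contiguous byte stream back into payloads using only the length headers. */
    static List<byte[]> unframe(byte[] stream) {
        List<byte[]> payloads = new ArrayList<>();
        int pos = 0;
        while (pos + HEADER_LENGTH <= stream.length) {
            int len = (stream[pos] & 0xFF)
                    | ((stream[pos + 1] & 0xFF) << 8)
                    | ((stream[pos + 2] & 0xFF) << 16);
            int start = pos + HEADER_LENGTH;
            if (start + len > stream.length) {
                break; // incomplete packet: a real reader would wait for more bytes
            }
            payloads.add(Arrays.copyOfRange(stream, start, start + len));
            pos = start + len;
        }
        return payloads;
    }

    public static void main(String[] args) {
        // Two requests written back to back, as they might arrive in one TCP read.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        for (String sql : new String[] {"SELECT 1", "SELECT NOW()"}) {
            byte[] packet = frame(sql.getBytes(StandardCharsets.UTF_8), 0);
            wire.write(packet, 0, packet.length);
        }
        for (byte[] payload : unframe(wire.toByteArray())) {
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

The receiver never has to guess where a statement ends: it reads four header bytes, then exactly as many payload bytes as the header announces.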

    Now let's look at the class hierarchy around Connection in the JDBC driver:

    java.sql.Connection ----> com.mysql.jdbc.Connection ----> com.mysql.jdbc.MySQLConnection ----> com.mysql.jdbc.ConnectionImpl ----> JDBC4Connection

  • Original article: https://www.cnblogs.com/wuxinliulei/p/5166729.html