先看如下几个参数变量:
/*
 * Parameter settings that determine if the WAL can be used for archival
 * or hot standby.
 *
 * Fields with these same four names are assigned on ControlFile from an
 * xl_parameter_change record in the XLOG_PARAMETER_CHANGE branch of
 * xlog_redo.
 */
int wal_level;
int MaxConnections;
int max_prepared_xacts;
int max_locks_per_xact;
PostgreSQL中多次用到了函数指针表(即成员为函数指针的结构体数组):
/*
 * Method table for resource managers.
 *
 * RmgrTable[] is indexed by RmgrId values (see rmgr.h).
 *
 * Each RmgrData entry bundles one resource manager's name with its
 * callback function pointers.  Callbacks a manager does not provide are
 * NULL in the table below.
 */
typedef struct RmgrData
{
    /* Name string of the resource manager, e.g. "XLOG" */
    const char *rm_name;
    /* Replay callback; StartupXLOG's redo loop calls it as rm_redo(EndRecPtr, record) */
    void (*rm_redo) (XLogRecPtr lsn, XLogRecord *rptr);
    /* Description callback; called under WAL_DEBUG in StartupXLOG to append to a StringInfo */
    void (*rm_desc) (StringInfo buf, uint8 xl_info, char *rec);
    /* Optional callback; NULL for most managers (no caller visible here -- purpose to be confirmed) */
    void (*rm_startup) (void);
    /* Optional callback; NULL for most managers (no caller visible here -- purpose to be confirmed) */
    void (*rm_cleanup) (void);
    /* Optional callback returning bool; only Btree and Gin supply one in the table below */
    bool (*rm_safe_restartpoint) (void);
} RmgrData;

/*
 * One initializer per resource manager, in positional field order:
 * name, redo, desc, startup, cleanup, safe_restartpoint.
 * The order of the rows matters because the table is indexed by
 * record->xl_rmid.
 */
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
    {"XLOG", xlog_redo, xlog_desc, NULL, NULL, NULL},
    {"Transaction", xact_redo, xact_desc, NULL, NULL, NULL},
    {"Storage", smgr_redo, smgr_desc, NULL, NULL, NULL},
    {"CLOG", clog_redo, clog_desc, NULL, NULL, NULL},
    {"Database", dbase_redo, dbase_desc, NULL, NULL, NULL},
    {"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},
    {"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},
    {"RelMap", relmap_redo, relmap_desc, NULL, NULL, NULL},
    {"Standby", standby_redo, standby_desc, NULL, NULL, NULL},
    {"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},
    {"Heap", heap_redo, heap_desc, NULL, NULL, NULL},
    {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
    {"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
    {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
    {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL},
    {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}
};
看代码:
/*
 * This must be called ONCE during postmaster or standalone-backend startup
 *
 * Excerpt: the "…" markers stand for code elided from the quotation.
 * The visible part (1) decides whether WAL recovery is needed and
 * (2) if so, reads WAL records one at a time and hands each to the redo
 * routine of the resource manager that owns it.
 */
void
StartupXLOG(void)
{
    …

    /*
     * Check whether we need to force recovery from WAL.  If it appears to
     * have been a clean shutdown and we did not have a recovery.conf file,
     * then assume no recovery needed.
     */
    if (XLByteLT(checkPoint.redo, RecPtr))
    {
        /* A shutdown checkpoint whose redo pointer precedes it is treated as fatal */
        if (wasShutdown)
            ereport(PANIC,
                    (errmsg("invalid redo record in shutdown checkpoint")));
        InRecovery = true;
    }
    else if (ControlFile->state != DB_SHUTDOWNED)
        InRecovery = true;
    else if (InArchiveRecovery)
    {
        /* force recovery due to presence of recovery.conf */
        InRecovery = true;
    }

    /* REDO */
    if (InRecovery)
    {
        …

        /*
         * Find the first record that logically follows the checkpoint --- it
         * might physically precede it, though.
         */
        if (XLByteLT(checkPoint.redo, RecPtr))
        {
            /* back up to find the record */
            record = ReadRecord(&(checkPoint.redo), PANIC, false);
        }
        else
        {
            /* just have to read next record after CheckPoint */
            record = ReadRecord(NULL, LOG, false);
        }

        if (record != NULL)
        {
            bool recoveryContinue = true;
            bool recoveryApply = true;
            bool recoveryPause = false;
            ErrorContextCallback errcontext;
            TimestampTz xtime;

            InRedo = true;

            ereport(LOG,
                    (errmsg("redo starts at %X/%X",
                            ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));

            /*
             * main redo apply loop
             */
            do
            {
#ifdef WAL_DEBUG
                /*
                 * NOTE(review): rmid is neither declared nor assigned in the
                 * visible excerpt; if the intent is to test the current
                 * record's resource manager, record->xl_rmid would be the
                 * value to compare -- confirm against the full function.
                 */
                if (XLOG_DEBUG ||
                    (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
                    (rmid != RM_XACT_ID && trace_recovery_messages <= DEBUG3))
                {
                    StringInfoData buf;

                    initStringInfo(&buf);
                    appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
                                     ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
                                     EndRecPtr.xlogid, EndRecPtr.xrecoff);
                    xlog_outrec(&buf, record);
                    appendStringInfo(&buf, " - ");
                    /* Let the owning resource manager describe the record */
                    RmgrTable[record->xl_rmid].rm_desc(&buf,
                                                       record->xl_info,
                                                       XLogRecGetData(record));
                    elog(LOG, "%s", buf.data);
                    pfree(buf.data);
                }
#endif

                /* Handle interrupt signals of startup process */
                HandleStartupProcInterrupts();

                /* Allow read-only connections if we're consistent now */
                CheckRecoveryConsistency();

                /*
                 * Have we reached our recovery target?
                 */
                if (recoveryStopsHere(record, &recoveryApply))
                {
                    /*
                     * Pause only if users can connect to send a resume
                     * message
                     */
                    if (recoveryPauseAtTarget && standbyState == STANDBY_SNAPSHOT_READY)
                    {
                        SetRecoveryPause(true);
                        recoveryPausesHere();
                    }
                    reachedStopPoint = true;    /* see below */
                    recoveryContinue = false;
                    /* Leave before the rm_redo call below: this record is not replayed */
                    if (!recoveryApply)
                        break;
                }

                /* Setup error traceback support for ereport() */
                errcontext.callback = rm_redo_error_callback;
                errcontext.arg = (void *) record;
                errcontext.previous = error_context_stack;
                error_context_stack = &errcontext;

                /* nextXid must be beyond record's xid */
                if (TransactionIdFollowsOrEquals(record->xl_xid,
                                                 ShmemVariableCache->nextXid))
                {
                    ShmemVariableCache->nextXid = record->xl_xid;
                    TransactionIdAdvance(ShmemVariableCache->nextXid);
                }

                /*
                 * Update shared replayEndRecPtr before replaying this record,
                 * so that XLogFlush will update minRecoveryPoint correctly.
                 * The shared recoveryPause flag is read in the same
                 * spinlock-protected section.
                 */
                SpinLockAcquire(&xlogctl->info_lck);
                xlogctl->replayEndRecPtr = EndRecPtr;
                recoveryPause = xlogctl->recoveryPause;
                SpinLockRelease(&xlogctl->info_lck);

                /*
                 * Pause only if users can connect to send a resume message
                 */
                if (recoveryPause && standbyState == STANDBY_SNAPSHOT_READY)
                    recoveryPausesHere();

                /*
                 * If we are attempting to enter Hot Standby mode, process
                 * XIDs we see
                 */
                if (standbyState >= STANDBY_INITIALIZED &&
                    TransactionIdIsValid(record->xl_xid))
                    RecordKnownAssignedTransactionIds(record->xl_xid);

                /*
                 * Dispatch through the method table: record->xl_rmid selects
                 * the RmgrTable entry whose rm_redo callback replays this
                 * record.
                 */
                RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);

                /* Pop the error context stack */
                error_context_stack = errcontext.previous;

                /*
                 * Update shared recoveryLastRecPtr after this record has been
                 * replayed.
                 */
                SpinLockAcquire(&xlogctl->info_lck);
                xlogctl->recoveryLastRecPtr = EndRecPtr;
                SpinLockRelease(&xlogctl->info_lck);

                LastRec = ReadRecPtr;

                /* Fetch the next record; NULL ends the loop */
                record = ReadRecord(NULL, LOG, false);
            } while (record != NULL && recoveryContinue);

            /*
             * end of main redo apply loop
             */

            ereport(LOG,
                    (errmsg("redo done at %X/%X",
                            ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
            xtime = GetLatestXTime();
            if (xtime)
                ereport(LOG,
                        (errmsg("last completed transaction was at log time %s",
                                timestamptz_to_str(xtime))));
            InRedo = false;
        }
        else
        {
            /* there are no WAL records following the checkpoint */
            ereport(LOG,
                    (errmsg("redo is not required")));
        }
    }

    …
}
经过分析可以发现,在 StartupXLOG 函数中,通过 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
调用了各个资源管理器的 redo 函数,其中包括 xlog_redo:
/*
 * XLOG resource manager's routines
 *
 * Definitions of info values are in include/catalog/pg_control.h, though
 * not all record types are related to control file updates.
 *
 * Excerpt: the "…" markers stand for code elided from the quotation
 * (including the declaration and computation of info).  Only the
 * XLOG_PARAMETER_CHANGE branch is shown in full.
 */
void
xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
    …

    if (info == XLOG_NEXTOID)
    {
        …
    }
    …
    else if (info == XLOG_PARAMETER_CHANGE)
    {
        xl_parameter_change xlrec;

        /* Update our copy of the parameters in pg_control */
        /* Copy the record payload into a local struct before reading its fields */
        memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));

        /* ControlFile is modified and written out while holding ControlFileLock exclusively */
        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
        ControlFile->MaxConnections = xlrec.MaxConnections;
        ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts;
        ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
        ControlFile->wal_level = xlrec.wal_level;
        …
        UpdateControlFile();
        LWLockRelease(ControlFileLock);

        /* Check to see if any changes to max_connections give problems */
        CheckRequiredParameterValues();
    }
}
看这段注释:意思是要找到逻辑上紧跟在 checkpoint 之后的第一条记录;不过这条记录在物理位置上可能位于 checkpoint 记录之前。
/* * Find the first record that logically follows the checkpoint --- it * might physically precede it, though. */