zoukankan      html  css  js  c++  java
  • 初步学习pg_control文件之十四

    接前文 初步学习pg_control文件之十三

    看如下几个:

    /*
         * Parameter settings that determine if the WAL can be used for archival
         * or hot standby.
         */
        int            wal_level;
        int            MaxConnections;
        int            max_prepared_xacts;
        int            max_locks_per_xact;

    PostgreSQL中多次用到了函数指针表(即由函数指针组成的结构体数组),例如下面的资源管理器表 RmgrTable:

    /*                                        
     * Method table for resource managers.                                        
     *                                        
     * RmgrTable[] is indexed by RmgrId values (see rmgr.h).                                        
     *
     * Each entry pairs a resource manager's name with the callbacks used
     * for its WAL records.  Callback slots may be NULL, as most rows of
     * RmgrTable[] show for the last three members.
     */                                        
    typedef struct RmgrData                                        
    {                                        
        /* Name string of the resource manager, e.g. "XLOG" or "Btree". */
        const char *rm_name;                                    
        /*
         * Replay routine.  The redo loop in StartupXLOG() calls it as
         * rm_redo(EndRecPtr, record) for every record it applies.
         */
        void        (*rm_redo) (XLogRecPtr lsn, XLogRecord *rptr);                            
        /*
         * Description routine.  Under WAL_DEBUG it is called with a
         * StringInfo to append to, the record's xl_info, and the record's
         * data pointer.
         */
        void        (*rm_desc) (StringInfo buf, uint8 xl_info, char *rec);                            
        /*
         * Optional hooks; only Btree, Gin and Gist supply any of them in
         * RmgrTable[].
         * NOTE(review): their call sites are not visible in this excerpt --
         * presumably invoked around redo and before restartpoints; confirm.
         */
        void        (*rm_startup) (void);                            
        void        (*rm_cleanup) (void);                            
        bool        (*rm_safe_restartpoint) (void);                            
    } RmgrData;                                        
                                            
                                            
    /*
     * One row per resource manager.  Columns follow the RmgrData member
     * order: rm_name, rm_redo, rm_desc, rm_startup, rm_cleanup,
     * rm_safe_restartpoint.
     *
     * The initializers are positional and the table is indexed by a
     * record's xl_rmid, so the row order has to match the RmgrId numbering.
     */
    const RmgrData RmgrTable[RM_MAX_ID + 1] = {                                        
        {"XLOG", xlog_redo, xlog_desc, NULL, NULL, NULL},                                    
        {"Transaction", xact_redo, xact_desc, NULL, NULL, NULL},                                    
        {"Storage", smgr_redo, smgr_desc, NULL, NULL, NULL},                                    
        {"CLOG", clog_redo, clog_desc, NULL, NULL, NULL},                                    
        {"Database", dbase_redo, dbase_desc, NULL, NULL, NULL},                                    
        {"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},                                    
        {"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},                                    
        {"RelMap", relmap_redo, relmap_desc, NULL, NULL, NULL},                                    
        {"Standby", standby_redo, standby_desc, NULL, NULL, NULL},                                    
        {"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},                                    
        {"Heap", heap_redo, heap_desc, NULL, NULL, NULL},                                    
        {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint}, 
        {"Hash", hash_redo, hash_desc, NULL, NULL, NULL},                                    
        {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},   
        {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL}, 
        {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}                                    
    };                                        

    看代码:

    /*                                            
     * This must be called ONCE during postmaster or standalone-backend startup                                            
     *
     * The portion shown here first decides whether WAL replay is needed
     * (setting InRecovery), and if so reads WAL records one at a time and
     * passes each to its resource manager's rm_redo callback through
     * RmgrTable[].
     */                                            
    void                                            
    StartupXLOG(void)                                            
    {                                            
        …                                        
        /*                                        
         * Check whether we need to force recovery from WAL.  If it appears to    
         * have been a clean shutdown and we did not have a recovery.conf file, 
         * then assume no recovery needed.                                        
         */                                        
        if (XLByteLT(checkPoint.redo, RecPtr))                                        
        {                                        
            if (wasShutdown)                                    
                ereport(PANIC,                                
                        (errmsg("invalid redo record in shutdown checkpoint")));                        
            InRecovery = true;                                    
        }                                        
        else if (ControlFile->state != DB_SHUTDOWNED)                                        
            /* pg_control does not record a clean shutdown */
            InRecovery = true;                                    
        else if (InArchiveRecovery)                                        
        {                                        
            /* force recovery due to presence of recovery.conf */                                    
            InRecovery = true;                                    
        }                                        
                                                
        /* REDO */                                        
        if (InRecovery)                                        
        {                                        
            …                                    
            /*                                    
             * Find the first record that logically follows the checkpoint --- it 
             * might physically precede it, though.                                    
             */                                    
            if (XLByteLT(checkPoint.redo, RecPtr))                                    
            {                                    
                /* back up to find the record */                                
                record = ReadRecord(&(checkPoint.redo), PANIC, false);                                
            }                                    
            else                                    
            {                                    
                /* just have to read next record after CheckPoint */                                
                record = ReadRecord(NULL, LOG, false);                                
            }                                    
                                                
            if (record != NULL)                                    
            {                                    
                bool        recoveryContinue = true;                        
                bool        recoveryApply = true;                        
                bool        recoveryPause = false;                        
                ErrorContextCallback errcontext;                                
                TimestampTz xtime;                                
                                                
                InRedo = true;                                
                                                
                ereport(LOG,                                
                        (errmsg("redo starts at %X/%X",                        
                                ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));                
                                                
                /*                                
                 * main redo apply loop                                
                 */                                
                do                                
                {                                
                    /*
                     * NOTE(review): "rmid" is not declared anywhere in the
                     * visible part of this function; presumably this should
                     * be record->xl_rmid (as used for the rm_desc call
                     * below) -- confirm against the full source.
                     */
                    #ifdef WAL_DEBUG                            
                    if (XLOG_DEBUG ||                            
                     (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||                            
                        (rmid != RM_XACT_ID && trace_recovery_messages <= DEBUG3))                        
                    {                            
                        StringInfoData buf;                        
                                                
                        initStringInfo(&buf);                        
                        appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",                        
                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff,        
                                         EndRecPtr.xlogid, EndRecPtr.xrecoff);        
                        xlog_outrec(&buf, record);                        
                        appendStringInfo(&buf, " - ");                        
                        RmgrTable[record->xl_rmid].rm_desc(&buf,                        
                                           record->xl_info,        
                                         XLogRecGetData(record));        
                        elog(LOG, "%s", buf.data);                        
                        pfree(buf.data);                        
                    }                            
                    #endif                            
                                                
                    /* Handle interrupt signals of startup process */                            
                    HandleStartupProcInterrupts();                            
                                                
                    /* Allow read-only connections if we're consistent now */                            
                    CheckRecoveryConsistency();                            
                                                
                    /*                            
                     * Have we reached our recovery target?                            
                     */                            
                    if (recoveryStopsHere(record, &recoveryApply))                            
                    {                            
                        /*                        
                         * Pause only if users can connect to send a resume                        
                         * message                        
                         */                        
                        if (recoveryPauseAtTarget && standbyState == STANDBY_SNAPSHOT_READY)                        
                        {                        
                            SetRecoveryPause(true);                    
                            recoveryPausesHere();                    
                        }                        
                        reachedStopPoint = true;    /* see below */                    
                        /* make this the last iteration of the loop */
                        recoveryContinue = false;                        
                        /* leave right away, without replaying this record */
                        if (!recoveryApply)                        
                            break;                    
                    }                            
                                                
                    /* Setup error traceback support for ereport() */                            
                    errcontext.callback = rm_redo_error_callback;                            
                    errcontext.arg = (void *) record;                            
                    errcontext.previous = error_context_stack;                            
                    error_context_stack = &errcontext;                            
                                                
                    /* nextXid must be beyond record's xid */                            
                    if (TransactionIdFollowsOrEquals(record->xl_xid,                            
                                     ShmemVariableCache->nextXid))            
                    {                            
                        ShmemVariableCache->nextXid = record->xl_xid;                        
                        TransactionIdAdvance(ShmemVariableCache->nextXid);                        
                    }                            
                                                
                    /*                            
                     * Update shared replayEndRecPtr before replaying this record,                            
                     * so that XLogFlush will update minRecoveryPoint correctly.                            
                     */                            
                    SpinLockAcquire(&xlogctl->info_lck);                            
                    xlogctl->replayEndRecPtr = EndRecPtr;                            
                    recoveryPause = xlogctl->recoveryPause;                            
                    SpinLockRelease(&xlogctl->info_lck);                            
                                                
                    /*                            
                     * Pause only if users can connect to send a resume message                            
                     */                            
                    if (recoveryPause && standbyState == STANDBY_SNAPSHOT_READY)                            
                        recoveryPausesHere();                        
                                                
                    /*                            
                     * If we are attempting to enter Hot Standby mode, process                            
                     * XIDs we see                            
                     */                            
                    if (standbyState >= STANDBY_INITIALIZED &&                            
                        TransactionIdIsValid(record->xl_xid))                        
                        RecordKnownAssignedTransactionIds(record->xl_xid);                        
                                                
                    /*
                     * Dispatch through the function-pointer table: the
                     * record's xl_rmid selects the RmgrTable[] row whose
                     * rm_redo routine replays it (e.g. xlog_redo for the
                     * "XLOG" row).
                     */
                    RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);                            
                                                
                    /* Pop the error context stack */                            
                    error_context_stack = errcontext.previous;                            
                                                
                    /*                            
                     * Update shared recoveryLastRecPtr after this record has been                            
                     * replayed.                            
                     */                            
                    SpinLockAcquire(&xlogctl->info_lck);                            
                    xlogctl->recoveryLastRecPtr = EndRecPtr;                            
                    SpinLockRelease(&xlogctl->info_lck);                            
                                                
                    LastRec = ReadRecPtr;                            
                                                
                    /* fetch the next record; a NULL result ends the loop */
                    record = ReadRecord(NULL, LOG, false);                            
                } while (record != NULL && recoveryContinue);                                
                                                
                /*                                
                 * end of main redo apply loop                                
                 */                                
                                                
                ereport(LOG,                                
                    (errmsg("redo done at %X/%X",                            
                            ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));                    
                                                
                xtime = GetLatestXTime();                                
                if (xtime)                                
                    ereport(LOG,                            
                         (errmsg("last completed transaction was at log time %s",                        
                                 timestamptz_to_str(xtime))));                
                InRedo = false;                                
            }                                    
            else                                    
            {                                    
                /* there are no WAL records following the checkpoint */                                
                ereport(LOG,                                
                        (errmsg("redo is not required")));                        
            }                                    
        }                                        
                                                
        …                                        
    }                                            

    经过分析可以发现,在StartupXLOG函数中,通过 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);

    调用了各个redo函数,其中包括: xlog_redo:

    /*                                    
     * XLOG resource manager's routines                                    
     *                                    
     * Definitions of info values are in include/catalog/pg_control.h, though                                    
     * not all record types are related to control file updates.                                    
     *
     * xlog_redo is the rm_redo entry of the "XLOG" row in RmgrTable[].  It
     * branches on the record's info value; only the XLOG_PARAMETER_CHANGE
     * branch is shown in full below.
     */                                    
    void                                    
    xlog_redo(XLogRecPtr lsn, XLogRecord *record)                                    
    {                                    
        …                                
        if (info == XLOG_NEXTOID)                                
        {                                
            …                            
        }                                
        …                                
        else if (info == XLOG_PARAMETER_CHANGE)                                
        {                                
            xl_parameter_change xlrec;                            
                                        
            /* Update our copy of the parameters in pg_control */                            
            /*
             * The record payload is copied into a local struct instead of
             * being read in place.
             * NOTE(review): presumably to avoid alignment assumptions about
             * the record data -- confirm.
             */
            memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));                            
                                        
            /*
             * ControlFileLock is held exclusively while the four parameter
             * fields of ControlFile are overwritten with the values from the
             * record and written out by UpdateControlFile().
             */
            LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);                            
            ControlFile->MaxConnections = xlrec.MaxConnections;                            
            ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts;                            
            ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;                            
            ControlFile->wal_level = xlrec.wal_level; 
            …                            
            UpdateControlFile();                            
            LWLockRelease(ControlFileLock);                            
                                        
            /* Check to see if any changes to max_connections give problems */                            
            CheckRequiredParameterValues();                            
        }                                
    }                                    

    看这段话:是说要找到逻辑上紧跟在checkpoint之后的第一条记录的位置;不过这条记录在物理位置上可能反而位于checkpoint记录之前(对应代码中 checkPoint.redo 小于 RecPtr 的情况)。

            /*                                    
             * Find the first record that logically follows the checkpoint --- it 
             * might physically precede it, though.                                    
             */  
  • 相关阅读:
    TTL与RS-485电平转换芯片MAX485/MAX3485
    RS485芯片介绍及典型应用电路
    脉冲电能表的组成及脉冲装置工作原理
    django-redis的安装及使用
    Python折线图——机器人UPtime Trend Chart
    ASP.Net DropDownList控件的使用方法
    C# ASP.Net数据库连接(Oracle)
    django根据已有数据库表生成model类
    Python Outlook发送邮件
    oracle将excel数据导入数据库
  • 原文地址:https://www.cnblogs.com/gaojian/p/3232181.html
Copyright © 2011-2022 走看看