zoukankan      html  css  js  c++  java
  • 对PostgreSQL xmin的深入学习

    当PostgreSQL需要insert 一条记录的时候,它会把记录头放入xmin,xmax等字段。

    xmin的值,就是当前的Transaction的TransactionId。这是为了满足MVCC的需要。

    跟踪程序进行了解:

    /*
     * Allocate the next XID for a new transaction or subtransaction.
     *
     * The new XID is also stored into MyProc before returning.
     *
     * Note: when this is called, we are actually already inside a valid
     * transaction, since XIDs are now not allocated until the transaction
     * does something.    So it is safe to do a database lookup if we want to
     * issue a warning about XID wrap.
     */
    TransactionId
    GetNewTransactionId(bool isSubXact)
    {
        TransactionId xid;
    
        /*
         * During bootstrap initialization, we return the special bootstrap
         * transaction id.
         */
        if (IsBootstrapProcessingMode())
        {
            Assert(!isSubXact);
            MyProc->xid = BootstrapTransactionId;
            return BootstrapTransactionId;
        }
    
        /* safety check, we should never get this far in a HS slave */
        if (RecoveryInProgress())
            elog(ERROR, "cannot assign TransactionIds during recovery");
    
        LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
    
        xid = ShmemVariableCache->nextXid;
    
        //fprintf(stderr,"In GetNewTransactionId--------1, xid is :%d
    ",xid);
    
        /*----------
         * Check to see if it's safe to assign another XID.  This protects against
         * catastrophic data loss due to XID wraparound.  The basic rules are:
         *
         * If we're past xidVacLimit, start trying to force autovacuum cycles.
         * If we're past xidWarnLimit, start issuing warnings.
         * If we're past xidStopLimit, refuse to execute transactions, unless
         * we are running in a standalone backend (which gives an escape hatch
         * to the DBA who somehow got past the earlier defenses).
         *----------
         */
        if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
        {
            /*
             * For safety's sake, we release XidGenLock while sending signals,
             * warnings, etc.  This is not so much because we care about
             * preserving concurrency in this situation, as to avoid any
             * possibility of deadlock while doing get_database_name(). First,
             * copy all the shared values we'll need in this path.
             */
            TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
            TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
            TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
            Oid            oldest_datoid = ShmemVariableCache->oldestXidDB;
    
            LWLockRelease(XidGenLock);
    
            /*
             * To avoid swamping the postmaster with signals, we issue the autovac
             * request only once per 64K transaction starts.  This still gives
             * plenty of chances before we get into real trouble.
             */
            if (IsUnderPostmaster && (xid % 65536) == 0)
                SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
    
            if (IsUnderPostmaster &&
                TransactionIdFollowsOrEquals(xid, xidStopLimit))
            {
                char       *oldest_datname = get_database_name(oldest_datoid);
    
                /* complain even if that DB has disappeared */
                if (oldest_datname)
                    ereport(ERROR,
                            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                             errmsg("database is not accepting commands to avoid wraparound data loss in database "%s"",
                                    oldest_datname),
                             errhint("Stop the postmaster and use a standalone backend to vacuum that database.
    "
                                     "You might also need to commit or roll back old prepared transactions.")));
                else
                    ereport(ERROR,
                            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                             errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",
                                    oldest_datoid),
                             errhint("Stop the postmaster and use a standalone backend to vacuum that database.
    "
                                     "You might also need to commit or roll back old prepared transactions.")));
            }
            else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))
            {
                char       *oldest_datname = get_database_name(oldest_datoid);
    
                /* complain even if that DB has disappeared */
                if (oldest_datname)
                    ereport(WARNING,
                            (errmsg("database "%s" must be vacuumed within %u transactions",
                                    oldest_datname,
                                    xidWrapLimit - xid),
                             errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.
    "
                                     "You might also need to commit or roll back old prepared transactions.")));
                else
                    ereport(WARNING,
                            (errmsg("database with OID %u must be vacuumed within %u transactions",
                                    oldest_datoid,
                                    xidWrapLimit - xid),
                             errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.
    "
                                     "You might also need to commit or roll back old prepared transactions.")));
            }
    
            /* Re-acquire lock and start over */
            LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
            xid = ShmemVariableCache->nextXid;
        }
    
        /*
         * If we are allocating the first XID of a new page of the commit log,
         * zero out that commit-log page before returning. We must do this while
         * holding XidGenLock, else another xact could acquire and commit a later
         * XID before we zero the page.  Fortunately, a page of the commit log
         * holds 32K or more transactions, so we don't have to do this very often.
         *
         * Extend pg_subtrans too.
         */
        ExtendCLOG(xid);
        ExtendSUBTRANS(xid);
    
        /*
         * Now advance the nextXid counter.  This must not happen until after we
         * have successfully completed ExtendCLOG() --- if that routine fails, we
         * want the next incoming transaction to try it again.    We cannot assign
         * more XIDs until there is CLOG space for them.
         */
        TransactionIdAdvance(ShmemVariableCache->nextXid);
    
        /*
         * We must store the new XID into the shared ProcArray before releasing
         * XidGenLock.    This ensures that every active XID older than
         * latestCompletedXid is present in the ProcArray, which is essential for
         * correct OldestXmin tracking; see src/backend/access/transam/README.
         *
         * XXX by storing xid into MyProc without acquiring ProcArrayLock, we are
         * relying on fetch/store of an xid to be atomic, else other backends
         * might see a partially-set xid here.    But holding both locks at once
         * would be a nasty concurrency hit.  So for now, assume atomicity.
         *
         * Note that readers of PGPROC xid fields should be careful to fetch the
         * value only once, rather than assume they can read a value multiple
         * times and get the same answer each time.
         *
         * The same comments apply to the subxact xid count and overflow fields.
         *
         * A solution to the atomic-store problem would be to give each PGPROC its
         * own spinlock used only for fetching/storing that PGPROC's xid and
         * related fields.
         *
         * If there's no room to fit a subtransaction XID into PGPROC, set the
         * cache-overflowed flag instead.  This forces readers to look in
         * pg_subtrans to map subtransaction XIDs up to top-level XIDs. There is a
         * race-condition window, in that the new XID will not appear as running
         * until its parent link has been placed into pg_subtrans. However, that
         * will happen before anyone could possibly have a reason to inquire about
         * the status of the XID, so it seems OK.  (Snapshots taken during this
         * window *will* include the parent XID, so they will deliver the correct
         * answer later on when someone does have a reason to inquire.)
         */
        {
            /*
             * Use volatile pointer to prevent code rearrangement; other backends
             * could be examining my subxids info concurrently, and we don't want
             * them to see an invalid intermediate state, such as incrementing
             * nxids before filling the array entry.  Note we are assuming that
             * TransactionId and int fetch/store are atomic.
             */
            volatile PGPROC *myproc = MyProc;
    
            if (!isSubXact)
                myproc->xid = xid;
            else
            {
                int            nxids = myproc->subxids.nxids;
    
                if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
                {
                    myproc->subxids.xids[nxids] = xid;
                    myproc->subxids.nxids = nxids + 1;
                }
                else
                    myproc->subxids.overflowed = true;
            }
        }
    
        LWLockRelease(XidGenLock);
    
        //fprintf(stderr,"In GetNewTransactionId--------2, xid is :%d
    ",xid);
    
        return xid;
    }
    函数 GetNewTransactionId 为 AssignTransactionId 调用:
    /*
     * AssignTransactionId
     *
     * Assigns a new permanent XID to the given TransactionState.
     * We do not assign XIDs to transactions until/unless this is called.
     * Also, any parent TransactionStates that don't yet have XIDs are assigned
     * one; this maintains the invariant that a child transaction has an XID
     * following its parent's.
     */
    static void
    AssignTransactionId(TransactionState s)
    {
    
        fprintf(stderr,"---------------------In AssignTransactionId
    ");
        bool        isSubXact = (s->parent != NULL);
        ResourceOwner currentOwner;
    
        /* Assert that caller didn't screw up */
        Assert(!TransactionIdIsValid(s->transactionId));
        Assert(s->state == TRANS_INPROGRESS);
    
        /*
         * Ensure parent(s) have XIDs, so that a child always has an XID later
         * than its parent.  Musn't recurse here, or we might get a stack overflow
         * if we're at the bottom of a huge stack of subtransactions none of which
         * have XIDs yet.
         */
        if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
        {
            TransactionState p = s->parent;
            TransactionState *parents;
            size_t        parentOffset = 0;
    
            parents = palloc(sizeof(TransactionState) * s->nestingLevel);
            while (p != NULL && !TransactionIdIsValid(p->transactionId))
            {
                parents[parentOffset++] = p;
                p = p->parent;
            }
    
            /*
             * This is technically a recursive call, but the recursion will never
             * be more than one layer deep.
             */
            while (parentOffset != 0)
                AssignTransactionId(parents[--parentOffset]);
    
            pfree(parents);
        }
    
        /*
         * Generate a new Xid and record it in PG_PROC and pg_subtrans.
         *
         * NB: we must make the subtrans entry BEFORE the Xid appears anywhere in
         * shared storage other than PG_PROC; because if there's no room for it in
         * PG_PROC, the subtrans entry is needed to ensure that other backends see
         * the Xid as "running".  See GetNewTransactionId.
         */
        s->transactionId = GetNewTransactionId(isSubXact);
    
        fprintf(stderr,"In AssignTransactionId transaction is: %d 
    ",s->transactionId);
    
        if (isSubXact)
            SubTransSetParent(s->transactionId, s->parent->transactionId, false);
    
        /*
         * If it's a top-level transaction, the predicate locking system needs to
         * be told about it too.
         */
        if (!isSubXact)
            RegisterPredicateLockingXid(s->transactionId);
    
        /*
         * Acquire lock on the transaction XID.  (We assume this cannot block.) We
         * have to ensure that the lock is assigned to the transaction's own
         * ResourceOwner.
         */
        currentOwner = CurrentResourceOwner;
        PG_TRY();
        {
            CurrentResourceOwner = s->curTransactionOwner;
            XactLockTableInsert(s->transactionId);
        }
        PG_CATCH();
        {
            /* Ensure CurrentResourceOwner is restored on error */
            CurrentResourceOwner = currentOwner;
            PG_RE_THROW();
        }
        PG_END_TRY();
        CurrentResourceOwner = currentOwner;
    
        /*
         * Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each
         * top-level transaction we issue a WAL record for the assignment. We
         * include the top-level xid and all the subxids that have not yet been
         * reported using XLOG_XACT_ASSIGNMENT records.
         *
         * This is required to limit the amount of shared memory required in a hot
         * standby server to keep track of in-progress XIDs. See notes for
         * RecordKnownAssignedTransactionIds().
         *
         * We don't keep track of the immediate parent of each subxid, only the
         * top-level transaction that each subxact belongs to. This is correct in
         * recovery only because aborted subtransactions are separately WAL
         * logged.
         */
        if (isSubXact && XLogStandbyInfoActive())
        {
            unreportedXids[nUnreportedXids] = s->transactionId;
            nUnreportedXids++;
    
            /*
             * ensure this test matches similar one in
             * RecoverPreparedTransactions()
             */
            if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS)
            {
                XLogRecData rdata[2];
                xl_xact_assignment xlrec;
    
                /*
                 * xtop is always set by now because we recurse up transaction
                 * stack to the highest unassigned xid and then come back down
                 */
                xlrec.xtop = GetTopTransactionId();
                Assert(TransactionIdIsValid(xlrec.xtop));
                xlrec.nsubxacts = nUnreportedXids;
    
                rdata[0].data = (char *) &xlrec;
                rdata[0].len = MinSizeOfXactAssignment;
                rdata[0].buffer = InvalidBuffer;
                rdata[0].next = &rdata[1];
    
                rdata[1].data = (char *) unreportedXids;
                rdata[1].len = PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId);
                rdata[1].buffer = InvalidBuffer;
                rdata[1].next = NULL;
    
                (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata);
    
                nUnreportedXids = 0;
            }
        }
    }

    而  AssignTransactionId 函数,为  所调用

    /*
     *    GetCurrentTransactionId
     *
     * This will return the XID of the current transaction (main or sub
     * transaction), assigning one if it's not yet set.  Be careful to call this
     * only inside a valid xact.
     */
    TransactionId
    GetCurrentTransactionId(void)
    {
        TransactionState s = CurrentTransactionState;
        if (!TransactionIdIsValid(s->transactionId))
            fprintf(stderr,"transaction id invalid.......
    ");
        else
            fprintf(stderr,"transaction id OK!.......
    ");
    if (!TransactionIdIsValid(s->transactionId)) AssignTransactionId(s);
    return s->transactionId; }

    而 GetCurrentTransactionId 函数为 heap_insert  函数所调用:

    /*
     *    heap_insert        - insert tuple into a heap
     *
     * The new tuple is stamped with current transaction ID and the specified
     * command ID.
     *
     * If the HEAP_INSERT_SKIP_WAL option is specified, the new tuple is not
     * logged in WAL, even for a non-temp relation.  Safe usage of this behavior
     * requires that we arrange that all new tuples go into new pages not
     * containing any tuples from other transactions, and that the relation gets
     * fsync'd before commit.  (See also heap_sync() comments)
     *
     * The HEAP_INSERT_SKIP_FSM option is passed directly to
     * RelationGetBufferForTuple, which see for more info.
     *
     * Note that these options will be applied when inserting into the heap's
     * TOAST table, too, if the tuple requires any out-of-line data.
     *
     * The BulkInsertState object (if any; bistate can be NULL for default
     * behavior) is also just passed through to RelationGetBufferForTuple.
     *
     * The return value is the OID assigned to the tuple (either here or by the
     * caller), or InvalidOid if no OID.  The header fields of *tup are updated
     * to match the stored tuple; in particular tup->t_self receives the actual
     * TID where the tuple was stored.    But note that any toasting of fields
     * within the tuple data is NOT reflected into *tup.
     */
    Oid
    heap_insert(Relation relation, HeapTuple tup, CommandId cid,
                int options, BulkInsertState bistate)
    {
    
        fprintf(stderr,"In heap_insert------------------------------1
    ");
    
        TransactionId xid = GetCurrentTransactionId();
    
        fprintf(stderr,"xid is :%d.......
    ",(int)xid);
    
        HeapTuple    heaptup;
        Buffer        buffer;
        bool        all_visible_cleared = false;
    
        if (relation->rd_rel->relhasoids)
        {
    #ifdef NOT_USED
            /* this is redundant with an Assert in HeapTupleSetOid */
            Assert(tup->t_data->t_infomask & HEAP_HASOID);
    #endif
    
            /*
             * If the object id of this tuple has already been assigned, trust the
             * caller.    There are a couple of ways this can happen.  At initial db
             * creation, the backend program sets oids for tuples. When we define
             * an index, we set the oid.  Finally, in the future, we may allow
             * users to set their own object ids in order to support a persistent
             * object store (objects need to contain pointers to one another).
             */
            if (!OidIsValid(HeapTupleGetOid(tup)))
                HeapTupleSetOid(tup, GetNewOid(relation));
        }
        else
        {
            /* check there is not space for an OID */
            Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
        }
    
        tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
        tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
        tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
        HeapTupleHeaderSetXmin(tup->t_data, xid);
        HeapTupleHeaderSetCmin(tup->t_data, cid);
        HeapTupleHeaderSetXmax(tup->t_data, 0);        /* for cleanliness */
        tup->t_tableOid = RelationGetRelid(relation);
    
        /*
         * If the new tuple is too big for storage or contains already toasted
         * out-of-line attributes from some other relation, invoke the toaster.
         *
         * Note: below this point, heaptup is the data we actually intend to store
         * into the relation; tup is the caller's original untoasted data.
         */
        if (relation->rd_rel->relkind != RELKIND_RELATION)
        {
            /* toast table entries should never be recursively toasted */
            Assert(!HeapTupleHasExternal(tup));
            heaptup = tup;
        }
        else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
            heaptup = toast_insert_or_update(relation, tup, NULL, options);
        else
            heaptup = tup;
    
        /*
         * We're about to do the actual insert -- but check for conflict first,
         * to avoid possibly having to roll back work we've just done.
         *
         * For a heap insert, we only need to check for table-level SSI locks.
         * Our new tuple can't possibly conflict with existing tuple locks, and
         * heap page locks are only consolidated versions of tuple locks; they do
         * not lock "gaps" as index page locks do.  So we don't need to identify
         * a buffer before making the call.
         */
        CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
    
        /* Find buffer to insert this tuple into */
        buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
                                           InvalidBuffer, options, bistate);
    
        /* NO EREPORT(ERROR) from here till changes are logged */
        START_CRIT_SECTION();
    
        RelationPutHeapTuple(relation, buffer, heaptup);
    
        if (PageIsAllVisible(BufferGetPage(buffer)))
        {
            all_visible_cleared = true;
            PageClearAllVisible(BufferGetPage(buffer));
        }
    
        /*
         * XXX Should we set PageSetPrunable on this page ?
         *
         * The inserting transaction may eventually abort thus making this tuple
         * DEAD and hence available for pruning. Though we don't want to optimize
         * for aborts, if no other tuple in this page is UPDATEd/DELETEd, the
         * aborted tuple will never be pruned until next vacuum is triggered.
         *
         * If you do add PageSetPrunable here, add it in heap_xlog_insert too.
         */
    
        MarkBufferDirty(buffer);
    
        /* XLOG stuff */
        if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
        {
            xl_heap_insert xlrec;
            xl_heap_header xlhdr;
            XLogRecPtr    recptr;
            XLogRecData rdata[3];
            Page        page = BufferGetPage(buffer);
            uint8        info = XLOG_HEAP_INSERT;
    
            xlrec.all_visible_cleared = all_visible_cleared;
            xlrec.target.node = relation->rd_node;
            xlrec.target.tid = heaptup->t_self;
            rdata[0].data = (char *) &xlrec;
            rdata[0].len = SizeOfHeapInsert;
            rdata[0].buffer = InvalidBuffer;
            rdata[0].next = &(rdata[1]);
    
            xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
            xlhdr.t_infomask = heaptup->t_data->t_infomask;
            xlhdr.t_hoff = heaptup->t_data->t_hoff;
    
            /*
             * note we mark rdata[1] as belonging to buffer; if XLogInsert decides
             * to write the whole page to the xlog, we don't need to store
             * xl_heap_header in the xlog.
             */
            rdata[1].data = (char *) &xlhdr;
            rdata[1].len = SizeOfHeapHeader;
            rdata[1].buffer = buffer;
            rdata[1].buffer_std = true;
            rdata[1].next = &(rdata[2]);
    
            /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
            rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
            rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
            rdata[2].buffer = buffer;
            rdata[2].buffer_std = true;
            rdata[2].next = NULL;
    
            /*
             * If this is the single and first tuple on page, we can reinit the
             * page instead of restoring the whole thing.  Set flag, and hide
             * buffer references from XLogInsert.
             */
            if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
                PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
            {
                info |= XLOG_HEAP_INIT_PAGE;
                rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
            }
    
            recptr = XLogInsert(RM_HEAP_ID, info, rdata);
    
            PageSetLSN(page, recptr);
            PageSetTLI(page, ThisTimeLineID);
        }
    
        END_CRIT_SECTION();
    
        UnlockReleaseBuffer(buffer);
    
        /* Clear the bit in the visibility map if necessary */
        if (all_visible_cleared)
            visibilitymap_clear(relation,
                                ItemPointerGetBlockNumber(&(heaptup->t_self)));
    
        /*
         * If tuple is cachable, mark it for invalidation from the caches in case
         * we abort.  Note it is OK to do this after releasing the buffer, because
         * the heaptup data structure is all in local memory, not in the shared
         * buffer.
         */
        CacheInvalidateHeapTuple(relation, heaptup);
    
        pgstat_count_heap_insert(relation);
    
        /*
         * If heaptup is a private copy, release it.  Don't forget to copy t_self
         * back to the caller's image, too.
         */
        if (heaptup != tup)
        {
            tup->t_self = heaptup->t_self;
            heap_freetuple(heaptup);
        }
    
        fprintf(stderr,"In heap_insert------------------------------2
    ");
        return HeapTupleGetOid(tup);
    }

    而heap_insert 函数为   所调用:

    /* ----------------------------------------------------------------
     *        ExecInsert
     *
     *        For INSERT, we have to insert the tuple into the target relation
     *        and insert appropriate tuples into the index relations.
     *
     *        Returns RETURNING result if any, otherwise NULL.
     * ----------------------------------------------------------------
     */
    static TupleTableSlot *
    ExecInsert(TupleTableSlot *slot,
               TupleTableSlot *planSlot,
               EState *estate,
               bool canSetTag)
    {
        HeapTuple    tuple;
        ResultRelInfo *resultRelInfo;
        Relation    resultRelationDesc;
        Oid            newId;
        List       *recheckIndexes = NIL;
    
        /**
        if (slot == NULL)
            fprintf(stderr,"---In ExecInsert...slot is null
    ");
        else
            fprintf(stderr,"---In ExecInsert...slot is not null
    ");
    
    
        fprintf(stderr,"IN ExecInsert-----------------------------100
    ");
    
        if (slot->tts_isempty)
            fprintf(stderr,"slot. tts_isempty!
    ");
        else
        {
            fprintf(stderr,"slot, tts not empty!
    ");
    
            HeapTuple htp = slot->tts_tuple;
    
            if (htp == NULL)
                fprintf(stderr,"htp is NULL
    ");
            else
                fprintf(stderr,"htp is NOT NULL
    ");
        }
    
        fprintf(stderr,"--------------------------------------------
    ");
        */
        //////////////////////////
    
        /*
         * get the heap tuple out of the tuple table slot, making sure we have a
         * writable copy
         */
        tuple = ExecMaterializeSlot(slot);
    
        fprintf(stderr,"--------------------------------------------150
    ");
    
        if (slot->tts_isempty)
            fprintf(stderr,"slot. tts_isempty!
    ");
        else
        {
            ///fprintf(stderr,"slot, tts not empty!
    ");
    
            HeapTuple htp = slot->tts_tuple;
    
            HeapTupleHeader theader = htp->t_data;
    
            if (theader == NULL)
                fprintf(stderr,"heap tuple header is NULL
    ");
            else{
    
                ////fprintf(stderr,"heap tuple header is NOT NULL
    ");
    
                HeapTupleFields htfds = theader->t_choice.t_heap;
    
                TransactionId txmin = htfds.t_xmin;
                TransactionId txmax = htfds.t_xmax;
    
                fprintf(stderr,"t_xmin is :%d ", (int)txmin );
                fprintf(stderr,"t_xmax is :%d 
    ", (int)txmax );
    
    
            }
        }
    
        /*
         * get information on the (current) result relation
         */
        resultRelInfo = estate->es_result_relation_info;
        resultRelationDesc = resultRelInfo->ri_RelationDesc;
    
        /*
         * If the result relation has OIDs, force the tuple's OID to zero so that
         * heap_insert will assign a fresh OID.  Usually the OID already will be
         * zero at this point, but there are corner cases where the plan tree can
         * return a tuple extracted literally from some table with the same
         * rowtype.
         *
         * XXX if we ever wanted to allow users to assign their own OIDs to new
         * rows, this'd be the place to do it.  For the moment, we make a point of
         * doing this before calling triggers, so that a user-supplied trigger
         * could hack the OID if desired.
         */
        if (resultRelationDesc->rd_rel->relhasoids)
            HeapTupleSetOid(tuple, InvalidOid);
    
        /* BEFORE ROW INSERT Triggers */
        if (resultRelInfo->ri_TrigDesc &&
            resultRelInfo->ri_TrigDesc->trig_insert_before_row)
        {
            slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
    
            if (slot == NULL)        /* "do nothing" */
                return NULL;
    
            /* trigger might have changed tuple */
            tuple = ExecMaterializeSlot(slot);
        }
    
    
        fprintf(stderr,"--------------------------------------------200
    ");
    
        if (slot->tts_isempty)
            fprintf(stderr,"slot. tts_isempty!
    ");
        else
        {
            ///fprintf(stderr,"slot, tts not empty!
    ");
    
            HeapTuple htp = slot->tts_tuple;
    
            HeapTupleHeader theader = htp->t_data;
    
            if (theader == NULL)
                fprintf(stderr,"heap tuple header is NULL
    ");
            else{
    
                ////fprintf(stderr,"heap tuple header is NOT NULL
    ");
    
                HeapTupleFields htfds = theader->t_choice.t_heap;
    
                TransactionId txmin = htfds.t_xmin;
                TransactionId txmax = htfds.t_xmax;
    
                fprintf(stderr,"t_xmin is :%d ", (int)txmin );
                fprintf(stderr,"t_xmax is :%d 
    ", (int)txmax );
    
    
            }
        }
    
        /* INSTEAD OF ROW INSERT Triggers */
        if (resultRelInfo->ri_TrigDesc &&
            resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
        {
            ////fprintf(stderr,"x--------------------------1
    ");
            slot = ExecIRInsertTriggers(estate, resultRelInfo, slot);
    
            if (slot == NULL)        /* "do nothing" */
                return NULL;
    
            /* trigger might have changed tuple */
            tuple = ExecMaterializeSlot(slot);
            newId = InvalidOid;
        }
        else
        {
            fprintf(stderr,"x--------------------------2
    ");
            /*
             * Check the constraints of the tuple
             */
            if (resultRelationDesc->rd_att->constr)
                ExecConstraints(resultRelInfo, slot, estate);
    
            fprintf(stderr,"IN ExecInsert-----------------------------210
    ");
    
            if (slot->tts_isempty)
                fprintf(stderr,"slot. tts_isempty!
    ");
            else
            {
                ///fprintf(stderr,"slot, tts not empty!
    ");
                HeapTuple htp = slot->tts_tuple;
    
                HeapTupleHeader theader = htp->t_data;
    
                if (theader == NULL)
                    fprintf(stderr,"heap tuple header is NULL
    ");
                else{
                   ///fprintf(stderr,"heap tuple header is NOT NULL
    ");
                    HeapTupleFields htfds = theader->t_choice.t_heap;
    
                    TransactionId txmin = htfds.t_xmin;
                    TransactionId txmax = htfds.t_xmax;
    
                    fprintf(stderr,"t_xmin is :%d ", (int)txmin );
                    fprintf(stderr,"t_xmax is :%d 
    ", (int)txmax );
    
                    /**
                    if ( htfds == NULL )
                        fprintf(stderr,"t_heap is NULL
    ");
                    else
                        fprintf(stderr,"t_heap is not NULL
    ");
                    */
    
                }
            }
    
            /*
             * insert the tuple
             *
             * Note: heap_insert returns the tid (location) of the new tuple in
             * the t_self field.
             */
            newId = heap_insert(resultRelationDesc, tuple,
                                estate->es_output_cid, 0, NULL);
    
            fprintf(stderr,"IN ExecInsert-----------------------------230
    ");
    
            if (slot->tts_isempty)
                fprintf(stderr,"slot. tts_isempty!
    ");
            else
            {
                ///fprintf(stderr,"slot, tts not empty!
    ");
    
                HeapTuple htp = slot->tts_tuple;
    
                HeapTupleHeader theader = htp->t_data;
    
                if (theader == NULL)
                    fprintf(stderr,"heap tuple header is NULL
    ");
                else{
                    ///fprintf(stderr,"heap tuple header is NOT NULL
    ");
                    HeapTupleFields htfds = theader->t_choice.t_heap;
                    TransactionId txmin = htfds.t_xmin;
                    TransactionId txmax = htfds.t_xmax;
    
                    fprintf(stderr,"t_xmin is :%d ", (int)txmin );
                    fprintf(stderr,"t_xmax is :%d 
    ", (int)txmax );
    
                    /**
                    if ( htfds == NULL )
                        fprintf(stderr,"t_heap is NULL
    ");
                    else
                        fprintf(stderr,"t_heap is not NULL
    ");
                    */
                }
            }
    
            /*
             * insert index entries for tuple
             */
            if (resultRelInfo->ri_NumIndices > 0)
                recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
                                                       estate);
        }
    
        fprintf(stderr,"IN ExecInsert-----------------------------250
    ");
    
        if (slot->tts_isempty)
            fprintf(stderr,"slot. tts_isempty!
    ");
        else
        {
            ///fprintf(stderr,"slot, tts not empty!
    ");
            HeapTuple htp = slot->tts_tuple;
            HeapTupleHeader theader = htp->t_data;
    
            if (theader == NULL)
                fprintf(stderr,"heap tuple header is NULL
    ");
            else{
                ///fprintf(stderr,"heap tuple header is NOT NULL
    ");
                HeapTupleFields htfds = theader->t_choice.t_heap;
                TransactionId txmin = htfds.t_xmin;
                TransactionId txmax = htfds.t_xmax;
                fprintf(stderr,"t_xmin is :%d ", (int)txmin );
                fprintf(stderr,"t_xmax is :%d 
    ", (int)txmax );
    
                /**
                if ( htfds == NULL )
                    fprintf(stderr,"t_heap is NULL
    ");
                else
                    fprintf(stderr,"t_heap is not NULL
    ");
                */
    
            }
        }
    
        if (canSetTag)
        {
            (estate->es_processed)++;
            estate->es_lastoid = newId;
            setLastTid(&(tuple->t_self));
        }
    
        /* AFTER ROW INSERT Triggers */
        ExecARInsertTriggers(estate, resultRelInfo, tuple, recheckIndexes);
    
        fprintf(stderr,"IN ExecInsert-----------------------------300
    ");
    
        if (slot->tts_isempty)
            fprintf(stderr,"slot. tts_isempty!
    ");
        else
        {
            ///fprintf(stderr,"slot, tts not empty!
    ");
            HeapTuple htp = slot->tts_tuple;
            HeapTupleHeader theader = htp->t_data;
            if (theader == NULL)
                fprintf(stderr,"heap tuple header is NULL
    ");
            else{
                ///fprintf(stderr,"heap tuple header is NOT NULL
    ");
                HeapTupleFields htfds = theader->t_choice.t_heap;
    
                TransactionId txmin = htfds.t_xmin;
                TransactionId txmax = htfds.t_xmax;
    
                fprintf(stderr,"t_xmin is :%d ", (int)txmin );
                fprintf(stderr,"t_xmax is :%d 
    ", (int)txmax );
    
                /**
                if ( htfds == NULL )
                    fprintf(stderr,"t_heap is NULL
    ");
                else
                    fprintf(stderr,"t_heap is not NULL
    ");
                */
            }
        }
    
        list_free(recheckIndexes);
    
        /* Process RETURNING if present */
        if (resultRelInfo->ri_projectReturning)
            return ExecProcessReturning(resultRelInfo->ri_projectReturning,
                                        slot, planSlot);
    
        return NULL;
    }

     接着要看这个:

    typedef struct VariableCacheData
    {
        /*
         * These fields are protected by OidGenLock.
         */
        Oid            nextOid;        /* next OID to assign */
        uint32        oidCount;        /* OIDs available before must do XLOG work */
    
        /*
         * These fields are protected by XidGenLock.
         */
        TransactionId nextXid;        /* next XID to assign */
    
        TransactionId oldestXid;    /* cluster-wide minimum datfrozenxid */
        TransactionId xidVacLimit;    /* start forcing autovacuums here */
        TransactionId xidWarnLimit; /* start complaining here */
        TransactionId xidStopLimit; /* refuse to advance nextXid beyond here */
        TransactionId xidWrapLimit; /* where the world ends */
        Oid            oldestXidDB;    /* database with minimum datfrozenxid */
    
        /*
         * These fields are protected by ProcArrayLock.
         */
        TransactionId latestCompletedXid;    /* newest XID that has committed or
                                             * aborted */
    } VariableCacheData;
    
    typedef VariableCacheData *VariableCache;

     现在已经知道,xmin是从 ShmemVairableCache->nextXid得来。

    而且,即使数据库重新启动后,xmin也能在上一次的基础上继续增加。

    可以知道,对其应当是进行了初始化。

    而xmin的来源是 checkPoint数据,它是从数据库文件中来。

    目前所知:调用关系

    PostmasterMain-->StartupDataBase 

    StartupDataBase是一个宏,展开后是 : StartChildProcess(StartupProcess)

    StartupProcess-->StartupXLOG

    在StartupXLOG中,有如下的代码:

            /*
             * Get the last valid checkpoint record.  If the latest one according
             * to pg_control is broken, try the next-to-last one.
             */
            checkPointLoc = ControlFile->checkPoint;
            RedoStartLSN = ControlFile->checkPointCopy.redo;
            record = ReadCheckpointRecord(checkPointLoc, 1);
        ShmemVariableCache->nextXid = checkPoint.nextXid;
        ShmemVariableCache->nextOid = checkPoint.nextOid;

     为何说 ShmemVariableCache是在共享内存中呢,下面代码会有所启示:

    /*
     *    InitShmemAllocation() --- set up shared-memory space allocation.
     *
     * This should be called only in the postmaster or a standalone backend.
     */
    void
    InitShmemAllocation(void)
    {
    
        fprintf(stderr,"In InitShmemAllocation.....start by process %d
    ",getpid());
    
        PGShmemHeader *shmhdr = ShmemSegHdr;
    
        Assert(shmhdr != NULL);
    
        /*
         * Initialize the spinlock used by ShmemAlloc.    We have to do the space
         * allocation the hard way, since obviously ShmemAlloc can't be called
         * yet.
         */
        ShmemLock = (slock_t *) (((char *) shmhdr) + shmhdr->freeoffset);
        shmhdr->freeoffset += MAXALIGN(sizeof(slock_t));
        Assert(shmhdr->freeoffset <= shmhdr->totalsize);
    
        SpinLockInit(ShmemLock);
    
        /* ShmemIndex can't be set up yet (need LWLocks first) */
        shmhdr->index = NULL;
        ShmemIndex = (HTAB *) NULL;
    
        /*
         * Initialize ShmemVariableCache for transaction manager. (This doesn't
         * really belong here, but not worth moving.)
         */
        ShmemVariableCache = (VariableCache)
            ShmemAlloc(sizeof(*ShmemVariableCache));
        memset(ShmemVariableCache, 0, sizeof(*ShmemVariableCache));
    
        fprintf(stderr,"When Initiating, nextXid is: %d 
    ", (int)ShmemVariableCache->nextXid);
    
        fprintf(stderr,"In InitShmemAllocation.....end by process %d
    
    ",getpid());
    }

    而ShmemAlloc是关键:

    /*
     * ShmemAlloc -- allocate max-aligned chunk from shared memory
     *
     * Assumes ShmemLock and ShmemSegHdr are initialized.
     *
     * Returns: real pointer to memory or NULL if we are out
     *        of space.  Has to return a real pointer in order
     *        to be compatible with malloc().
     */
    void *
    ShmemAlloc(Size size)
    {
        Size        newStart;
        Size        newFree;
        void       *newSpace;
    
        /* use volatile pointer to prevent code rearrangement */
        volatile PGShmemHeader *shmemseghdr = ShmemSegHdr;
    
        /*
         * ensure all space is adequately aligned.
         */
        size = MAXALIGN(size);
    
        Assert(shmemseghdr != NULL);
    
        SpinLockAcquire(ShmemLock);
    
        newStart = shmemseghdr->freeoffset;
    
        /* extra alignment for large requests, since they are probably buffers */
        if (size >= BLCKSZ)
            newStart = BUFFERALIGN(newStart);
    
        newFree = newStart + size;
        if (newFree <= shmemseghdr->totalsize)
        {
            newSpace = (void *) ((char *) ShmemBase + newStart);
            shmemseghdr->freeoffset = newFree;
        }
        else
            newSpace = NULL;
    
        SpinLockRelease(ShmemLock);
    
        if (!newSpace)
            ereport(WARNING,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("out of shared memory")));
    
        return newSpace;
    }
    /* shared memory global variables */
    
    static PGShmemHeader *ShmemSegHdr;        /* shared mem segment header */
    
    static void *ShmemBase;            /* start address of shared memory */
    
    static void *ShmemEnd;            /* end+1 address of shared memory */
    
    slock_t    *ShmemLock;            /* spinlock for shared memory and LWLock
                                     * allocation */
    
    static HTAB *ShmemIndex = NULL; /* primary index hashtable for shmem */
    
    
    /*
     *    InitShmemAccess() --- set up basic pointers to shared memory.
     *
     * Note: the argument should be declared "PGShmemHeader *seghdr",
     * but we use void to avoid having to include ipc.h in shmem.h.
     */
    void
    InitShmemAccess(void *seghdr)
    {
        PGShmemHeader *shmhdr = (PGShmemHeader *) seghdr;
    
        ShmemSegHdr = shmhdr;
        ShmemBase = (void *) shmhdr;
        ShmemEnd = (char *) ShmemBase + shmhdr->totalsize;
    }

     数据库系统启动的时候的情形已经有所了解了。那么运行中,transaction id 是如何递增的呢。如果我运行两次 GetNewTransactionId,就可以发现 transactionid 每次加2了。

    /*
     * AssignTransactionId
     *
     * Assigns a new permanent XID to the given TransactionState.
     * We do not assign XIDs to transactions until/unless this is called.
     * Also, any parent TransactionStates that don't yet have XIDs are assigned
     * one; this maintains the invariant that a child transaction has an XID
     * following its parent's.
     */
    static void
    AssignTransactionId(TransactionState s)
    {
    
        fprintf(stderr,"************---------------------In AssignTransactionId..start by process %d
    ",getpid());
        bool        isSubXact = (s->parent != NULL);
        ResourceOwner currentOwner;
    
        /* Assert that caller didn't screw up */
        Assert(!TransactionIdIsValid(s->transactionId));
        Assert(s->state == TRANS_INPROGRESS);
    
        /*
         * Ensure parent(s) have XIDs, so that a child always has an XID later
         * than its parent.  Musn't recurse here, or we might get a stack overflow
         * if we're at the bottom of a huge stack of subtransactions none of which
         * have XIDs yet.
         */
        if (isSubXact && !TransactionIdIsValid(s->parent->transactionId))
        {
            TransactionState p = s->parent;
            TransactionState *parents;
            size_t        parentOffset = 0;
    
            parents = palloc(sizeof(TransactionState) * s->nestingLevel);
            while (p != NULL && !TransactionIdIsValid(p->transactionId))
            {
                parents[parentOffset++] = p;
                p = p->parent;
            }
    
            /*
             * This is technically a recursive call, but the recursion will never
             * be more than one layer deep.
             */
            while (parentOffset != 0)
                AssignTransactionId(parents[--parentOffset]);
    
            pfree(parents);
        }
    
        /*
         * Generate a new Xid and record it in PG_PROC and pg_subtrans.
         *
         * NB: we must make the subtrans entry BEFORE the Xid appears anywhere in
         * shared storage other than PG_PROC; because if there's no room for it in
         * PG_PROC, the subtrans entry is needed to ensure that other backends see
         * the Xid as "running".  See GetNewTransactionId.
         */
        s->transactionId = GetNewTransactionId(isSubXact);
    
       ////#####added by gaojian fprintf(stderr,
    "In AssignTransactionId ....1.... transaction is: %d ",s->transactionId); s->transactionId = GetNewTransactionId(isSubXact); fprintf(stderr,"In AssignTransactionId ....2.... transaction is: %d ",s->transactionId); /////#####added by gaojian if (isSubXact) SubTransSetParent(s->transactionId, s->parent->transactionId, false); /* * If it's a top-level transaction, the predicate locking system needs to * be told about it too. */ if (!isSubXact) RegisterPredicateLockingXid(s->transactionId); /* * Acquire lock on the transaction XID. (We assume this cannot block.) We * have to ensure that the lock is assigned to the transaction's own * ResourceOwner. */ currentOwner = CurrentResourceOwner; PG_TRY(); { CurrentResourceOwner = s->curTransactionOwner; XactLockTableInsert(s->transactionId); } PG_CATCH(); { /* Ensure CurrentResourceOwner is restored on error */ CurrentResourceOwner = currentOwner; PG_RE_THROW(); } PG_END_TRY(); CurrentResourceOwner = currentOwner; /* * Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each * top-level transaction we issue a WAL record for the assignment. We * include the top-level xid and all the subxids that have not yet been * reported using XLOG_XACT_ASSIGNMENT records. * * This is required to limit the amount of shared memory required in a hot * standby server to keep track of in-progress XIDs. See notes for * RecordKnownAssignedTransactionIds(). * * We don't keep track of the immediate parent of each subxid, only the * top-level transaction that each subxact belongs to. This is correct in * recovery only because aborted subtransactions are separately WAL * logged. */ if (isSubXact && XLogStandbyInfoActive()) { unreportedXids[nUnreportedXids] = s->transactionId; nUnreportedXids++; /* * ensure this test matches similar one in * RecoverPreparedTransactions() */ if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS) { XLogRecData rdata[2]; xl_xact_assignment xlrec; /* * xtop is always set by now because we recurse up transaction * stack to the highest unassigned xid and then come back down */ xlrec.xtop = GetTopTransactionId(); Assert(TransactionIdIsValid(xlrec.xtop)); xlrec.nsubxacts = nUnreportedXids; rdata[0].data = (char *) &xlrec; rdata[0].len = MinSizeOfXactAssignment; rdata[0].buffer = InvalidBuffer; rdata[0].next = &rdata[1]; rdata[1].data = (char *) unreportedXids; rdata[1].len = PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId); rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata); nUnreportedXids = 0; } } fprintf(stderr,"---------------------In AssignTransactionId end..by process %d ",getpid()); }

    关键在这个:GetNewTransactionId函数中,调用了 TransactionIdAdvance(ShmemVariableCache->nextXid)

    /*
     * Allocate the next XID for a new transaction or subtransaction.
     *
     * The new XID is also stored into MyProc before returning.
     *
     * Note: when this is called, we are actually already inside a valid
     * transaction, since XIDs are now not allocated until the transaction
     * does something.    So it is safe to do a database lookup if we want to
     * issue a warning about XID wrap.
     */
    TransactionId
    GetNewTransactionId(bool isSubXact)
    {
        fprintf(stderr,"*********In GetNewTransactionId.....start by process %d
    ",getpid());
    
        TransactionId xid;
    
        /*
         * During bootstrap initialization, we return the special bootstrap
         * transaction id.
         */
        if (IsBootstrapProcessingMode())
        {
            Assert(!isSubXact);
            MyProc->xid = BootstrapTransactionId;
            return BootstrapTransactionId;
        }
    
        /* safety check, we should never get this far in a HS slave */
        if (RecoveryInProgress())
            elog(ERROR, "cannot assign TransactionIds during recovery");
    
        LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
    
        xid = ShmemVariableCache->nextXid;
    
        fprintf(stderr,"In GetNewTransactionId--------1, xid is :%d
    ",xid);
    
        /*----------
         * Check to see if it's safe to assign another XID.  This protects against
         * catastrophic data loss due to XID wraparound.  The basic rules are:
         *
         * If we're past xidVacLimit, start trying to force autovacuum cycles.
         * If we're past xidWarnLimit, start issuing warnings.
         * If we're past xidStopLimit, refuse to execute transactions, unless
         * we are running in a standalone backend (which gives an escape hatch
         * to the DBA who somehow got past the earlier defenses).
         *----------
         */
        if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
        {
            /*
             * For safety's sake, we release XidGenLock while sending signals,
             * warnings, etc.  This is not so much because we care about
             * preserving concurrency in this situation, as to avoid any
             * possibility of deadlock while doing get_database_name(). First,
             * copy all the shared values we'll need in this path.
             */
            TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
            TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
            TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
            Oid            oldest_datoid = ShmemVariableCache->oldestXidDB;
    
            LWLockRelease(XidGenLock);
    
            /*
             * To avoid swamping the postmaster with signals, we issue the autovac
             * request only once per 64K transaction starts.  This still gives
             * plenty of chances before we get into real trouble.
             */
            if (IsUnderPostmaster && (xid % 65536) == 0)
                SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
    
            if (IsUnderPostmaster &&
                TransactionIdFollowsOrEquals(xid, xidStopLimit))
            {
                char       *oldest_datname = get_database_name(oldest_datoid);
    
                /* complain even if that DB has disappeared */
                if (oldest_datname)
                    ereport(ERROR,
                            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                             errmsg("database is not accepting commands to avoid wraparound data loss in database "%s"",
                                    oldest_datname),
                             errhint("Stop the postmaster and use a standalone backend to vacuum that database.
    "
                                     "You might also need to commit or roll back old prepared transactions.")));
                else
                    ereport(ERROR,
                            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                             errmsg("database is not accepting commands to avoid wraparound data loss in database with OID %u",
                                    oldest_datoid),
                             errhint("Stop the postmaster and use a standalone backend to vacuum that database.
    "
                                     "You might also need to commit or roll back old prepared transactions.")));
            }
            else if (TransactionIdFollowsOrEquals(xid, xidWarnLimit))
            {
                char       *oldest_datname = get_database_name(oldest_datoid);
    
                /* complain even if that DB has disappeared */
                if (oldest_datname)
                    ereport(WARNING,
                            (errmsg("database "%s" must be vacuumed within %u transactions",
                                    oldest_datname,
                                    xidWrapLimit - xid),
                             errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.
    "
                                     "You might also need to commit or roll back old prepared transactions.")));
                else
                    ereport(WARNING,
                            (errmsg("database with OID %u must be vacuumed within %u transactions",
                                    oldest_datoid,
                                    xidWrapLimit - xid),
                             errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.
    "
                                     "You might also need to commit or roll back old prepared transactions.")));
            }
    
            /* Re-acquire lock and start over */
            LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
            xid = ShmemVariableCache->nextXid;
        }
    
        /*
         * If we are allocating the first XID of a new page of the commit log,
         * zero out that commit-log page before returning. We must do this while
         * holding XidGenLock, else another xact could acquire and commit a later
         * XID before we zero the page.  Fortunately, a page of the commit log
         * holds 32K or more transactions, so we don't have to do this very often.
         *
         * Extend pg_subtrans too.
         */
        ExtendCLOG(xid);
        ExtendSUBTRANS(xid);
    
        /*
         * Now advance the nextXid counter.  This must not happen until after we
         * have successfully completed ExtendCLOG() --- if that routine fails, we
         * want the next incoming transaction to try it again.    We cannot assign
         * more XIDs until there is CLOG space for them.
         */
        TransactionIdAdvance(ShmemVariableCache->nextXid);
    
       /*
         * We must store the new XID into the shared ProcArray before releasing
         * XidGenLock.    This ensures that every active XID older than
         * latestCompletedXid is present in the ProcArray, which is essential for
         * correct OldestXmin tracking; see src/backend/access/transam/README.
         *
         * XXX by storing xid into MyProc without acquiring ProcArrayLock, we are
         * relying on fetch/store of an xid to be atomic, else other backends
         * might see a partially-set xid here.    But holding both locks at once
         * would be a nasty concurrency hit.  So for now, assume atomicity.
         *
         * Note that readers of PGPROC xid fields should be careful to fetch the
         * value only once, rather than assume they can read a value multiple
         * times and get the same answer each time.
         *
         * The same comments apply to the subxact xid count and overflow fields.
         *
         * A solution to the atomic-store problem would be to give each PGPROC its
         * own spinlock used only for fetching/storing that PGPROC's xid and
         * related fields.
         *
         * If there's no room to fit a subtransaction XID into PGPROC, set the
         * cache-overflowed flag instead.  This forces readers to look in
         * pg_subtrans to map subtransaction XIDs up to top-level XIDs. There is a
         * race-condition window, in that the new XID will not appear as running
         * until its parent link has been placed into pg_subtrans. However, that
         * will happen before anyone could possibly have a reason to inquire about
         * the status of the XID, so it seems OK.  (Snapshots taken during this
         * window *will* include the parent XID, so they will deliver the correct
         * answer later on when someone does have a reason to inquire.)
         */
        {
            /*
             * Use volatile pointer to prevent code rearrangement; other backends
             * could be examining my subxids info concurrently, and we don't want
             * them to see an invalid intermediate state, such as incrementing
             * nxids before filling the array entry.  Note we are assuming that
             * TransactionId and int fetch/store are atomic.
             */
            volatile PGPROC *myproc = MyProc;
    
            if (!isSubXact)
                myproc->xid = xid;
            else
            {
                int            nxids = myproc->subxids.nxids;
    
                if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
                {
                    myproc->subxids.xids[nxids] = xid;
                    myproc->subxids.nxids = nxids + 1;
                }
                else
                    myproc->subxids.overflowed = true;
            }
        }
    
        LWLockRelease(XidGenLock);
    
        fprintf(stderr,"****************In GetNewTransactionId...xid is:%d..end by process %d
    
    ",xid,getpid());
    
        return xid;
    }

     

  • 相关阅读:
    结束也是开始
    21.1 LogMiner
    20.4 使用FLASHBACK DATABASE 恢复数据库到先前状态
    结束也是开始
    21.2 DBVERIFY
    21.3 DBNEWID
    20.2 使用FlashBack Table 恢复表到先前状态
    21.2 DBVERIFY
    21.3 DBNEWID
    20.3 使用FLASHBACK TABLE 恢复被删除表
  • 原文地址:https://www.cnblogs.com/gaojian/p/3159292.html
Copyright © 2011-2022 走看看