初步学习pg_control文件之十四

简介:

接前文 初步学习pg_control文件之十三

看如下几个:

复制代码
/*
     * Parameter settings that determine if the WAL can be used for archival
     * or hot standby.
     */
    int            wal_level;
    int            MaxConnections;
    int            max_prepared_xacts;
    int            max_locks_per_xact;
复制代码

PostgreSQL中多次用到了函数数组:

复制代码
/*                                        
 * Method table for resource managers.                                        
 *                                        
 * RmgrTable[] is indexed by RmgrId values (see rmgr.h).                                        
 */                                        
typedef struct RmgrData                                        
{                                        
    const char *rm_name;                                    
    void        (*rm_redo) (XLogRecPtr lsn, XLogRecord *rptr);                            
    void        (*rm_desc) (StringInfo buf, uint8 xl_info, char *rec);                            
    void        (*rm_startup) (void);                            
    void        (*rm_cleanup) (void);                            
    bool        (*rm_safe_restartpoint) (void);                            
} RmgrData;                                        
                                        
                                        
const RmgrData RmgrTable[RM_MAX_ID + 1] = {                                        
    {"XLOG", xlog_redo, xlog_desc, NULL, NULL, NULL},                                    
    {"Transaction", xact_redo, xact_desc, NULL, NULL, NULL},                                    
    {"Storage", smgr_redo, smgr_desc, NULL, NULL, NULL},                                    
    {"CLOG", clog_redo, clog_desc, NULL, NULL, NULL},                                    
    {"Database", dbase_redo, dbase_desc, NULL, NULL, NULL},                                    
    {"Tablespace", tblspc_redo, tblspc_desc, NULL, NULL, NULL},                                    
    {"MultiXact", multixact_redo, multixact_desc, NULL, NULL, NULL},                                    
    {"RelMap", relmap_redo, relmap_desc, NULL, NULL, NULL},                                    
    {"Standby", standby_redo, standby_desc, NULL, NULL, NULL},                                    
    {"Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL},                                    
    {"Heap", heap_redo, heap_desc, NULL, NULL, NULL},                                    
    {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint}, 
    {"Hash", hash_redo, hash_desc, NULL, NULL, NULL},                                    
    {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},   
    {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL}, 
    {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}                                    
};                                        
复制代码

看代码:

复制代码
/*                                            
 * This must be called ONCE during postmaster or standalone-backend startup                                            
 */                                            
void                                            
StartupXLOG(void)                                            
{                                            
    …                                        
    /*                                        
     * Check whether we need to force recovery from WAL.  If it appears to    
     * have been a clean shutdown and we did not have a recovery.conf file, 
     * then assume no recovery needed.                                        
     */                                        
    if (XLByteLT(checkPoint.redo, RecPtr))                                        
    {                                        
        if (wasShutdown)                                    
            ereport(PANIC,                                
                    (errmsg("invalid redo record in shutdown checkpoint")));                        
        InRecovery = true;                                    
    }                                        
    else if (ControlFile->state != DB_SHUTDOWNED)                                        
        InRecovery = true;                                    
    else if (InArchiveRecovery)                                        
    {                                        
        /* force recovery due to presence of recovery.conf */                                    
        InRecovery = true;                                    
    }                                        
                                            
    /* REDO */                                        
    if (InRecovery)                                        
    {                                        
        …                                    
        /*                                    
         * Find the first record that logically follows the checkpoint --- it 
         * might physically precede it, though.                                    
         */                                    
        if (XLByteLT(checkPoint.redo, RecPtr))                                    
        {                                    
            /* back up to find the record */                                
            record = ReadRecord(&(checkPoint.redo), PANIC, false);                                
        }                                    
        else                                    
        {                                    
            /* just have to read next record after CheckPoint */                                
            record = ReadRecord(NULL, LOG, false);                                
        }                                    
                                            
        if (record != NULL)                                    
        {                                    
            bool        recoveryContinue = true;                        
            bool        recoveryApply = true;                        
            bool        recoveryPause = false;                        
            ErrorContextCallback errcontext;                                
            TimestampTz xtime;                                
                                            
            InRedo = true;                                
                                            
            ereport(LOG,                                
                    (errmsg("redo starts at %X/%X",                        
                            ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));                
                                            
            /*                                
             * main redo apply loop                                
             */                                
            do                                
            {                                
                #ifdef WAL_DEBUG                            
                if (XLOG_DEBUG ||                            
                 (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||                            
                    (rmid != RM_XACT_ID && trace_recovery_messages <= DEBUG3))                        
                {                            
                    StringInfoData buf;                        
                                            
                    initStringInfo(&buf);                        
                    appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",                        
                                     ReadRecPtr.xlogid, ReadRecPtr.xrecoff,        
                                     EndRecPtr.xlogid, EndRecPtr.xrecoff);        
                    xlog_outrec(&buf, record);                        
                    appendStringInfo(&buf, " - ");                        
                    RmgrTable[record->xl_rmid].rm_desc(&buf,                        
                                       record->xl_info,        
                                     XLogRecGetData(record));        
                    elog(LOG, "%s", buf.data);                        
                    pfree(buf.data);                        
                }                            
                #endif                            
                                            
                /* Handle interrupt signals of startup process */                            
                HandleStartupProcInterrupts();                            
                                            
                /* Allow read-only connections if we're consistent now */                            
                CheckRecoveryConsistency();                            
                                            
                /*                            
                 * Have we reached our recovery target?                            
                 */                            
                if (recoveryStopsHere(record, &recoveryApply))                            
                {                            
                    /*                        
                     * Pause only if users can connect to send a resume                        
                     * message                        
                     */                        
                    if (recoveryPauseAtTarget && standbyState == STANDBY_SNAPSHOT_READY)                        
                    {                        
                        SetRecoveryPause(true);                    
                        recoveryPausesHere();                    
                    }                        
                    reachedStopPoint = true;    /* see below */                    
                    recoveryContinue = false;                        
                    if (!recoveryApply)                        
                        break;                    
                }                            
                                            
                /* Setup error traceback support for ereport() */                            
                errcontext.callback = rm_redo_error_callback;                            
                errcontext.arg = (void *) record;                            
                errcontext.previous = error_context_stack;                            
                error_context_stack = &errcontext;                            
                                            
                /* nextXid must be beyond record's xid */                            
                if (TransactionIdFollowsOrEquals(record->xl_xid,                            
                                 ShmemVariableCache->nextXid))            
                {                            
                    ShmemVariableCache->nextXid = record->xl_xid;                        
                    TransactionIdAdvance(ShmemVariableCache->nextXid);                        
                }                            
                                            
                /*                            
                 * Update shared replayEndRecPtr before replaying this record,                            
                 * so that XLogFlush will update minRecoveryPoint correctly.                            
                 */                            
                SpinLockAcquire(&xlogctl->info_lck);                            
                xlogctl->replayEndRecPtr = EndRecPtr;                            
                recoveryPause = xlogctl->recoveryPause;                            
                SpinLockRelease(&xlogctl->info_lck);                            
                                            
                /*                            
                 * Pause only if users can connect to send a resume message                            
                 */                            
                if (recoveryPause && standbyState == STANDBY_SNAPSHOT_READY)                            
                    recoveryPausesHere();                        
                                            
                /*                            
                 * If we are attempting to enter Hot Standby mode, process                            
                 * XIDs we see                            
                 */                            
                if (standbyState >= STANDBY_INITIALIZED &&                            
                    TransactionIdIsValid(record->xl_xid))                        
                    RecordKnownAssignedTransactionIds(record->xl_xid);                        
                                            
                RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);                            
                                            
                /* Pop the error context stack */                            
                error_context_stack = errcontext.previous;                            
                                            
                /*                            
                 * Update shared recoveryLastRecPtr after this record has been                            
                 * replayed.                            
                 */                            
                SpinLockAcquire(&xlogctl->info_lck);                            
                xlogctl->recoveryLastRecPtr = EndRecPtr;                            
                SpinLockRelease(&xlogctl->info_lck);                            
                                            
                LastRec = ReadRecPtr;                            
                                            
                record = ReadRecord(NULL, LOG, false);                            
            } while (record != NULL && recoveryContinue);                                
                                            
            /*                                
             * end of main redo apply loop                                
             */                                
                                            
            ereport(LOG,                                
                (errmsg("redo done at %X/%X",                            
                        ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));                    
                                            
            xtime = GetLatestXTime();                                
            if (xtime)                                
                ereport(LOG,                            
                     (errmsg("last completed transaction was at log time %s",                        
                             timestamptz_to_str(xtime))));                
            InRedo = false;                                
        }                                    
        else                                    
        {                                    
            /* there are no WAL records following the checkpoint */                                
            ereport(LOG,                                
                    (errmsg("redo is not required")));                        
        }                                    
    }                                        
                                            
    …                                        
}                                            
复制代码

经过分析可以发现,在Startup_XLOG函数中,通过 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);

调用了各个redo函数,其中包括: xlog_redo:

复制代码
/*                                    
 * XLOG resource manager's routines                                    
 *                                    
 * Definitions of info values are in include/catalog/pg_control.h, though                                    
 * not all record types are related to control file updates.                                    
 */                                    
void                                    
xlog_redo(XLogRecPtr lsn, XLogRecord *record)                                    
{                                    
    …                                
    if (info == XLOG_NEXTOID)                                
    {                                
        …                            
    }                                
    …                                
    else if (info == XLOG_PARAMETER_CHANGE)                                
    {                                
        xl_parameter_change xlrec;                            
                                    
        /* Update our copy of the parameters in pg_control */                            
        memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));                            
                                    
        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);                            
        ControlFile->MaxConnections = xlrec.MaxConnections;                            
        ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts;                            
        ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;                            
        ControlFile->wal_level = xlrec.wal_level; 
        …                            
        UpdateControlFile();                            
        LWLockRelease(ControlFileLock);                            
                                    
        /* Check to see if any changes to max_connections give problems */                            
        CheckRequiredParameterValues();                            
    }                                
}                                    
复制代码

看这段话:是说找到checkpoint 后面开始的第一条记录的位置。

        /*                                    
         * Find the first record that logically follows the checkpoint --- it 
         * might physically precede it, though.                                    
         */  








目录
相关文章