本节介绍了PostgreSQL提交事务的具体实现逻辑,主要解析了函数CommitTransaction->RecordTransactionCommit的实现逻辑。
TransactionState
事务状态结构体
/*
* transaction states - transaction state from server perspective
* 事务状态枚举 - 服务器视角的事务状态
*/
typedef enum TransState
{
TRANS_DEFAULT, /* idle 空闲 */
TRANS_START, /* transaction starting 事务启动 */
TRANS_INPROGRESS, /* inside a valid transaction 进行中 */
TRANS_COMMIT, /* commit in progress 提交中 */
TRANS_ABORT, /* abort in progress 回滚中 */
TRANS_PREPARE /* prepare in progress 准备中 */
} TransState;
/*
* transaction block states - transaction state of client queries
* 事务块状态 - 客户端查询的事务状态
*
* Note: the subtransaction states are used only for non-topmost
* transactions; the others appear only in the topmost transaction.
* 注意:subtransaction只用于非顶层事务;其他字段用于顶层事务.
*/
typedef enum TBlockState
{
/* not-in-transaction-block states 未进入事务块状态 */
TBLOCK_DEFAULT, /* idle 空闲 */
TBLOCK_STARTED, /* running single-query transaction 单个查询事务 */
/* transaction block states 事务块状态 */
TBLOCK_BEGIN, /* starting transaction block 开始事务块 */
TBLOCK_INPROGRESS, /* live transaction 进行中 */
TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN 隐式事务,进行中 */
TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker 并行worker中的事务,进行中 */
TBLOCK_END, /* COMMIT received 接收到COMMIT */
TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK 失败,等待ROLLBACK */
TBLOCK_ABORT_END, /* failed xact, ROLLBACK received 失败,已接收ROLLBACK */
TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received 进行中,接收到ROLLBACK */
TBLOCK_PREPARE, /* live xact, PREPARE received 进行中,接收到PREPARE */
/* subtransaction states 子事务状态 */
TBLOCK_SUBBEGIN, /* starting a subtransaction 开启 */
TBLOCK_SUBINPROGRESS, /* live subtransaction 进行中 */
TBLOCK_SUBRELEASE, /* RELEASE received 接收到RELEASE */
TBLOCK_SUBCOMMIT, /* COMMIT received while TBLOCK_SUBINPROGRESS 进行中,接收到COMMIT */
TBLOCK_SUBABORT, /* failed subxact, awaiting ROLLBACK 失败,等待ROLLBACK */
TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received 失败,已接收ROLLBACK */
TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received 进行中,接收到ROLLBACK */
TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received 进行中,接收到ROLLBACK TO */
TBLOCK_SUBABORT_RESTART /* failed subxact, ROLLBACK TO received 失败,已接收ROLLBACK TO */
} TBlockState;
/*
* transaction state structure
* 事务状态结构体
*/
typedef struct TransactionStateData
{
//事务ID
TransactionId transactionId; /* my XID, or Invalid if none */
//子事务ID
SubTransactionId subTransactionId; /* my subxact ID */
//保存点名称
char *name; /* savepoint name, if any */
//保存点级别
int savepointLevel; /* savepoint level */
//低级别的事务状态
TransState state; /* low-level state */
//高级别的事务状态
TBlockState blockState; /* high-level state */
//事务嵌套深度
int nestingLevel; /* transaction nesting depth */
//GUC上下文嵌套深度
int gucNestLevel; /* GUC context nesting depth */
//事务生命周期上下文
MemoryContext curTransactionContext; /* my xact-lifetime context */
//查询资源
ResourceOwner curTransactionOwner; /* my query resources */
//按XID顺序保存的已提交的子事务ID
TransactionId *childXids; /* subcommitted child XIDs, in XID order */
//childXids数组大小
int nChildXids; /* # of subcommitted child XIDs */
//分配的childXids数组空间
int maxChildXids; /* allocated size of childXids[] */
//上一个CurrentUserId
Oid prevUser; /* previous CurrentUserId setting */
//上一个SecurityRestrictionContext
int prevSecContext; /* previous SecurityRestrictionContext */
//上一事务是否只读?
bool prevXactReadOnly; /* entry-time xact r/o state */
//是否处于Recovery?
bool startedInRecovery; /* did we start in recovery? */
//XID是否已保存在WAL Record中?
bool didLogXid; /* has xid been included in WAL record? */
//Enter/ExitParallelMode计数器
int parallelModeLevel; /* Enter/ExitParallelMode counter */
//父事务状态
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
//结构体指针
typedef TransactionStateData *TransactionState;
RecordTransactionCommit函数,在WAL Record中记录COMMIT Record,返回最新的XID,如果xact没有XID,则返回InvalidTransactionId。.
/*
* RecordTransactionCommit
*
* Returns latest XID among xact and its children, or InvalidTransactionId
* if the xact has no XID. (We compute that here just because it's easier.)
* 返回最新的XID,如果xact没有XID,则返回InvalidTransactionId。
* (我们在这里计算是因为它更简单。)
*
* If you change this function, see RecordTransactionCommitPrepared also.
*/
static TransactionId
RecordTransactionCommit(void)
{
TransactionId xid = GetTopTransactionIdIfAny();//获取XID
bool markXidCommitted = TransactionIdIsValid(xid);//标记
TransactionId latestXid = InvalidTransactionId;//最后的XID
int nrels;
RelFileNode *rels;
int nchildren;
TransactionId *children;
int nmsgs = 0;
SharedInvalidationMessage *invalMessages = NULL;
bool RelcacheInitFileInval = false;
bool wrote_xlog;
/* Get data needed for commit record */
//为WAL Record的commit record准备数据.
nrels = smgrGetPendingDeletes(true, &rels);
nchildren = xactGetCommittedChildren(&children);
if (XLogStandbyInfoActive())
nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
&RelcacheInitFileInval);
wrote_xlog = (XactLastRecEnd != 0);
/*
* If we haven't been assigned an XID yet, we neither can, nor do we want
* to write a COMMIT record.
* 如果仍未分配XID,我们既不能也不想写COMMIT WAL Record。
*/
if (!markXidCommitted)
{
/*
* We expect that every smgrscheduleunlink is followed by a catalog
* update, and hence XID assignment, so we shouldn't get here with any
* pending deletes. Use a real test not just an Assert to check this,
* since it's a bit fragile.
* 我们希望每个smgrscheduleunlink之后都有一个目录更新,
* 因此进行XID分配,所以我们不应该在这里进行任何删除。
* 使用真正的测试,而不仅仅是一个断言来检查它,因为它有点脆弱。
*/
if (nrels != 0)
elog(ERROR, "cannot commit a transaction that deleted files but has no xid");
/* Can't have child XIDs either; AssignTransactionId enforces this */
//没有child XIDs,AssignTransactionId会强制实现此逻辑.
Assert(nchildren == 0);
/*
* Transactions without an assigned xid can contain invalidation
* messages (e.g. explicit relcache invalidations or catcache
* invalidations for inplace updates); standbys need to process those.
* We can't emit a commit record without an xid, and we don't want to
* force assigning an xid, because that'd be problematic for e.g.
* vacuum. Hence we emit a bespoke record for the invalidations. We
* don't want to use that in case a commit record is emitted, so they
* happen synchronously with commits (besides not wanting to emit more
* WAL records).
* 没有指定xid的事务可以包含失效消息
* (例如显式relcache失效消息或catcache失效消息,用于就地更新);备机需要处理这些消息.
* 我们不能在没有xid的情况下发出COMMIT WAL Record,
* 而且我们也不想强制分配xid,因为这对于vacuum来说是有问题的。
* 因此,我们发布一个定制的记录。
* 我们不希望在发出COMMIT WAL Record时使用它,
* 因此它们与提交同步发生(除了不希望发出更多WAL记录之外)。
*/
if (nmsgs != 0)
{
LogStandbyInvalidations(nmsgs, invalMessages,
RelcacheInitFileInval);
wrote_xlog = true; /* not strictly necessary */
}
/*
* If we didn't create XLOG entries, we're done here; otherwise we
* should trigger flushing those entries the same as a commit record
* would. This will primarily happen for HOT pruning and the like; we
* want these to be flushed to disk in due time.
* 如果我们没有创建XLOG条目,我们已完成所有工作;
* 否则,我们应该像提交记录那样触发刷新这些条目。
* 这主要发生在HOT pruning等;我们希望在适当的时候将它们刷新到磁盘。
*/
if (!wrote_xlog)
goto cleanup;
}
else
{
bool replorigin;
/*
* Are we using the replication origins feature? Or, in other words,
* are we replaying remote actions?
* 我们正在使用复制源特性吗?或者,换句话说,我们正在回放远程操作吗?
*/
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
replorigin_session_origin != DoNotReplicateId);
/*
* Begin commit critical section and insert the commit XLOG record.
* 开始进入提交关键部分并插入commit XLOG记录。
*/
/* Tell bufmgr and smgr to prepare for commit */
//通知bufmgr和smgr准备提交
BufmgrCommit();
/*
* Mark ourselves as within our "commit critical section". This
* forces any concurrent checkpoint to wait until we've updated
* pg_xact. Without this, it is possible for the checkpoint to set
* REDO after the XLOG record but fail to flush the pg_xact update to
* disk, leading to loss of the transaction commit if the system
* crashes a little later.
* 将自己标记为“提交关键部分”。
* 这将强制并发检查点等待,直到我们更新了pg_xact。
* 如果不这样做,检查点可以在XLOG记录之后设置REDO,
* 但是无法将pg_xact更新刷新到磁盘,如果稍后系统崩溃,就会丢失事务提交。
*
* Note: we could, but don't bother to, set this flag in
* RecordTransactionAbort. That's because loss of a transaction abort
* is noncritical; the presumption would be that it aborted, anyway.
* 注意:我们可以在RecordTransactionAbort中设置此标志,但不必费心。
* 这是因为事务中止的损失是无关紧要的;无论如何,假设它会回滚。
*
* It's safe to change the delayChkpt flag of our own backend without
* holding the ProcArrayLock, since we're the only one modifying it.
* This makes checkpoint's determination of which xacts are delayChkpt
* a bit fuzzy, but it doesn't matter.
* 在不保存ProcArrayLock的情况下更改自己的后端delayChkpt标志是安全的,因为只有我们在修改它。
* 这使得检查点对哪些xacts是delayChkpt的判断有点模糊,但这无关紧要。
*/
START_CRIT_SECTION();
MyPgXact->delayChkpt = true;
SetCurrentTransactionStopTimestamp();
XactLogCommitRecord(xactStopTimestamp,
nchildren, children, nrels, rels,
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
MyXactFlags,
InvalidTransactionId, NULL /* plain commit */ );
if (replorigin)
/* Move LSNs forward for this replication origin */
//为该复制源向前移动LSNs
replorigin_session_advance(replorigin_session_origin_lsn,
XactLastRecEnd);
/*
* Record commit timestamp. The value comes from plain commit
* timestamp if there's no replication origin; otherwise, the
* timestamp was already set in replorigin_session_origin_timestamp by
* replication.
* 记录提交时间戳。
* 如果没有复制源,则该值来自普通的提交时间戳;
* 否则,通过复制已经在replorigin_session_origin_timestamp中设置了时间戳。
*
* We don't need to WAL-log anything here, as the commit record
* written above already contains the data.
* 我们不需要WAL-log在这里记录任何东西,因为上面写的提交记录已经包含了数据。
*/
if (!replorigin || replorigin_session_origin_timestamp == 0)
replorigin_session_origin_timestamp = xactStopTimestamp;
TransactionTreeSetCommitTsData(xid, nchildren, children,
replorigin_session_origin_timestamp,
replorigin_session_origin, false);
}
/*
* Check if we want to commit asynchronously. We can allow the XLOG flush
* to happen asynchronously if synchronous_commit=off, or if the current
* transaction has not performed any WAL-logged operation or didn't assign
* an xid. The transaction can end up not writing any WAL, even if it has
* an xid, if it only wrote to temporary and/or unlogged tables. It can
* end up having written WAL without an xid if it did HOT pruning. In
* case of a crash, the loss of such a transaction will be irrelevant;
* temp tables will be lost anyway, unlogged tables will be truncated and
* HOT pruning will be done again later. (Given the foregoing, you might
* think that it would be unnecessary to emit the XLOG record at all in
* this case, but we don't currently try to do that. It would certainly
* cause problems at least in Hot Standby mode, where the
* KnownAssignedXids machinery requires tracking every XID assignment. It
* might be OK to skip it only when wal_level < replica, but for now we
* don't.)
* 检查是否希望执行异步提交.
* 如synchronous_commit=off,可以允许异步执行XLOG刷新,或者如果当前事务没有执行
* WAL-logged操作或者不能分配XID.
* 如果事务只写入临时和/或unlogged的表,那么即使它有一个xid,它也不会写入任何WAL。
* 如果事务执行HOT pruning,那么可以在没有XID的情况下写入WAL.
* 在crash的情况下,此类事务引起的问题将无关紧要;临时表可以随时废弃,unlogged表将被阶段,
* 而HOT pruning在稍后将被再次执行.
* (鉴于上述情况,您可能认为在本例中根本没有必要发出XLOG记录,但我们目前并不尝试这样做。
* 至少在热备份模式下,它肯定会导致问题,因为在这种模式下,KnownAssignedXids机器需要跟踪每个XID分配。
* 可能只在wal_level < replica时跳过它是可以的,但是现在我们不这样做。)
*
* However, if we're doing cleanup of any non-temp rels or committing any
* command that wanted to force sync commit, then we must flush XLOG
* immediately. (We must not allow asynchronous commit if there are any
* non-temp tables to be deleted, because we might delete the files before
* the COMMIT record is flushed to disk. We do allow asynchronous commit
* if all to-be-deleted tables are temporary though, since they are lost
* anyway if we crash.)
* 但是,如果我们正在清理任何非临时的临时记录或提交想要强制同步提交的命令,那么我们必须立即刷新XLOG。
* (如存在非临时表的删除操作,则不允许异步提交,因为我们可能在COMMIT 记录刷到磁盘前已删除了文件.
* 但如果将被删除的是临时表,我们确实可以允许异步提交,因为临时表在crash也会丢弃)
*/
if ((wrote_xlog && markXidCommitted &&
synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
forceSyncCommit || nrels > 0)
{
XLogFlush(XactLastRecEnd);
/*
* Now we may update the CLOG, if we wrote a COMMIT record above
* 现在我们更新CLOG,如果我们在上面已写入了COMMIT WAL Record.
*/
if (markXidCommitted)
TransactionIdCommitTree(xid, nchildren, children);
}
else
{
//异步提交
/*
* Asynchronous commit case:
* 异步提交:
*
* This enables possible committed transaction loss in the case of a
* postmaster crash because WAL buffers are left unwritten. Ideally we
* could issue the WAL write without the fsync, but some
* wal_sync_methods do not allow separate write/fsync.
* 这可能会导致在postmaster崩溃的情况下出现提交的事务丢失,
* 因为WAL buffer是未持久化的。
* 理想情况下,我们可以在没有fsync的情况下发出WAL write,
* 但是一些wal_sync_methods不允许单独的write/fsync。
*
* Report the latest async commit LSN, so that the WAL writer knows to
* flush this commit.
* 反馈最后的异步提交LSN,通知WAL写入器刷新此commit
*/
XLogSetAsyncXactLSN(XactLastRecEnd);
/*
* We must not immediately update the CLOG, since we didn't flush the
* XLOG. Instead, we store the LSN up to which the XLOG must be
* flushed before the CLOG may be updated.
* 我们不能马上更新CLOG,因为我们还没有刷新XLOG.
* 相反的,我们存储LSN直至在CLOG可能已更新前XLOG必须需要刷新的时候.
*/
if (markXidCommitted)
TransactionIdAsyncCommitTree(xid, nchildren, children, XactLastRecEnd);
}
/*
* If we entered a commit critical section, leave it now, and let
* checkpoints proceed.
* 如果已进入commit关键区域,已完成工作,可以离开了,让checkpoints执行相关操作.
*/
if (markXidCommitted)
{
MyPgXact->delayChkpt = false;
END_CRIT_SECTION();
}
/* Compute latestXid while we have the child XIDs handy */
//如持有子XIDs,计算最后的latestXid
latestXid = TransactionIdLatest(xid, nchildren, children);
/*
* Wait for synchronous replication, if required. Similar to the decision
* above about using committing asynchronously we only want to wait if
* this backend assigned an xid and wrote WAL. No need to wait if an xid
* was assigned due to temporary/unlogged tables or due to HOT pruning.
* 如需要,等待同步复制.
* 与上述使用异步提交的决定类似,我们只想在该进程已分配和写入WAL的情况才等待.
* 临时/unlogged表或者HOT pruning,不需要等待事务ID是否已分配.
*
* Note that at this stage we have marked clog, but still show as running
* in the procarray and continue to hold locks.
* 注意在这个场景下,我们必须标记clog,但在procarray中仍显示为running,并一直持有锁.
*/
if (wrote_xlog && markXidCommitted)
SyncRepWaitForLSN(XactLastRecEnd, true);
/* remember end of last commit record */
//记录最后commit记录的位置
XactLastCommitEnd = XactLastRecEnd;
/* Reset XactLastRecEnd until the next transaction writes something */
//重置XactLastRecEnd直至下个事务写入数据.
XactLastRecEnd = 0;
cleanup:
/* Clean up local data */
//清除本地数据
if (rels)
pfree(rels);
//返回XID
return latestXid;
}
插入数据,执行commit
10:57:56 (xdb@[local]:5432)testdb=# begin;
BEGIN
10:57:59 (xdb@[local]:5432)testdb=#* insert into t_session1 values(1);
INSERT 0 1
10:58:01 (xdb@[local]:5432)testdb=#* commit;
启动gdb,设置断点
(gdb) b RecordTransactionCommit
Breakpoint 2 at 0x547528: file xact.c, line 1141.
(gdb) c
Continuing.
Breakpoint 2, RecordTransactionCommit () at xact.c:1141
1141 TransactionId xid = GetTopTransactionIdIfAny();
(gdb)
查看调用栈
(gdb) bt
#0 RecordTransactionCommit () at xact.c:1141
#1 0x00000000005483f2 in CommitTransaction () at xact.c:2070
#2 0x0000000000549078 in CommitTransactionCommand () at xact.c:2831
#3 0x00000000008c8ea9 in finish_xact_command () at postgres.c:2523
#4 0x00000000008c6b5d in exec_simple_query (query_string=0x2c97ec8 "commit;") at postgres.c:1170
#5 0x00000000008cae70 in PostgresMain (argc=1, argv=0x2cc3dc8, dbname=0x2cc3c30 "testdb", username=0x2c94ba8 "xdb")
at postgres.c:4182
#6 0x000000000082642b in BackendRun (port=0x2cb9c00) at postmaster.c:4361
#7 0x0000000000825b8f in BackendStartup (port=0x2cb9c00) at postmaster.c:4033
#8 0x0000000000821f1c in ServerLoop () at postmaster.c:1706
#9 0x00000000008217b4 in PostmasterMain (argc=1, argv=0x2c92b60) at postmaster.c:1379
#10 0x00000000007488ef in main (argc=1, argv=0x2c92b60) at main.c:228
(gdb)
获取事务ID
(gdb) p xid
$3 = 2411
(gdb)
设置其他变量,markXidCommitted —> True
(gdb) n
1143 TransactionId latestXid = InvalidTransactionId;
(gdb)
1148 int nmsgs = 0;
(gdb)
1149 SharedInvalidationMessage *invalMessages = NULL;
(gdb)
1150 bool RelcacheInitFileInval = false;
(gdb)
1154 nrels = smgrGetPendingDeletes(true, &rels);
(gdb)
1155 nchildren = xactGetCommittedChildren(&children);
(gdb)
1156 if (XLogStandbyInfoActive())
(gdb)
1159 wrote_xlog = (XactLastRecEnd != 0);
(gdb)
1165 if (!markXidCommitted)
(gdb) p latestXid
$4 = 0
(gdb) p markXidCommitted
$5 = true
(gdb) p nrels
$6 = 0
(gdb) p nchildren
$7 = 0
(gdb) p wrote_xlog
$8 = true
(gdb)
markXidCommitted为T,进入相应的处理逻辑.
开始进入提交关键部分并插入commit XLOG记录。
(gdb) n
1214 replorigin = (replorigin_session_origin != InvalidRepOriginId &&
(gdb)
1221 BufmgrCommit();
(gdb) p replorigin
$9 = false
(gdb)
进入提交部分,设置当前事务时间戳
(gdb) n
1240 START_CRIT_SECTION();
(gdb)
1241 MyPgXact->delayChkpt = true;
(gdb)
1243 SetCurrentTransactionStopTimestamp();
(gdb) p *MyPgXact
$10 = {xid = 2411, xmin = 0, vacuumFlags = 0 '\000', overflowed = false, delayChkpt = true, nxids = 0 '\000'}
(gdb)
插入XLOG
(gdb) n
1245 XactLogCommitRecord(xactStopTimestamp,
(gdb)
1252 if (replorigin)
(gdb)
设置提交事务数据
(gdb)
1267 if (!replorigin || replorigin_session_origin_timestamp == 0)
(gdb)
1268 replorigin_session_origin_timestamp = xactStopTimestamp;
(gdb)
1270 TransactionTreeSetCommitTsData(xid, nchildren, children,
(gdb)
1300 if ((wrote_xlog && markXidCommitted &&
(gdb)
同步刷新XLOG
(gdb)
1301 synchronous_commit > SYNCHRONOUS_COMMIT_OFF) ||
(gdb)
1300 if ((wrote_xlog && markXidCommitted &&
(gdb)
1304 XLogFlush(XactLastRecEnd);
(gdb)
1309 if (markXidCommitted)
(gdb)
更新CLOG,如果我们在上面已写入了COMMIT WAL Record.
(gdb)
1310 TransactionIdCommitTree(xid, nchildren, children);
(gdb)
1309 if (markXidCommitted)
(gdb)
退出提交关键区域
(gdb)
1340 if (markXidCommitted)
(gdb)
1342 MyPgXact->delayChkpt = false;
(gdb)
1343 END_CRIT_SECTION();
(gdb)
计算最后的latestXid
(gdb)
1347 latestXid = TransactionIdLatest(xid, nchildren, children);
(gdb) n
1358 if (wrote_xlog && markXidCommitted)
(gdb) p latestXid
$11 = 2411
(gdb)
记录最后commit记录的位置
(gdb) n
1359 SyncRepWaitForLSN(XactLastRecEnd, true);
(gdb)
1362 XactLastCommitEnd = XactLastRecEnd;
(gdb)
1365 XactLastRecEnd = 0;
(gdb)
1368 if (rels)
(gdb)
1371 return latestXid;
(gdb) p XactLastCommitEnd
$12 = 5522364896
(gdb)
返回,完成调用
(gdb) n
1372 }
(gdb)
CommitTransaction () at xact.c:2087
2087 TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
(gdb)
DONE!
How Postgres Makes Transactions Atomic
PG Source Code
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。