本篇内容主要讲解“PostgreSQL中的ProcessRepliesIfAny函数分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中的ProcessRepliesIfAny函数分析”吧!
调用栈如下:
(gdb) bt
#0 0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0,
nevents=1) at latch.c:1048
#2 0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1,
wait_event_info=83886092) at latch.c:1000
#3 0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999,
wait_event_info=83886092) at latch.c:385
#4 0x000000000085405b in WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2229
#5 0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6 0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
at walsender.c:1539
#7 0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
at postgres.c:4178
#8 0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9 0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228
N/A
ProcessRepliesIfAny
在streaming期间,处理接收到的消息,同时检查远程终端是否关闭了连接,执行相关处理.
代码不多也不复杂,可自行阅读.
/*
* Process any incoming messages while streaming. Also checks if the remote
* end has closed the connection.
* 在streaming期间,处理接收到的消息.
* 同时检查远程终端是否关闭了连接,执行相关处理.
*/
static void
ProcessRepliesIfAny(void)
{
unsigned char firstchar;
int r;
bool received = false;
//当前时间
last_processing = GetCurrentTimestamp();
for (;;)
{
//---------- 循环接收相关消息
pq_startmsgread();
r = pq_getbyte_if_available(&firstchar);
if (r < 0)
{
/* unexpected error or EOF */
//未知异常或者EOF
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected EOF on standby connection")));
//进程退出
proc_exit(0);
}
if (r == 0)
{
/* no data available without blocking */
//已无阻塞的消息数据,退出
pq_endmsgread();
break;
}
/* Read the message contents */
//读取消息内容
resetStringInfo(&reply_message);
if (pq_getmessage(&reply_message, 0))
{
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected EOF on standby connection")));
proc_exit(0);
}
/*
* If we already received a CopyDone from the frontend, the frontend
* should not send us anything until we've closed our end of the COPY.
* XXX: In theory, the frontend could already send the next command
* before receiving the CopyDone, but libpq doesn't currently allow
* that.
* 如果已在前台接收到CopyDone消息,前台不应该再发送消息,直至关闭COPY.
* XXX:理论上来说,在接收到CopyDone前,前台可能已经发送了下一个命令,但libpq不允许这种情况发生
*/
if (streamingDoneReceiving && firstchar != 'X')
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
firstchar)));
/* Handle the very limited subset of commands expected in this phase */
//处理有限几个命令
switch (firstchar)
{
/*
* 'd' means a standby reply wrapped in a CopyData packet.
* 'd'意味着standby节点的应答封装了CopyData包
*/
case 'd':
ProcessStandbyMessage();
received = true;
break;
/*
* CopyDone means the standby requested to finish streaming.
* Reply with CopyDone, if we had not sent that already.
* CopyDone意味着standby节点请求结束streaming.
* 如尚未发送,则使用CopyDone应答.
*/
case 'c':
if (!streamingDoneSending)
{
pq_putmessage_noblock('c', NULL, 0);
streamingDoneSending = true;
}
streamingDoneReceiving = true;
received = true;
break;
/*
* 'X' means that the standby is closing down the socket.
* 'X'意味着standby节点正在关闭socket
*/
case 'X':
proc_exit(0);
default:
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid standby message type \"%c\"",
firstchar)));
}
}
/*
* Save the last reply timestamp if we've received at least one reply.
* 如接收到至少一条应答信息,则保存最后的应答时间戳.
*/
if (received)
{
last_reply_timestamp = last_processing;
waiting_for_ping_response = false;
}
}
在主节点上用gdb跟踪postmaster,在PostgresMain上设置断点后启动standby节点,进入断点
(gdb) set follow-fork-mode child
(gdb) b ProcessRepliesIfAny
Breakpoint 2 at 0x85343b: file walsender.c, line 1597.
(gdb) c
Continuing.
Breakpoint 2, ProcessRepliesIfAny () at walsender.c:1597
1597 bool received = false;
(gdb)
查看进程信息
[xdb@localhost ~]$ ps -ef|grep postgres
xdb 1376 1 0 14:16 ? 00:00:00 /appdb/xdb/pg11.2/bin/postgres
xdb 1377 1376 0 14:16 ? 00:00:00 postgres: logger
xdb 1550 1376 0 16:53 ? 00:00:00 postgres: checkpointer
xdb 1551 1376 0 16:53 ? 00:00:00 postgres: background writer
xdb 1552 1376 0 16:53 ? 00:00:00 postgres: walwriter
xdb 1553 1376 0 16:53 ? 00:00:00 postgres: autovacuum launcher
xdb 1554 1376 0 16:53 ? 00:00:00 postgres: archiver
xdb 1555 1376 0 16:53 ? 00:00:00 postgres: stats collector
xdb 1556 1376 0 16:53 ? 00:00:00 postgres: logical replication launcher
xdb 1633 1376 0 17:26 ? 00:00:00 postgres: walsender replicator 192.168.26.26(40528) idle
循环接收相关消息
(gdb) n
1599 last_processing = GetCurrentTimestamp();
(gdb)
1603 pq_startmsgread();
(gdb)
1604 r = pq_getbyte_if_available(&firstchar);
(gdb)
1605 if (r < 0)
(gdb) p r
$1 = 1
(gdb) p firstchar
$2 = 100 'd'
(gdb)
命令是’d’,执行相关处理
(gdb) n
1613 if (r == 0)
(gdb)
1621 resetStringInfo(&reply_message);
(gdb)
1622 if (pq_getmessage(&reply_message, 0))
(gdb)
1637 if (streamingDoneReceiving && firstchar != 'X')
(gdb)
1644 switch (firstchar)
(gdb)
1650 ProcessStandbyMessage();
(gdb)
1651 received = true;
(gdb)
1652 break;
(gdb)
1681 }
(gdb)
设置断点
(gdb) b walsender.c:1643
Breakpoint 3 at 0x8535b6: file walsender.c, line 1643.
(gdb) b walsender.c:1672
Breakpoint 4 at 0x85361a: file walsender.c, line 1672.
(gdb) c
Continuing.
Breakpoint 3, ProcessRepliesIfAny () at walsender.c:1644
1644 switch (firstchar)
(gdb)
Continuing.
...
Breakpoint 4, ProcessRepliesIfAny () at walsender.c:1673
1673 proc_exit(0);
(gdb)
进程即将退出,查看进程信息
[xdb@localhost ~]$ ps -ef|grep postgres
xdb 1376 1 0 14:16 ? 00:00:00 /appdb/xdb/pg11.2/bin/postgres
xdb 1377 1376 0 14:16 ? 00:00:00 postgres: logger
xdb 1550 1376 0 16:53 ? 00:00:00 postgres: checkpointer
xdb 1551 1376 0 16:53 ? 00:00:00 postgres: background writer
xdb 1552 1376 0 16:53 ? 00:00:00 postgres: walwriter
xdb 1553 1376 0 16:53 ? 00:00:00 postgres: autovacuum launcher
xdb 1554 1376 0 16:53 ? 00:00:00 postgres: archiver
xdb 1555 1376 0 16:53 ? 00:00:00 postgres: stats collector
xdb 1556 1376 0 16:53 ? 00:00:00 postgres: logical replication launcher
xdb 1633 1376 0 17:26 ? 00:00:00 postgres: walsender replicator 192.168.26.26(40528) idle
xdb 1637 1376 0 17:27 ? 00:00:00 postgres: walsender replicator 192.168.26.26(40530) streaming 0/5D075248
[xdb@localhost ~]$
进程退出(PID=1633),启动了新的进程(PID=1637)
(gdb) n
[Inferior 2 (process 1633) exited normally]
(gdb)
到此,相信大家对“PostgreSQL中的ProcessRepliesIfAny函数分析”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:http://blog.itpub.net/6906/viewspace-2639294/