温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

PostgreSQL中ReceiveXlogStream有什么作用

发布时间:2021-11-09 15:13:50 来源:亿速云 阅读:175 作者:iii 栏目:关系型数据库

这篇文章主要介绍“PostgreSQL中ReceiveXlogStream有什么作用”,在日常操作中,相信很多人在PostgreSQL中ReceiveXlogStream有什么作用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”PostgreSQL中ReceiveXlogStream有什么作用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

本节简单介绍了PostgreSQL的备份工具pg_basebackup源码中实际执行备份逻辑的BaseBackup中对WAL数据进行备份的实现函数StartLogStreamer->LogStreamerMain及其主要的实现函数ReceiveXlogStream.

一、数据结构

logstreamer_param
WAL data streamer参数.

typedef struct
{
     ////后台连接
    PGconn     *bgconn;
    //开始位置
    XLogRecPtr  startptr;
    //目录或者tar文件,依赖于使用的模式
    char        xlog[MAXPGPATH];    /* directory or tarfile depending on mode */
    //系统标识符
    char       *sysidentifier;
    //时间线
    int         timeline;
} logstreamer_param;

StreamCtl
接收xlog流数据时的全局参数

/*
 * Global parameters when receiving xlog stream. For details about the individual fields,
 * see the function comment for ReceiveXlogStream().
 * 接收xlog流数据时的全局参数.
 * 每个域字段的详细解释,参见ReceiveXlogStream()函数注释.
 */
typedef struct StreamCtl
{
    //streaming的开始位置
    XLogRecPtr  startpos;       /* Start position for streaming */
    //时间线
    TimeLineID  timeline;       /* Timeline to stream data from */
    //系统标识符
    char       *sysidentifier;  /* Validate this system identifier and
                                 * timeline */
    //standby超时信息
    int         standby_message_timeout;    /* Send status messages this often */
    //是否同步(写入时是否马上Flush WAL data)
    bool        synchronous;    /* Flush immediately WAL data on write */
    //在已归档的数据中标记segment为已完成
    bool        mark_done;      /* Mark segment as done in generated archive */
    //刷新到磁盘上以确保数据的一致性状态(是否已刷新到磁盘上)
    bool        do_sync;        /* Flush to disk to ensure consistent state of
                                 * data */
    //在返回T时停止streaming
    stream_stop_callback stream_stop;   /* Stop streaming when returns true */
    //如有效,监测该socket中的输入并检查stream_stop()的返回
    pgsocket    stop_socket;    /* if valid, watch for input on this socket
                                 * and check stream_stop() when there is any */
    //如何写WAL
    WalWriteMethod *walmethod;  /* How to write the WAL */
    //附加到部分接受文件的后缀
    char       *partial_suffix; /* Suffix appended to partially received files */
    //使用的replication slot,如无则为NULL
    char       *replication_slot;   /* Replication slot to use, or NULL */
} StreamCtl;

二、源码解读

LogStreamerMain
WAL流复制主函数,用于fork后的子进程调用

static int
LogStreamerMain(logstreamer_param *param)
{
    StreamCtl   stream;//接收xlog流数据时的全局参数
    in_log_streamer = true;
    //初始化StreamCtl结构体
    MemSet(&stream, 0, sizeof(stream));
    stream.startpos = param->startptr;
    stream.timeline = param->timeline;
    stream.sysidentifier = param->sysidentifier;
    stream.stream_stop = reached_end_position;
#ifndef WIN32
    stream.stop_socket = bgpipe[0];
#else
    stream.stop_socket = PGINVALID_SOCKET;
#endif
    stream.standby_message_timeout = standby_message_timeout;
    stream.synchronous = false;
    stream.do_sync = do_sync;
    stream.mark_done = true;
    stream.partial_suffix = NULL;
    stream.replication_slot = replication_slot;
    if (format == 'p')
        stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
    else
        stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
    //接收数据
    if (!ReceiveXlogStream(param->bgconn, &stream))
        /*
         * Any errors will already have been reported in the function process,
         * but we need to tell the parent that we didn't shutdown in a nice
         * way.
         * 在函数执行过程中出现的错误已通过警告的方式发出,
         * 但仍需要告知父进程不能优雅的关闭本进程.
         */
        return 1;
    if (!stream.walmethod->finish())
    {
        fprintf(stderr,
                _("%s: could not finish writing WAL files: %s\n"),
                progname, strerror(errno));
        return 1;
    }
    //结束连接
    PQfinish(param->bgconn);
    //普通文件格式
    if (format == 'p')
        FreeWalDirectoryMethod();
    else
        FreeWalTarMethod();
    //是否内存
    pg_free(stream.walmethod);
    return 0;
}

ReceiveXlogStream
在指定的开始位置接收log stream

/*
 * Receive a log stream starting at the specified position.
 * 在指定的开始位置接收log stream
 *
 * Individual parameters are passed through the StreamCtl structure.
 * 通过StreamCtl结构体传递参数.
 *
 * If sysidentifier is specified, validate that both the system
 * identifier and the timeline matches the specified ones
 * (by sending an extra IDENTIFY_SYSTEM command)
 * 如指定了系统标识符,验证系统标识符和timeline是否匹配指定的信息.
 * (通过发送额外的IDENTIFY_SYSTEM命令)
 *
 * All received segments will be written to the directory
 * specified by basedir. This will also fetch any missing timeline history
 * files.
 * 所有接收到的segments会写入到basedir中.
 * 这同时会提前所有缺失的timeline history文件.
 *
 * The stream_stop callback will be called every time data
 * is received, and whenever a segment is completed. If it returns
 * true, the streaming will stop and the function
 * return. As long as it returns false, streaming will continue
 * indefinitely.
 * stream_stop回调函数在每次接收到数据以及segment完成传输后调用.
 * 如返回T,streaming会停止,函数返回.
 * 如返回F,streaming会一直继续.
 *
 * If stream_stop() checks for external input, stop_socket should be set to
 * the FD it checks.  This will allow such input to be detected promptly
 * rather than after standby_message_timeout (which might be indefinite).
 * Note that signals will interrupt waits for input as well, but that is
 * race-y since a signal received while busy won't interrupt the wait.
 * 如stream_stop()用于检测额外的输入,stop_socket变量应设置为该函数需检查的FD.
 * 这会允许立即检测此类输入,而不是在standby_message_timeout之后(可能会无限循环).
 * 注意信号也会中断输入等待,但这是存在竞争的,因为在忙时接收到信号不会中断等待.
 *
 * standby_message_timeout controls how often we send a message
 * back to the master letting it know our progress, in milliseconds.
 * Zero means no messages are sent.
 * This message will only contain the write location, and never
 * flush or replay.
 * standby_message_timeout控制发送进度消息回master的频度,单位为ms.
 * 0意味着没有消息会发送.
 * 该消息只保存写入位置,永远不会flush或replay.
 *
 * If 'partial_suffix' is not NULL, files are initially created with the
 * given suffix, and the suffix is removed once the file is finished. That
 * allows you to tell the difference between partial and completed files,
 * so that you can continue later where you left.
 * 如'partial_suffix'不为NULL,文件已通过给定的suffix创建,
 *   一旦文件完成传输,则suffix会被清除.
 * 这是部分和完整完成文件的异同,以便在离开后可以继续.
 *
 * If 'synchronous' is true, the received WAL is flushed as soon as written,
 * otherwise only when the WAL file is closed.
 * 如'synchronous'为T,接收到的WAL会刷新为写入,否则的话只会在WAL file关闭时才写入.
 *
 * Note: The WAL location *must* be at a log segment start!
 * 注意:WAL位置必须是log segment的起始位置.
 */
bool
ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
{
    char        query[128];
    char        slotcmd[128];
    PGresult   *res;
    XLogRecPtr  stoppos;
    /*
     * The caller should've checked the server version already, but doesn't do
     * any harm to check it here too.
     * 调用者已完成版本校验,但这里重复校验并没有什么问题.
     */
    if (!CheckServerVersionForStreaming(conn))
        return false;
    /*
     * Decide whether we want to report the flush position. If we report the
     * flush position, the primary will know what WAL we'll possibly
     * re-request, and it can then remove older WAL safely. We must always do
     * that when we are using slots.
     * 确定是否需要报告flush位置.
     * 如果我们报告了flush位置,主服务器将会知道可能重复请求的WAL file,
     *   这样可以安全的移除更老的WAL.
     * 如使用slots,应经常执行该操作.
     *
     * Reporting the flush position makes one eligible as a synchronous
     * replica. People shouldn't include generic names in
     * synchronous_standby_names, but we've protected them against it so far,
     * so let's continue to do so unless specifically requested.
     * 报告flush位置使其符合同步副本的条件.
     * DBA不应该在synchronous_standby_names中包含常规的名称,但我们截止目前位置已很好的保护了它们,
     *   因此可以继续这样执行除非特别请求.
     */
    if (stream->replication_slot != NULL)
    {
        //存在slot
        reportFlushPosition = true;
        sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
    }
    else
    {
        if (stream->synchronous)
            reportFlushPosition = true;//同步
        else
            reportFlushPosition = false;//异步
        slotcmd[0] = 0;//ASCII 0
    }
    if (stream->sysidentifier != NULL)
    {
        //系统标识符不为NULL
        /* Validate system identifier hasn't changed */
        //验证系统标识符没有改变
        //发送IDENTIFY_SYSTEM命令
        res = PQexec(conn, "IDENTIFY_SYSTEM");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            fprintf(stderr,
                    _("%s: could not send replication command \"%s\": %s"),
                    progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
            PQclear(res);
            return false;
        }
        if (PQntuples(res) != 1 || PQnfields(res) < 3)
        {
            fprintf(stderr,
                    _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d r more fields\n"),
                    progname, PQntuples(res), PQnfields(res), 1, 3);
            PQclear(res);
            return false;
        }
        if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
        {
            fprintf(stderr,
                    _("%s: system identifier does not match between base backup and streaming onnection\n"),
                    progname);
            PQclear(res);
            return false;
        }
        if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
        {
            fprintf(stderr,
                    _("%s: starting timeline %u is not present in the server\n"),
                    progname, stream->timeline);
            PQclear(res);
            return false;
        }
        PQclear(res);
    }
    /*
     * initialize flush position to starting point, it's the caller's
     * responsibility that that's sane.
     * 初始化flush位置为开始点,这是调用者的责任.
     */
    lastFlushPosition = stream->startpos;
    while (1)
    {
        /*
         * Fetch the timeline history file for this timeline, if we don't have
         * it already. When streaming log to tar, this will always return
         * false, as we are never streaming into an existing file and
         * therefore there can be no pre-existing timeline history file.
         * 为该timeline提前timeline history,如我们已不需要.
         * 如streaming日志为tar格式,这通常会返回F,这如同从来没有streaming到已存在的文件中,
         *   因此没有已存在的timeline history文件.
         */
        if (!existsTimeLineHistoryFile(stream))
        {
            //如不存在history文件
            snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
            //发送TIMELINE_HISTORY命令
            res = PQexec(conn, query);
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
            {
                /* FIXME: we might send it ok, but get an error */
                fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                        progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
                PQclear(res);
                return false;
            }
            /*
             * The response to TIMELINE_HISTORY is a single row result set
             * with two fields: filename and content
             * TIMELINE_HISTORY的响应是一个单行结果集,有两个字段:filename和content
             */
            if (PQnfields(res) != 2 || PQntuples(res) != 1)
            {
                fprintf(stderr,
                        _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d ields, expected %d rows and %d fields\n"),
                        progname, PQntuples(res), PQnfields(res), 1, 2);
            }
            /* Write the history file to disk */
            //写入history文件到磁盘上
            writeTimeLineHistoryFile(stream,
                                     PQgetvalue(res, 0, 0),
                                     PQgetvalue(res, 0, 1));
            PQclear(res);
        }
        /*
         * Before we start streaming from the requested location, check if the
         * callback tells us to stop here.
         * 从请求的位置开始streaming前,检查回调函数告诉我们在哪停止
         */
        if (stream->stream_stop(stream->startpos, stream->timeline, false))
            return true;
        /* Initiate the replication stream at specified location */
        //在指定的位置初始化复制流
        snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
                 slotcmd,
                 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
                 stream->timeline);
        //发送START_REPLICATION命令
        res = PQexec(conn, query);
        if (PQresultStatus(res) != PGRES_COPY_BOTH)
        {
            fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                    progname, "START_REPLICATION", PQresultErrorMessage(res));
            PQclear(res);
            return false;
        }
        PQclear(res);
        /* Stream the WAL */
        //流化WAL
        res = HandleCopyStream(conn, stream, &stoppos);
        if (res == NULL)
            goto error;
        /*
         * Streaming finished.
         *
         * There are two possible reasons for that: a controlled shutdown, or
         * we reached the end of the current timeline. In case of
         * end-of-timeline, the server sends a result set after Copy has
         * finished, containing information about the next timeline. Read
         * that, and restart streaming from the next timeline. In case of
         * controlled shutdown, stop here.
         * Streaming完成.
         * 这里有两个可能的原因:可控的shutdown或者到达了当前时间线的末尾.
         * 在end-of-timeline这种情况下,服务器在Copy完成后发送结果集,
         *   含有关于下一个时间线的相关信息.
         * 读取这些信息,在下一个时间线开始重新启动streaming.
         * 如为可控的关闭,可以停止了.
         */
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
        {
            /*
             * End-of-timeline. Read the next timeline's ID and starting
             * position. Usually, the starting position will match the end of
             * the previous timeline, but there are corner cases like if the
             * server had sent us half of a WAL record, when it was promoted.
             * The new timeline will begin at the end of the last complete
             * record in that case, overlapping the partial WAL record on the
             * old timeline.
             * 这是End-of-timeline的情况.
             * 读取下一个时间线ID和开始位置.通常来说,开始位置将匹配先前时间线的末尾,
             *   但会存在特殊的情况比如服务器已经传输了WAL Record的一部分.
             * 这种情况下,新的时间线会在上次已完成的记录末尾开始,与旧时间线的部分WAL Record重叠.
             */
            uint32      newtimeline;//新的时间线
            bool        parsed;//是否解析
            //读取结果集的末尾
            parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
            PQclear(res);
            if (!parsed)
                goto error;
            /* Sanity check the values the server gave us */
            //执行校验和坚持
            if (newtimeline <= stream->timeline)
            {
                //新的时间线不可能小于等于stream中的时间线
                fprintf(stderr,
                        _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
                        progname, newtimeline, stream->timeline);
                goto error;
            }
            if (stream->startpos > stoppos)
            {
                //开始位置大于结束位置
                fprintf(stderr,
                        _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline u to begin at %X/%X\n"),
                        progname,
                        stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
                        newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
                goto error;
            }
            /* Read the final result, which should be CommandComplete. */
            //读取最后的结果,应为命令结束
            res = PQgetResult(conn);
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
            {
                fprintf(stderr,
                        _("%s: unexpected termination of replication stream: %s"),
                        progname, PQresultErrorMessage(res));
                PQclear(res);
                goto error;
            }
            PQclear(res);
            /*
             * Loop back to start streaming from the new timeline. Always
             * start streaming at the beginning of a segment.
             * 从新时间线开始循环,通常会在segment的开始出开始streaming
             */
            stream->timeline = newtimeline;
            stream->startpos = stream->startpos -
                XLogSegmentOffset(stream->startpos, WalSegSz);
            continue;//继续循环
        }
        else if (PQresultStatus(res) == PGRES_COMMAND_OK)
        {
            PQclear(res);
            /*
             * End of replication (ie. controlled shut down of the server).
             * replication完成(比如服务器关闭了复制)
             *
             * Check if the callback thinks it's OK to stop here. If not,
             * complain.
             * 检查是否回调函数认为在这里停止就OK了,如果不是,则报警.
             */
            if (stream->stream_stop(stoppos, stream->timeline, false))
                return true;
            else
            {
                fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
                        progname);
                goto error;
            }
        }
        else
        {
            /* Server returned an error. */
            //返回错误
            fprintf(stderr,
                    _("%s: unexpected termination of replication stream: %s"),
                    progname, PQresultErrorMessage(res));
            PQclear(res);
            goto error;
        }
    }
error:
    if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
        fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
                progname, current_walfile_name, stream->walmethod->getlasterror());
    walfile = NULL;
    return false;
}
/*
 * The main loop of ReceiveXlogStream. Handles the COPY stream after
 * initiating streaming with the START_REPLICATION command.
 * ReceiveXlogStream中的主循环实现函数.
 * 在使用START_REPLICATION命令初始化streaming后处理COPY stream.
 *
 * If the COPY ends (not necessarily successfully) due a message from the
 * server, returns a PGresult and sets *stoppos to the last byte written.
 * On any other sort of error, returns NULL.
 * 如COPY由于服务器端的原因终止,返回PGresult并设置*stoppos为最后写入的字节.
 * 如出现错误,则返回NULL.
 */
static PGresult *
HandleCopyStream(PGconn *conn, StreamCtl *stream,
                 XLogRecPtr *stoppos)
{
    char       *copybuf = NULL;
    TimestampTz last_status = -1;
    XLogRecPtr  blockpos = stream->startpos;
    still_sending = true;
    while (1)
    {
        //循环处理
        int         r;
        TimestampTz now;//时间戳
        long        sleeptime;
        /*
         * Check if we should continue streaming, or abort at this point.
         * 检查我们是否应该继续streaming,或者在当前就退出
         */
        if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
            goto error;
        now = feGetCurrentTimestamp();
        /*
         * If synchronous option is true, issue sync command as soon as there
         * are WAL data which has not been flushed yet.
         * 如同步选项为T,只要存在未flushed的WAL data,马上执行sync命令.
         */
        if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
        {
            if (stream->walmethod->sync(walfile) != 0)
            {
                fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
                        progname, current_walfile_name, stream->walmethod->getlasterror());
                goto error;
            }
            lastFlushPosition = blockpos;
            /*
             * Send feedback so that the server sees the latest WAL locations
             * immediately.
             * 发送反馈以便服务器马上可看到最后的WAL位置.
             */
            if (!sendFeedback(conn, blockpos, now, false))
                goto error;
            last_status = now;
        }
        /*
         * Potentially send a status message to the master
         * 可能向主服务器发送状态消息
         */
        if (still_sending && stream->standby_message_timeout > 0 &&
            feTimestampDifferenceExceeds(last_status, now,
                                         stream->standby_message_timeout))
        {
            /* Time to send feedback! */
            //是时候发送反馈了.
            if (!sendFeedback(conn, blockpos, now, false))
                goto error;
            last_status = now;
        }
        /*
         * Calculate how long send/receive loops should sleep
         * 计算send/receive循环应该睡眠多长时间
         */
        sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
                                                 last_status);
        //拷贝stream中接收到的内容
        r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf);
        while (r != 0)
        {
            if (r == -1)
                goto error;//出错
            if (r == -2)
            {
                //已完结或出错
                PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
                if (res == NULL)
                    goto error;
                else
                    return res;
            }
            /* Check the message type. */
            //检查消息类型
            if (copybuf[0] == 'k')
            {
                if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
                                         &last_status))
                    goto error;
            }
            else if (copybuf[0] == 'w')
            {
                if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
                    goto error;
                /*
                 * Check if we should continue streaming, or abort at this
                 * point.
                 * 检查我们是否应该继续streaming或者在此就停止
                 */
                if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
                    goto error;
            }
            else
            {
                fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
                        progname, copybuf[0]);
                goto error;
            }
            /*
             * Process the received data, and any subsequent data we can read
             * without blocking.
             * 处理接收到的数据,后续的数据可以无阻塞的读取.
             */
            r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf);
        }
    }
error:
    if (copybuf != NULL)
        PQfreemem(copybuf);
    return NULL;
}
/*
 * Check if we should continue streaming, or abort at this point.
 */
static bool
CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
                    XLogRecPtr *stoppos)
{
    if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
    {
        if (!close_walfile(stream, blockpos))
        {
            /* Potential error message is written by close_walfile */
            return false;
        }
        if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
        {
            fprintf(stderr, _("%s: could not send copy-end packet: %s"),
                    progname, PQerrorMessage(conn));
            return false;
        }
        still_sending = false;
    }
    return true;
}
/*
 * Receive CopyData message available from XLOG stream, blocking for
 * maximum of 'timeout' ms.
 * 接收从XLOG stream中可用的CopyData消息,如超出最大的'timeout'毫秒,需要阻塞.
 *
 * If data was received, returns the length of the data. *buffer is set to
 * point to a buffer holding the received message. The buffer is only valid
 * until the next CopyStreamReceive call.
 * 如接收到数据,则返回数据的大小.
 * 变量*buffer设置为指向含有接收到消息的buffer.buffer在下一个CopyStreamReceive调用才会生效.
 *
 * Returns 0 if no data was available within timeout, or if wait was
 * interrupted by signal or stop_socket input.
 * -1 on error. -2 if the server ended the COPY.
 * 如在timeout时间内没有数据返回,或者如果因为信号等待/stop_socket输入中断,则返回0.
 * -1:表示出现错误.-2表示服务器完成了COPY
 */
static int
CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
                  char **buffer)
{
    char       *copybuf = NULL;
    int         rawlen;
    if (*buffer != NULL)
        PQfreemem(*buffer);
    *buffer = NULL;
    /* Try to receive a CopyData message */
    rawlen = PQgetCopyData(conn, ©buf, 1);
    if (rawlen == 0)
    {
        int         ret;
        /*
         * No data available.  Wait for some to appear, but not longer than
         * the specified timeout, so that we can ping the server.  Also stop
         * waiting if input appears on stop_socket.
         */
        ret = CopyStreamPoll(conn, timeout, stop_socket);
        if (ret <= 0)
            return ret;
        /* Now there is actually data on the socket */
        if (PQconsumeInput(conn) == 0)
        {
            fprintf(stderr,
                    _("%s: could not receive data from WAL stream: %s"),
                    progname, PQerrorMessage(conn));
            return -1;
        }
        /* Now that we've consumed some input, try again */
        rawlen = PQgetCopyData(conn, ©buf, 1);
        if (rawlen == 0)
            return 0;
    }
    if (rawlen == -1)           /* end-of-streaming or error */
        return -2;
    if (rawlen == -2)
    {
        fprintf(stderr, _("%s: could not read COPY data: %s"),
                progname, PQerrorMessage(conn));
        return -1;
    }
    /* Return received messages to caller */
    *buffer = copybuf;
    return rawlen;
}

三、跟踪分析

备份命令

pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

启动gdb跟踪(跟踪fork的子进程)

[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /appdb/xdb/pg11.2/bin/pg_basebackup...done.
(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
(gdb) set follow-fork-mode child
(gdb) b LogStreamerMain
Breakpoint 1 at 0x403c51: file pg_basebackup.c, line 490.
(gdb) r
Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Password: 
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/5A000028 on timeline 16
pg_basebackup: starting background WAL receiver
pg_basebackup: created temporary replication slot "pg_basebackup_1604"
[New process 2036]
[Thread debugging using libthread_db enabled]backup/backup_label          )
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7ffff7fe7840 (LWP 2036)]
Breakpoint 1, LogStreamerMain (param=0x629db0) at pg_basebackup.c:490
490     in_log_streamer = true;
305153/305153 kB (100%), 1/1 tablespace                                          )
pg_basebackup: write-ahead log end point: 0/5A0000F8
pg_basebackup: waiting for background process to finish streaming ...
(gdb)

输入参数

(gdb) n
492     MemSet(&stream, 0, sizeof(stream));
(gdb) p *param
$1 = {bgconn = 0x62a280, startptr = 1509949440, xlog = "/data/backup/pg_wal", '\000' <repeats 1004 times>, 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb)

设置StreamCtl结构体

(gdb) n
493     stream.startpos = param->startptr;
(gdb) 
494     stream.timeline = param->timeline;
(gdb) 
495     stream.sysidentifier = param->sysidentifier;
(gdb) 
496     stream.stream_stop = reached_end_position;
(gdb) 
498     stream.stop_socket = bgpipe[0];
(gdb) 
502     stream.standby_message_timeout = standby_message_timeout;
(gdb) 
503     stream.synchronous = false;
(gdb) 
504     stream.do_sync = do_sync;
(gdb) 
505     stream.mark_done = true;
(gdb) 
506     stream.partial_suffix = NULL;
(gdb) 
507     stream.replication_slot = replication_slot;
(gdb) 
509     if (format == 'p')
(gdb) 
510         stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
(gdb)

进入ReceiveXlogStream函数

(gdb) 
514     if (!ReceiveXlogStream(param->bgconn, &stream))
(gdb) step
ReceiveXlogStream (conn=0x62a280, stream=0x7fffffffda30) at receivelog.c:458
458     if (!CheckServerVersionForStreaming(conn))
(gdb) 
(gdb) n
472     if (stream->replication_slot != NULL)
(gdb) p *stream
$2 = {startpos = 1509949440, timeline = 16, sysidentifier = 0x61f1a0 "6666964067616600474", 
  standby_message_timeout = 10000, synchronous = false, mark_done = true, do_sync = true, 
  stream_stop = 0x403953 <reached_end_position>, stop_socket = 8, walmethod = 0x632b10, partial_suffix = 0x0, 
  replication_slot = 0x62a1e0 "pg_basebackup_1604"}
(gdb)

判断系统标识符和时间线

(gdb) n
474         reportFlushPosition = true;
(gdb) 
475         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
(gdb) 
486     if (stream->sysidentifier != NULL)
(gdb) 
489         res = PQexec(conn, "IDENTIFY_SYSTEM");
(gdb) 
490         if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) 
498         if (PQntuples(res) != 1 || PQnfields(res) < 3)
(gdb) 
506         if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
(gdb) p PQgetvalue(res, 0, 0)
$3 = 0x633500 "6666964067616600474"
(gdb) n
514         if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
(gdb) 
522         PQclear(res);
(gdb) p PQgetvalue(res, 0, 1)
$4 = 0x633514 "16"
(gdb)

不存在时间线history文件,生成history文件

(gdb) n
529     lastFlushPosition = stream->startpos;
(gdb) 
539         if (!existsTimeLineHistoryFile(stream))
(gdb) 
541             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
(gdb) 
542             res = PQexec(conn, query);
(gdb) 
543             if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) 
556             if (PQnfields(res) != 2 || PQntuples(res) != 1)
(gdb) 
564             writeTimeLineHistoryFile(stream,
(gdb) 
568             PQclear(res);
(gdb)

调用START_REPLICATION命令初始化

(gdb) 
575         if (stream->stream_stop(stream->startpos, stream->timeline, false))
(gdb) n
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb) 
581                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
(gdb) 
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb) 
581                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
(gdb) 
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb) 
583         res = PQexec(conn, query);
(gdb) 
584         if (PQresultStatus(res) != PGRES_COPY_BOTH)
(gdb) 
591         PQclear(res);
(gdb)

执行命令,处理stream WAL,完成调用

595         if (res == NULL)
(gdb) p *res
$5 = {ntups = 0, numAttributes = 0, attDescs = 0x0, tuples = 0x0, tupArrSize = 0, numParameters = 0, paramDescs = 0x0, 
  resultStatus = PGRES_COMMAND_OK, 
  cmdStatus = "START_STREAMING\000\000\000\000\000\270\027u\367\377\177\000\000P/c\000\000\000\000\000CT\000\000\001", '\000' <repeats 19 times>, "\200\000\000", binary = 0, noticeHooks = {noticeRec = 0x7ffff7b9eaa4 <defaultNoticeReceiver>, 
    noticeRecArg = 0x0, noticeProc = 0x7ffff7b9eaf9 <defaultNoticeProcessor>, noticeProcArg = 0x0}, events = 0x0, 
  nEvents = 0, client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x0, 
  curOffset = 0, spaceLeft = 0}
(gdb) n
608         if (PQresultStatus(res) == PGRES_TUPLES_OK)
(gdb) 
666         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
(gdb) 
668             PQclear(res);
(gdb) 
676             if (stream->stream_stop(stoppos, stream->timeline, false))
(gdb) 
677                 return true;
(gdb) 
702 }
(gdb) 
LogStreamerMain (param=0x629db0) at pg_basebackup.c:523
523     if (!stream.walmethod->finish())
(gdb)

到此,关于“PostgreSQL中ReceiveXlogStream有什么作用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI