温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

PostgreSQL 源码解读(211)- 后台进程#10(checkpointer-BufferSync)

发布时间:2020-08-09 08:28:03 来源:ITPUB博客 阅读:382 作者:husthxd 栏目:关系型数据库

本节介绍了checkpoint中用于刷盘的函数:BufferSync,该函数Write out all dirty buffers in the pool(把缓冲池中所有脏页持久化到物理存储中).
值得一提的是:checkpoint只会处理在checkpoint开始时的脏页(标记为BM_CHECKPOINT_NEEDED),而不会处理在checkpoint变脏的page.

一、数据结构

宏定义
checkpoints request flag bits,检查点请求标记位定义.


/*
 * OR-able request flag bits for checkpoints.  The "cause" bits are used only
 * for logging purposes.  Note: the flags must be defined so that it's
 * sensible to OR together request flags arising from different requestors.
 */
/* These directly affect the behavior of CreateCheckPoint and subsidiaries */
#define CHECKPOINT_IS_SHUTDOWN  0x0001  /* Checkpoint is for shutdown */
#define CHECKPOINT_END_OF_RECOVERY  0x0002  /* Like shutdown checkpoint, but
                       * issued at end of WAL recovery */
#define CHECKPOINT_IMMEDIATE  0x0004  /* Do it without delays */
#define CHECKPOINT_FORCE    0x0008  /* Force even if no activity */
#define CHECKPOINT_FLUSH_ALL  0x0010  /* Flush all pages, including those
                     * belonging to unlogged tables */
/* These are important to RequestCheckpoint */
#define CHECKPOINT_WAIT     0x0020  /* Wait for completion */
#define CHECKPOINT_REQUESTED  0x0040  /* Checkpoint request has been made */
/* These indicate the cause of a checkpoint request */
#define CHECKPOINT_CAUSE_XLOG 0x0080  /* XLOG consumption */
#define CHECKPOINT_CAUSE_TIME 0x0100  /* Elapsed time */

二、源码解读

BufferSync : 把缓冲池中所有脏页持久化到物理存储中.其主要逻辑如下:
1.执行相关校验,如确保调用SyncOneBuffer函数的正确性等;
2.根据checkpoint标记设置mask标记(如为XX,则unlogged buffer也会flush);
3.遍历缓存,使用BM_CHECKPOINT_NEEDED标记需要刷盘的缓存page;如无需要处理的page,则返回;
4.排序需刷盘的脏页,避免随机IO,提升性能;
5.为每一个需要刷脏页的表空间分配进度状态;
6.在单个标记的写进度上构建最小堆,并计算单个处理缓冲区占比多少;
7.如ts_heap不为空,循环处理
7.1获取buf_id
7.2调用SyncOneBuffer刷盘
7.3调用CheckpointWriteDelay,休眠以控制I/O频率
7.4释放资源,更新统计信息


/*
 * BufferSync -- Write out all dirty buffers in the pool.
 * 把缓冲池中所有脏页持久化到物理存储中.
 *
 * This is called at checkpoint time to write out all dirty shared buffers.
 * The checkpoint request flags should be passed in.  If CHECKPOINT_IMMEDIATE
 * is set, we disable delays between writes; if CHECKPOINT_IS_SHUTDOWN,
 * CHECKPOINT_END_OF_RECOVERY or CHECKPOINT_FLUSH_ALL is set, we write even
 * unlogged buffers, which are otherwise skipped.  The remaining flags
 * currently have no effect here.
 * 该函数在checkpoint时把缓冲池中所有脏页刷到磁盘上.
 * 输入参数为checkpoint请求标记.
 * 如请求标记为CHECKPOINT_IMMEDIATE,在写入期间禁用延迟;
 * 如为CHECKPOINT_IS_SHUTDOWN/CHECKPOINT_END_OF_RECOVERY/CHECKPOINT_FLUSH_ALL,
 *   就算正常情况下会忽略的unlogged缓存,也会写入到磁盘上.
 * 其他标记在这里没有影响.
 */
static void
BufferSync(int flags)
{
  uint32    buf_state;
  int     buf_id;
  int     num_to_scan;
  int     num_spaces;
  int     num_processed;
  int     num_written;
  CkptTsStatus *per_ts_stat = NULL;
  Oid     last_tsid;
  binaryheap *ts_heap;
  int     i;
  int     mask = BM_DIRTY;
  WritebackContext wb_context;
  /* Make sure we can handle the pin inside SyncOneBuffer */
  //确保可以处理在SyncOneBuffer函数中的pin page
  ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
  /*
   * Unless this is a shutdown checkpoint or we have been explicitly told,
   * we write only permanent, dirty buffers.  But at shutdown or end of
   * recovery, we write all dirty buffers.
   */
  //如为CHECKPOINT_IS_SHUTDOWN/CHECKPOINT_END_OF_RECOVERY/CHECKPOINT_FLUSH_ALL,
  //就算正常情况下会忽略的unlogged缓存,也会写入到磁盘上.
  if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
          CHECKPOINT_FLUSH_ALL))))
    mask |= BM_PERMANENT;
  /*
   * Loop over all buffers, and mark the ones that need to be written with
   * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_scan), so that we
   * can estimate how much work needs to be done.
   * 遍历缓存,使用BM_CHECKPOINT_NEEDED标记需要写入的page.
   * 对这些pages计数以便估算有多少工作需要完成.
   *
   * This allows us to write only those pages that were dirty when the
   * checkpoint began, and not those that get dirtied while it proceeds.
   * Whenever a page with BM_CHECKPOINT_NEEDED is written out, either by us
   * later in this function, or by normal backends or the bgwriter cleaning
   * scan, the flag is cleared.  Any buffer dirtied after this point won't
   * have the flag set.
   * 只需要写在checkpoint开始时的脏页,不需要包括在checkpoint期间变脏的page.
   * 一旦标记为BM_CHECKPOINT_NEEDED的脏页完成刷盘,
   *   在这个函数后续处理逻辑或者普通的后台进程/bgwriter进程会重置该标记.
   * 所有在该时点的脏页不会设置为BM_CHECKPOINT_NEEDED.
   *
   * Note that if we fail to write some buffer, we may leave buffers with
   * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
   * certainly need to be written for the next checkpoint attempt, too.
   * 要注意的是脏页刷盘出错,脏页的标记仍为BM_CHECKPOINT_NEEDED,在下次checkpoint是尝试再次刷盘.
   */
  num_to_scan = 0;
  for (buf_id = 0; buf_id < NBuffers; buf_id++)
  {
    BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
    /*
     * Header spinlock is enough to examine BM_DIRTY, see comment in
     * SyncOneBuffer.
     */
    buf_state = LockBufHdr(bufHdr);
    if ((buf_state & mask) == mask)
    {
      CkptSortItem *item;
      buf_state |= BM_CHECKPOINT_NEEDED;
      item = &CkptBufferIds[num_to_scan++];
      item->buf_id = buf_id;
      item->tsId = bufHdr->tag.rnode.spcNode;
      item->relNode = bufHdr->tag.rnode.relNode;
      item->forkNum = bufHdr->tag.forkNum;
      item->blockNum = bufHdr->tag.blockNum;
    }
    UnlockBufHdr(bufHdr, buf_state);
  }
  if (num_to_scan == 0)
    return;         /* nothing to do */
  WritebackContextInit(&wb_context, &checkpoint_flush_after);
  TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
  /*
   * Sort buffers that need to be written to reduce the likelihood of random
   * IO. The sorting is also important for the implementation of balancing
   * writes between tablespaces. Without balancing writes we'd potentially
   * end up writing to the tablespaces one-by-one; possibly overloading the
   * underlying system.
   * 排序需刷盘的脏页,用于避免随机IO.
   */
  qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem),
      ckpt_buforder_comparator);
  num_spaces = 0;
  /*
   * Allocate progress status for each tablespace with buffers that need to
   * be flushed. This requires the to-be-flushed array to be sorted.
   * 为每一个需要刷脏页的表空间分配进度状态.
   */
  last_tsid = InvalidOid;
  for (i = 0; i < num_to_scan; i++)
  {
    CkptTsStatus *s;
    Oid     cur_tsid;
    cur_tsid = CkptBufferIds[i].tsId;
    /*
     * Grow array of per-tablespace status structs, every time a new
     * tablespace is found.
     */
    if (last_tsid == InvalidOid || last_tsid != cur_tsid)
    {
      Size    sz;
      num_spaces++;
      /*
       * Not worth adding grow-by-power-of-2 logic here - even with a
       * few hundred tablespaces this should be fine.
       */
      sz = sizeof(CkptTsStatus) * num_spaces;
      if (per_ts_stat == NULL)
        per_ts_stat = (CkptTsStatus *) palloc(sz);
      else
        per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz);
      s = &per_ts_stat[num_spaces - 1];
      memset(s, 0, sizeof(*s));
      s->tsId = cur_tsid;
      /*
       * The first buffer in this tablespace. As CkptBufferIds is sorted
       * by tablespace all (s->num_to_scan) buffers in this tablespace
       * will follow afterwards.
       */
      s->index = i;
      /*
       * progress_slice will be determined once we know how many buffers
       * are in each tablespace, i.e. after this loop.
       */
      last_tsid = cur_tsid;
    }
    else
    {
      s = &per_ts_stat[num_spaces - 1];
    }
    s->num_to_scan++;
  }
  Assert(num_spaces > 0);
  /*
   * Build a min-heap over the write-progress in the individual tablespaces,
   * and compute how large a portion of the total progress a single
   * processed buffer is.
   * 在单个标记的写进度上构建最小堆,并计算单个处理缓冲区占比多少.
   */
  ts_heap = binaryheap_allocate(num_spaces,
                  ts_ckpt_progress_comparator,
                  NULL);
  for (i = 0; i < num_spaces; i++)
  {
    CkptTsStatus *ts_stat = &per_ts_stat[i];
    ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
    binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
  }
  binaryheap_build(ts_heap);
  /*
   * Iterate through to-be-checkpointed buffers and write the ones (still)
   * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
   * tablespaces; otherwise the sorting would lead to only one tablespace
   * receiving writes at a time, making inefficient use of the hardware.
   * 迭代处理to-be-checkpointed buffers,刷脏页.
   * 在表空间之间写入是平衡的.
   */
  num_processed = 0;
  num_written = 0;
  while (!binaryheap_empty(ts_heap))
  {
    BufferDesc *bufHdr = NULL;
    CkptTsStatus *ts_stat = (CkptTsStatus *)
    DatumGetPointer(binaryheap_first(ts_heap));
    buf_id = CkptBufferIds[ts_stat->index].buf_id;
    Assert(buf_id != -1);
    bufHdr = GetBufferDescriptor(buf_id);
    num_processed++;
    /*
     * We don't need to acquire the lock here, because we're only looking
     * at a single bit. It's possible that someone else writes the buffer
     * and clears the flag right after we check, but that doesn't matter
     * since SyncOneBuffer will then do nothing.  However, there is a
     * further race condition: it's conceivable that between the time we
     * examine the bit here and the time SyncOneBuffer acquires the lock,
     * someone else not only wrote the buffer but replaced it with another
     * page and dirtied it.  In that improbable case, SyncOneBuffer will
     * write the buffer though we didn't need to.  It doesn't seem worth
     * guarding against this, though.
     */
    if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
    {
      //只处理标记为BM_CHECKPOINT_NEEDED的page
      //调用SyncOneBuffer刷盘(一次一个page)
      if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
      {
        TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
        BgWriterStats.m_buf_written_checkpoints++;
        num_written++;
      }
    }
    /*
     * Measure progress independent of actually having to flush the buffer
     * - otherwise writing become unbalanced.
     */
    ts_stat->progress += ts_stat->progress_slice;
    ts_stat->num_scanned++;
    ts_stat->index++;
    /* Have all the buffers from the tablespace been processed? */
    if (ts_stat->num_scanned == ts_stat->num_to_scan)
    {
      binaryheap_remove_first(ts_heap);
    }
    else
    {
      /* update heap with the new progress */
      binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
    }
    /*
     * Sleep to throttle our I/O rate.
     * 休眠 : 控制I/O频率
     */
    CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
  }
  /* issue all pending flushes */
  IssuePendingWritebacks(&wb_context);
  pfree(per_ts_stat);
  per_ts_stat = NULL;
  binaryheap_free(ts_heap);
  /*
   * Update checkpoint statistics. As noted above, this doesn't include
   * buffers written by other backends or bgwriter scan.
   */
  CheckpointStats.ckpt_bufs_written += num_written;
  TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan);
}

三、跟踪分析

测试脚本


testdb=# update t_wal_ckpt set c2 = 'C4#'||substr(c2,4,40);
UPDATE 1
testdb=# checkpoint;

跟踪分析


(gdb) handle SIGINT print nostop pass
SIGINT is used by the debugger.
Are you sure you want to change it? (y or n) y
Signal        Stop  Print Pass to program Description
SIGINT        No  Yes Yes   Interrupt
(gdb) b CheckPointGuts
Breakpoint 1 at 0x56f0ca: file xlog.c, line 8968.
(gdb) c
Continuing.
Program received signal SIGINT, Interrupt.
Breakpoint 1, CheckPointGuts (checkPointRedo=16953420440, flags=108) at xlog.c:8968
8968    CheckPointCLOG();
(gdb) n
8969    CheckPointCommitTs();
(gdb) 
8970    CheckPointSUBTRANS();
(gdb) 
8971    CheckPointMultiXact();
(gdb) 
8972    CheckPointPredicate();
(gdb) 
8973    CheckPointRelationMap();
(gdb) 
8974    CheckPointReplicationSlots();
(gdb) 
8975    CheckPointSnapBuild();
(gdb) 
8976    CheckPointLogicalRewriteHeap();
(gdb) 
8977    CheckPointBuffers(flags); /* performs all required fsyncs */
(gdb) step
CheckPointBuffers (flags=108) at bufmgr.c:2583
2583    TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
(gdb) n
2584    CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
(gdb) 
2585    BufferSync(flags);
(gdb) step
BufferSync (flags=108) at bufmgr.c:1793
1793    CkptTsStatus *per_ts_stat = NULL;
(gdb) p flags
$1 = 108
(gdb) n
1797    int     mask = BM_DIRTY;
(gdb) 
1801    ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
(gdb) 
1808    if (!((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
(gdb) 
1810      mask |= BM_PERMANENT;
(gdb) 
1828    num_to_scan = 0;
(gdb) 
1829    for (buf_id = 0; buf_id < NBuffers; buf_id++)
(gdb) 
1831      BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
(gdb) 
1837      buf_state = LockBufHdr(bufHdr);
(gdb) p buf_id
$2 = 0
(gdb) p NBuffers
$3 = 65536
(gdb) n
1839      if ((buf_state & mask) == mask)
(gdb) 
1853      UnlockBufHdr(bufHdr, buf_state);
(gdb) 
1829    for (buf_id = 0; buf_id < NBuffers; buf_id++)
(gdb) 
1831      BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
(gdb) 
1837      buf_state = LockBufHdr(bufHdr);
(gdb) 
1839      if ((buf_state & mask) == mask)
(gdb) 
1853      UnlockBufHdr(bufHdr, buf_state);
(gdb) 
1829    for (buf_id = 0; buf_id < NBuffers; buf_id++)
(gdb) b bufmgr.c:1856
Breakpoint 2 at 0x8a68b3: file bufmgr.c, line 1856.
(gdb) c
Continuing.
Breakpoint 2, BufferSync (flags=108) at bufmgr.c:1856
1856    if (num_to_scan == 0)
(gdb) p num_to_scan
$4 = 1
(gdb) n
1859    WritebackContextInit(&wb_context, &checkpoint_flush_after);
(gdb) 
1861    TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
(gdb) 
1870    qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem),
(gdb) 
1873    num_spaces = 0;
(gdb) 
1879    last_tsid = InvalidOid;
(gdb) 
1880    for (i = 0; i < num_to_scan; i++)
(gdb) 
1885      cur_tsid = CkptBufferIds[i].tsId;
(gdb) 
1891      if (last_tsid == InvalidOid || last_tsid != cur_tsid)
(gdb) p cur_tsid
$5 = 1663
(gdb) n
1895        num_spaces++;
(gdb) 
1901        sz = sizeof(CkptTsStatus) * num_spaces;
(gdb) 
1903        if (per_ts_stat == NULL)
(gdb) 
1904          per_ts_stat = (CkptTsStatus *) palloc(sz);
(gdb) 
1908        s = &per_ts_stat[num_spaces - 1];
(gdb) p sz
$6 = 40
(gdb) p num_spaces
$7 = 1
(gdb) n
1909        memset(s, 0, sizeof(*s));
(gdb) 
1910        s->tsId = cur_tsid;
(gdb) 
1917        s->index = i;
(gdb) 
1924        last_tsid = cur_tsid;
(gdb) 
1892      {
(gdb) 
1931      s->num_to_scan++;
(gdb) 
1880    for (i = 0; i < num_to_scan; i++)
(gdb) 
1934    Assert(num_spaces > 0);
(gdb) 
1941    ts_heap = binaryheap_allocate(num_spaces,
(gdb) 
1945    for (i = 0; i < num_spaces; i++)
(gdb) 
1947      CkptTsStatus *ts_stat = &per_ts_stat[i];
(gdb) 
1949      ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
(gdb) 
1951      binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
(gdb) 
1945    for (i = 0; i < num_spaces; i++)
(gdb) 
1954    binaryheap_build(ts_heap);
(gdb) 
1962    num_processed = 0;
(gdb) p *ts_heap
$8 = {bh_size = 1, bh_space = 1, bh_has_heap_property = true, bh_compare = 0x8aa0d8 <ts_ckpt_progress_comparator>, 
  bh_arg = 0x0, bh_nodes = 0x2d666d8}
(gdb) n
1963    num_written = 0;
(gdb) 
1964    while (!binaryheap_empty(ts_heap))
(gdb) 
1966      BufferDesc *bufHdr = NULL;
(gdb) 
1968      DatumGetPointer(binaryheap_first(ts_heap));
(gdb) 
1967      CkptTsStatus *ts_stat = (CkptTsStatus *)
(gdb) 
1970      buf_id = CkptBufferIds[ts_stat->index].buf_id;
(gdb) 
1971      Assert(buf_id != -1);
(gdb) p buf_id
$9 = 160
(gdb) n
1973      bufHdr = GetBufferDescriptor(buf_id);
(gdb) 
1975      num_processed++;
(gdb) 
1989      if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
(gdb) p *bufHdr
$10 = {tag = {rnode = {spcNode = 1663, dbNode = 16384, relNode = 221290}, forkNum = MAIN_FORKNUM, blockNum = 0}, 
  buf_id = 160, state = {value = 3549691904}, wait_backend_pid = 0, freeNext = -2, content_lock = {tranche = 53, state = {
      value = 536870912}, waiters = {head = 2147483647, tail = 2147483647}}}
(gdb) n
1991        if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
(gdb) 
1993          TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
(gdb) 
1994          BgWriterStats.m_buf_written_checkpoints++;
(gdb) 
1995          num_written++;
(gdb) 
2003      ts_stat->progress += ts_stat->progress_slice;
(gdb) 
2004      ts_stat->num_scanned++;
(gdb) 
2005      ts_stat->index++;
(gdb) 
2008      if (ts_stat->num_scanned == ts_stat->num_to_scan)
(gdb) 
2010        binaryheap_remove_first(ts_heap);
(gdb) 
2021      CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
(gdb) 
1964    while (!binaryheap_empty(ts_heap))
(gdb) 
2025    IssuePendingWritebacks(&wb_context);
(gdb) 
2027    pfree(per_ts_stat);
(gdb) 
2028    per_ts_stat = NULL;
(gdb) 
2029    binaryheap_free(ts_heap);
(gdb) 
2035    CheckpointStats.ckpt_bufs_written += num_written;
(gdb) 
2037    TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan);
(gdb) 
2038  }
(gdb) 
CheckPointBuffers (flags=108) at bufmgr.c:2586
2586    CheckpointStats.ckpt_sync_t = GetCurrentTimestamp();
(gdb)

四、参考资料

PG Source Code
PgSQL · 特性分析 · 谈谈checkpoint的调度

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI