PostgreSQL源码解读(150)-PGTools#2(BaseBackup函数)

本节简单介绍了PostgreSQL的备份工具pg_basebackup源码中实际执行备份逻辑的函数BaseBackup.

创新互联建站是专业的威远网站建设公司,威远接单;提供成都网站制作、成都网站建设,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行威远网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!

一、数据结构

option
使用工具时存储选项的数据结构


#ifndef HAVE_STRUCT_OPTION
//工具软件选项
struct option
{
    const char *name;//名称
    int         has_arg;//是否包含参数,no_argument/required_argument/optional_argument
    int        *flag;//标记
    int         val;//参数值
};
#define no_argument 0
#define required_argument 1
#define optional_argument 2
#endif
/*
 * On OpenBSD and some versions of Solaris, opterr and friends are defined in
 * core libc rather than in a separate getopt module.  Define these variables
 * only if configure found they aren't there by default; otherwise, this
 * module and its callers will just use libc's variables.  (We assume that
 * testing opterr is sufficient for all of these.)
 */
#ifndef HAVE_INT_OPTERR
int         opterr = 1,         /* if error message should be printed */
            optind = 1,         /* index into parent argv vector */
            optopt;             /* character checked for validity */
char       *optarg;             /* argument associated with option */
#endif
#define BADCH   (int)'?'
#define BADARG  (int)':'
#define EMSG    ""

pg_result
用于接收PQgetResult的返回结果.


struct pg_result
{
    //元组数量
    int         ntups;
    //属性数量
    int         numAttributes;
    PGresAttDesc *attDescs;
    //PGresTuple数组
    PGresAttValue **tuples;     /* each PGresTuple is an array of
                                 * PGresAttValue's */
    //元组数组的大小
    int         tupArrSize;     /* allocated size of tuples array */
    //参数格式
    int         numParameters;
    //参数描述符
    PGresParamDesc *paramDescs;
    //执行状态类型(枚举变量)
    ExecStatusType resultStatus;
    //从查询返回的命令状态
    char        cmdStatus[CMDSTATUS_LEN];   /* cmd status from the query */
    //1-二进制的元组数据,否则为文本数据
    int         binary;         /* binary tuple values if binary == 1,
                                 * otherwise text */
    /*
     * These fields are copied from the originating PGconn, so that operations
     * on the PGresult don't have to reference the PGconn.
     * 这些字段从原始的PGconn中拷贝,以便不需要依赖PGconn
     */
    //钩子函数
    PGNoticeHooks noticeHooks;
    PGEvent    *events;
    int         nEvents;
    int         client_encoding;    /* encoding id */
    /*
     * Error information (all NULL if not an error result).  errMsg is the
     * "overall" error message returned by PQresultErrorMessage.  If we have
     * per-field info then it is stored in a linked list.
     * 错误信息(如没有错误,则全部为NULL)
     * errMsg是PQresultErrorMessage返回的"overall"错误信息.
     * 如果存在per-field信息,那么会存储在相互链接的链表中
     */
    //错误信息
    char       *errMsg;         /* error message, or NULL if no error */
    //按字段拆分的信息
    PGMessageField *errFields;  /* message broken into fields */
    //如可用,触发查询的文本信息
    char       *errQuery;       /* text of triggering query, if available */
    /* All NULL attributes in the query result point to this null string */
    //查询结果中的所有NULL属性指向该null字符串
    char        null_field[1];
    /*
     * Space management information.  Note that attDescs and error stuff, if
     * not null, point into allocated blocks.  But tuples points to a
     * separately malloc'd block, so that we can realloc it.
     * 空间管理信息.
     * 注意attDescs和error,如为not null,则指向已分配的blocks.
     * 但元组指向单独的已分配的block,因此可以重新分配空间.
     */
    //最近已分配的block
    PGresult_data *curBlock;    /* most recently allocated block */
    //块中空闲空间的开始偏移
    int         curOffset;      /* start offset of free space in block */
    //块中剩余的空闲字节
    int         spaceLeft;      /* number of free bytes remaining in block */
    //该PGresult结构体总共的分配空间
    size_t      memorySize;     /* total space allocated for this PGresult */
};
/* Data about a single parameter of a prepared statement */
//prepared statement语句的单个参数的数据
typedef struct pgresParamDesc
{
    //类型ID
    Oid         typid;          /* type id */
} PGresParamDesc;
typedef enum
{
    //空查询串
    PGRES_EMPTY_QUERY = 0,      /* empty query string was executed */
    //后台进程正常执行了没有结果返回的查询命令
    PGRES_COMMAND_OK,           /* a query command that doesn't return
                                 * anything was executed properly by the
                                 * backend */
    //后台进程正常执行了有元组返回的查询命令
    //PGresult中有结果元组
    PGRES_TUPLES_OK,            /* a query command that returns tuples was
                                 * executed properly by the backend, PGresult
                                 * contains the result tuples */
    //拷贝数据OUT,传输中
    PGRES_COPY_OUT,             /* Copy Out data transfer in progress */
    //拷贝数据IN,传输中
    PGRES_COPY_IN,              /* Copy In data transfer in progress */
    //从后台进程中收到非期望中的响应
    PGRES_BAD_RESPONSE,         /* an unexpected response was recv'd from the
                                 * backend */
    //提示或警告信息
    PGRES_NONFATAL_ERROR,       /* notice or warning message */
    //查询失败
    PGRES_FATAL_ERROR,          /* query failed */
    //拷贝I/O,传输中
    PGRES_COPY_BOTH,            /* Copy In/Out data transfer in progress */
    //更大的结果集中的单个元组
    PGRES_SINGLE_TUPLE          /* single tuple from larger resultset */
} ExecStatusType;
typedef union pgresult_data PGresult_data;
union pgresult_data
{
    //链接到下一个block,或者为NULL
    PGresult_data *next;        /* link to next block, or NULL */
    //以字节形式访问块
    char        space[1];       /* dummy for accessing block as bytes */
};

二、源码解读

BaseBackup,实际执行备份的函数.
主要逻辑是通过libpq接口向服务器端发起备份请求(BASE_BACKUP命令)


static void
BaseBackup(void)
{
    PGresult   *res;
    char       *sysidentifier;
    TimeLineID  latesttli;
    TimeLineID  starttli;
    char       *basebkp;
    char        escaped_label[MAXPGPATH];
    char       *maxrate_clause = NULL;
    int         i;
    char        xlogstart[64];
    char        xlogend[64];
    int         minServerMajor,
                maxServerMajor;
    int         serverVersion,
                serverMajor;
    //数据库连接
    Assert(conn != NULL);
    /*
     * Check server version. BASE_BACKUP command was introduced in 9.1, so we
     * can't work with servers older than 9.1.
     * 检查服务器版本.BASE_BACKUP在9.1+才出现,数据库版本不能低于9.1.
     */
    minServerMajor = 901;
    maxServerMajor = PG_VERSION_NUM / 100;
    serverVersion = PQserverVersion(conn);
    serverMajor = serverVersion / 100;
    if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
    {
        const char *serverver = PQparameterStatus(conn, "server_version");
        fprintf(stderr, _("%s: incompatible server version %s\n"),
                progname, serverver ? serverver : "'unknown'");
        exit(1);
    }
    /*
     * If WAL streaming was requested, also check that the server is new
     * enough for that.
     * 要求WAL streaming,检查数据库是否支持
     */
    if (includewal == STREAM_WAL && !CheckServerVersionForStreaming(conn))
    {
        /*
         * Error message already written in CheckServerVersionForStreaming(),
         * but add a hint about using -X none.
         * 错误信息已在CheckServerVersionForStreaming()中体现,这里添加-X提示.
         */
        fprintf(stderr, _("HINT: use -X none or -X fetch to disable log streaming\n"));
        exit(1);
    }
    /*
     * Build contents of configuration file if requested
     * 如需要创建recovery.conf文件
     */
    if (writerecoveryconf)
        GenerateRecoveryConf(conn);
    /*
     * Run IDENTIFY_SYSTEM so we can get the timeline
     * 执行RunIdentifySystem,获取时间线
     */
    if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
        exit(1);
    /*
     * Start the actual backup
     * 开始实际的backup
     */
    PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
    if (maxrate > 0)
        maxrate_clause = psprintf("MAX_RATE %u", maxrate);
    if (verbose)
        //提示信息
        fprintf(stderr,
                _("%s: initiating base backup, waiting for checkpoint to complete\n"),
                progname);
    if (showprogress && !verbose)
    {
        //进度信息
        fprintf(stderr, "waiting for checkpoint");
        if (isatty(fileno(stderr)))
            fprintf(stderr, "\r");
        else
            fprintf(stderr, "\n");
    }
    //base backup命令
    basebkp =
        psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
                 escaped_label,
                 showprogress ? "PROGRESS" : "",
                 includewal == FETCH_WAL ? "WAL" : "",
                 fastcheckpoint ? "FAST" : "",
                 includewal == NO_WAL ? "" : "NOWAIT",
                 maxrate_clause ? maxrate_clause : "",
                 format == 't' ? "TABLESPACE_MAP" : "",
                 verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
    //调用API
    if (PQsendQuery(conn, basebkp) == 0)
    {
        fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                progname, "BASE_BACKUP", PQerrorMessage(conn));
        exit(1);
    }
    /*
     * Get the starting WAL location
     * 获取WAL起始位置
     */
    //获取PQ执行结果
    res = PQgetResult(conn);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        fprintf(stderr, _("%s: could not initiate base backup: %s"),
                progname, PQerrorMessage(conn));
        exit(1);
    }
    //判断ntuples
    if (PQntuples(res) != 1)
    {
        fprintf(stderr,
                _("%s: server returned unexpected response to BASE_BACKUP command; got %d rows and %d fields, expected %d rows and %d fields\n"),
                progname, PQntuples(res), PQnfields(res), 1, 2);
        exit(1);
    }
    //获取WAL start位置
    strlcpy(xlogstart, PQgetvalue(res, 0, 0), sizeof(xlogstart));
    if (verbose)
        fprintf(stderr, _("%s: checkpoint completed\n"), progname);
    /*
     * 9.3 and later sends the TLI of the starting point. With older servers,
     * assume it's the same as the latest timeline reported by
     * IDENTIFY_SYSTEM.
     * 9.3+在起始点就传送了TLI.
     */
    if (PQnfields(res) >= 2)
        starttli = atoi(PQgetvalue(res, 0, 1));
    else
        starttli = latesttli;
    PQclear(res);
    MemSet(xlogend, 0, sizeof(xlogend));
    if (verbose && includewal != NO_WAL)
        fprintf(stderr, _("%s: write-ahead log start point: %s on timeline %u\n"),
                progname, xlogstart, starttli);
    /*
     * Get the header
     * 获取头部信息
     */
    res = PQgetResult(conn);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        fprintf(stderr, _("%s: could not get backup header: %s"),
                progname, PQerrorMessage(conn));
        exit(1);
    }
    if (PQntuples(res) < 1)
    {
        fprintf(stderr, _("%s: no data returned from server\n"), progname);
        exit(1);
    }
    /*
     * Sum up the total size, for progress reporting
     * 统计总大小,用于进度报告
     */
    totalsize = totaldone = 0;
    tablespacecount = PQntuples(res);
    for (i = 0; i < PQntuples(res); i++)
    {
        totalsize += atol(PQgetvalue(res, i, 2));
        /*
         * Verify tablespace directories are empty. Don't bother with the
         * first once since it can be relocated, and it will be checked before
         * we do anything anyway.
         * 验证表空间目录是否为空.
         * 首次验证不需要报警,因为可以重新定位并且在作其他事情前会检查.
         */
        if (format == 'p' && !PQgetisnull(res, i, 1))
        {
            char       *path = unconstify(char *, get_tablespace_mapping(PQgetvalue(res, i, 1)));
            verify_dir_is_empty_or_create(path, &made_tablespace_dirs, &found_tablespace_dirs);
        }
    }
    /*
     * When writing to stdout, require a single tablespace
     * 在写入stdout时,要求一个独立的表空间.
     */
    if (format == 't' && strcmp(basedir, "-") == 0 && PQntuples(res) > 1)
    {
        fprintf(stderr,
                _("%s: can only write single tablespace to stdout, database has %d\n"),
                progname, PQntuples(res));
        exit(1);
    }
    /*
     * If we're streaming WAL, start the streaming session before we start
     * receiving the actual data chunks.
     * 如果正在streaming WAL,开始接收实际的数据chunks前,开始streaming session.
     */
    if (includewal == STREAM_WAL)
    {
        if (verbose)
            fprintf(stderr, _("%s: starting background WAL receiver\n"),
                    progname);
        StartLogStreamer(xlogstart, starttli, sysidentifier);
    }
    /*
     * Start receiving chunks
     * 开始接收chunks
     */
    for (i = 0; i < PQntuples(res); i++)//所有的表空间
    {
        if (format == 't')
            //tar包
            ReceiveTarFile(conn, res, i);
        else
            //普通文件
            ReceiveAndUnpackTarFile(conn, res, i);
    }                           /* Loop over all tablespaces */
    if (showprogress)
    {
        progress_report(PQntuples(res), NULL, true);
        if (isatty(fileno(stderr)))
            fprintf(stderr, "\n");  /* Need to move to next line */
    }
    PQclear(res);
    /*
     * Get the stop position
     */
    res = PQgetResult(conn);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        fprintf(stderr,
                _("%s: could not get write-ahead log end position from server: %s"),
                progname, PQerrorMessage(conn));
        exit(1);
    }
    if (PQntuples(res) != 1)
    {
        fprintf(stderr,
                _("%s: no write-ahead log end position returned from server\n"),
                progname);
        exit(1);
    }
    strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
    if (verbose && includewal != NO_WAL)
        fprintf(stderr, _("%s: write-ahead log end point: %s\n"), progname, xlogend);
    PQclear(res);
    //
    res = PQgetResult(conn);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
        if (sqlstate &&
            strcmp(sqlstate, ERRCODE_DATA_CORRUPTED) == 0)
        {
            fprintf(stderr, _("%s: checksum error occurred\n"),
                    progname);
            checksum_failure = true;
        }
        else
        {
            fprintf(stderr, _("%s: final receive failed: %s"),
                    progname, PQerrorMessage(conn));
        }
        exit(1);
    }
    if (bgchild > 0)
    {
#ifndef WIN32
        int         status;
        pid_t       r;
#else
        DWORD       status;
        /*
         * get a pointer sized version of bgchild to avoid warnings about
         * casting to a different size on WIN64.
         */
        intptr_t    bgchild_handle = bgchild;
        uint32      hi,
                    lo;
#endif
        if (verbose)
            fprintf(stderr,
                    _("%s: waiting for background process to finish streaming ...\n"), progname);
#ifndef WIN32//WIN32
        if (write(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
        {
            fprintf(stderr,
                    _("%s: could not send command to background pipe: %s\n"),
                    progname, strerror(errno));
            exit(1);
        }
        /* Just wait for the background process to exit */
        r = waitpid(bgchild, &status, 0);
        if (r == (pid_t) -1)
        {
            fprintf(stderr, _("%s: could not wait for child process: %s\n"),
                    progname, strerror(errno));
            exit(1);
        }
        if (r != bgchild)
        {
            fprintf(stderr, _("%s: child %d died, expected %d\n"),
                    progname, (int) r, (int) bgchild);
            exit(1);
        }
        if (status != 0)
        {
            fprintf(stderr, "%s: %s\n",
                    progname, wait_result_to_str(status));
            exit(1);
        }
        /* Exited normally, we're happy! */
#else                           /* WIN32 */
        /*
         * On Windows, since we are in the same process, we can just store the
         * value directly in the variable, and then set the flag that says
         * it's there.
         * 在Windows平台,因为在同一个进程中,只需要直接存储值到遍历中,然后设置标记即可.
         */
        if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
        {
            fprintf(stderr,
                    _("%s: could not parse write-ahead log location \"%s\"\n"),
                    progname, xlogend);
            exit(1);
        }
        xlogendptr = ((uint64) hi) << 32 | lo;
        InterlockedIncrement(&has_xlogendptr);
        /* First wait for the thread to exit */
        if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) !=
            WAIT_OBJECT_0)
        {
            _dosmaperr(GetLastError());
            fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
                    progname, strerror(errno));
            exit(1);
        }
        if (GetExitCodeThread((HANDLE) bgchild_handle, &status) == 0)
        {
            _dosmaperr(GetLastError());
            fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
                    progname, strerror(errno));
            exit(1);
        }
        if (status != 0)
        {
            fprintf(stderr, _("%s: child thread exited with error %u\n"),
                    progname, (unsigned int) status);
            exit(1);
        }
        /* Exited normally, we're happy */
#endif
    }
    /* Free the configuration file contents */
    //释放配置文件内存
    destroyPQExpBuffer(recoveryconfcontents);
    /*
     * End of copy data. Final result is already checked inside the loop.
     * 拷贝数据完成.最终结果已在循环中检查.
     */
    PQclear(res);
    PQfinish(conn);
    conn = NULL;
    /*
     * Make data persistent on disk once backup is completed. For tar format
     * once syncing the parent directory is fine, each tar file created per
     * tablespace has been already synced. In plain format, all the data of
     * the base directory is synced, taking into account all the tablespaces.
     * Errors are not considered fatal.
     * 在备份结束后,持久化数据在磁盘上.
     * 对于tar格式只需要同步父目录即可,每一个表空间创建一个tar文件,这些文件已同步.
     * 对于普通格式,基础目录中的所有数据已同步,已兼顾了所有的表空间.
     * 错误不会认为是致命的异常.
     */
    if (do_sync)
    {
        if (verbose)
            fprintf(stderr,
                    _("%s: syncing data to disk ...\n"), progname);
        if (format == 't')
        {
            if (strcmp(basedir, "-") != 0)
                (void) fsync_fname(basedir, true, progname);
        }
        else
        {
            (void) fsync_pgdata(basedir, progname, serverVersion);
        }
    }
    if (verbose)
        fprintf(stderr, _("%s: base backup completed\n"), progname);
}
/*
 * PQgetvalue:
 *  return the value of field 'field_num' of row 'tup_num'
 *  返回tuples数组第field_num字段的第tup_num行.
 */
char *
PQgetvalue(const PGresult *res, int tup_num, int field_num)
{
    if (!check_tuple_field_number(res, tup_num, field_num))
        return NULL;
    return res->tuples[tup_num][field_num].value;
}

三、跟踪分析

备份命令


pg_basebackup -h localhost -U xdb -p 5432 -D /data/backup -P -Xs -R

启动gdb跟踪


[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-110.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later 
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
...
Reading symbols from /appdb/atlasdb/pg11.2/bin/pg_basebackup...done.
(gdb) b BaseBackup
(gdb) set args -h localhost -U xdb -p 5432 -D /data/backup -P -Xs -R 
(gdb) r
Starting program: /appdb/atlasdb/pg11.2/bin/pg_basebackup -h localhost -U xdb -p 5432 -D /data/backup -P -Xs -R 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Breakpoint 1, BaseBackup () at pg_basebackup.c:1740
1740        char       *maxrate_clause = NULL;
(gdb)

连接pg_conn结构体


(gdb) n
1749        Assert(conn != NULL);
(gdb) p *conn
$1 = {pghost = 0x6282d0 "localhost", pghostaddr = 0x0, pgport = 0x6282f0 "5432", pgtty = 0x628310 "", 
  connect_timeout = 0x0, client_encoding_initial = 0x0, pgoptions = 0x628330 "", appname = 0x0, 
  fbappname = 0x628350 "pg_basebackup", dbName = 0x6282b0 "replication", replication = 0x6283d0 "true", 
  pguser = 0x628290 "xdb", pgpass = 0x0, pgpassfile = 0x628d30 "/home/xdb/.pgpass", keepalives = 0x0, 
  keepalives_idle = 0x0, keepalives_interval = 0x0, keepalives_count = 0x0, sslmode = 0x628370 "prefer", 
  sslcompression = 0x628390 "0", sslkey = 0x0, sslcert = 0x0, sslrootcert = 0x0, sslcrl = 0x0, requirepeer = 0x0, 
  krbsrvname = 0x6283b0 "postgres", target_session_attrs = 0x6283f0 "any", Pfdebug = 0x0, noticeHooks = {
    noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0, 
    noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0, 
  eventArraySize = 0, status = CONNECTION_OK, asyncStatus = PGASYNC_IDLE, xactStatus = PQTRANS_IDLE, 
  queryclass = PGQUERY_SIMPLE, last_query = 0x61f1c0 "SHOW wal_segment_size", last_sqlstate = "\000\000\000\000\000", 
  options_valid = true, nonblocking = false, singleRowMode = false, copy_is_binary = 0 '\000', copy_already_done = 0, 
  notifyHead = 0x0, notifyTail = 0x0, nconnhost = 1, whichhost = 0, connhost = 0x627a50, sock = 7, laddr = {addr = {
      ss_family = 10, __ss_padding = "\307\326", '\000' , "\001", '\000' , 
      __ss_align = 0}, salen = 28}, raddr = {addr = {ss_family = 10, 
      __ss_padding = "\025\070", '\000' , "\001", '\000' , __ss_align = 0}, 
    salen = 28}, pversion = 196608, sversion = 110002, auth_req_received = true, password_needed = false, 
  sigpipe_so = false, sigpipe_flag = true, try_next_addr = false, try_next_host = false, addr_cur = 0x0, 
  setenv_state = SETENV_STATE_IDLE, next_eo = 0x0, send_appname = true, be_pid = 1435, be_key = -828773845, 
  pstatus = 0x629570, client_encoding = 0, std_strings = true, verbosity = PQERRORS_DEFAULT, 
  show_context = PQSHOW_CONTEXT_ERRORS, lobjfuncs = 0x0, inBuffer = 0x61f600 "T", inBufSize = 16384, inStart = 75, 
  inCursor = 75, inEnd = 75, outBuffer = 0x623610 "Q", outBufSize = 16384, outCount = 0, outMsgStart = 1, outMsgEnd = 27, 
  rowBuf = 0x627620, rowBufLen = 32, result = 0x0, next_result = 0x0, sasl_state = 0x0, ssl_in_use = false, 
  allow_ssl_try = false, wait_ssl_try = false, ssl = 0x0, peer = 0x0, engine = 0x0, gctx = 0x0, gtarg_nam = 0x0, 
  errorMessage = {data = 0x627830 "", len = 0, maxlen = 256}, workBuffer = {data = 0x627940 "SELECT", len = 6, 
    maxlen = 256}, addrlist = 0x0, addrlist_family = 0}
(gdb)

判断版本,是否支持BaseBackup


(gdb) n
1755        minServerMajor = 901;
(gdb) 
1756        maxServerMajor = PG_VERSION_NUM / 100;
(gdb) p PG_VERSION_NUM
$2 = 110002
(gdb) n
1757        serverVersion = PQserverVersion(conn);
(gdb) 
1758        serverMajor = serverVersion / 100;
(gdb) 
1759        if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
(gdb) p serverVersion
$3 = 110002
(gdb) n

判断服务器是否支持WAL streaming


(gdb) n
1772        if (includewal == STREAM_WAL && !CheckServerVersionForStreaming(conn))
(gdb)

如需要,生成recovery.conf文件


1785        if (writerecoveryconf)
(gdb) p includewal
$4 = STREAM_WAL
(gdb) p writerecoveryconf
$5 = true
(gdb) n
1786            GenerateRecoveryConf(conn);
(gdb)

获取系统标识符


1791        if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL, NULL))
(gdb) 
1797        PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
(gdb) p sysidentifier
$6 = 0x6292d0 "6662151435832250464"
(gdb) p *sysidentifier
$7 = 54 '6'
(gdb) p latesttli
$8 = 1
(gdb)

开始实际的备份工作


(gdb) p escaped_label
$9 = "pg_basebackup base backup\000\000\000\001", '\000' , "\"`-\360\377\177\000\000\000\000\000\000\377\177\000\000` u\367\377\177\000\000\000\001\000\000\000\000\000\000\001\000\000\000\002\000\000\000]VA\000\000\000\000\000@\335\377\377\377\177\000\000b\343\377\377\377\177", '\000' , "\343\377\377\377\177\000\000B\335\377\377\377\177\000\000\370Ǹ\367\377\177\000\000\220\325\227\367\377\177\000\000\000\341\377\377\377\177\000\000\000\000\000\000\000\000\000\000\371D"...
(gdb) p label
$10 = 0x412610 "pg_basebackup base backup"
(gdb) p i
$11 = 0
(gdb)

构造backup命令


(gdb) n
1802        if (verbose)
(gdb) 
1807        if (showprogress && !verbose)
(gdb) 
1809            fprintf(stderr, "waiting for checkpoint");
(gdb) 
waiting for checkpoint1810          if (isatty(fileno(stderr)))
(gdb) 
1811                fprintf(stderr, "\r");
(gdb) 
1817            psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb) 
1824                     format == 't' ? "TABLESPACE_MAP" : "",
(gdb) 
1817            psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb) 
1822                     includewal == NO_WAL ? "" : "NOWAIT",
(gdb) 
1817            psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb) 
1820                     includewal == FETCH_WAL ? "WAL" : "",
(gdb) 
1817            psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
(gdb) 
1816        basebkp =
(gdb) 
1827        if (PQsendQuery(conn, basebkp) == 0)
(gdb) 
(gdb) p basebkp
$12 = 0x6291f0 "BASE_BACKUP LABEL 'pg_basebackup base backup' PROGRESS   NOWAIT   "
(gdb)

发送命令到服务器端,获取执行结果


(gdb) n
1837        res = PQgetResult(conn);
(gdb) 
1838        if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb)

返回结果


(gdb) p *res
$13 = {ntups = 1, numAttributes = 2, attDescs = 0x629688, tuples = 0x629e90, tupArrSize = 128, numParameters = 0, 
  paramDescs = 0x0, resultStatus = PGRES_TUPLES_OK, 
  cmdStatus = "SELECT\000\000\000\000\000\000\000\000\000\000\027\000\000\000\004\000\000\000\377\377\377\377:\226b", '\000' , "\031\000\000\000\377\377\377\377\377\377\377\377B\226b", binary = 0, noticeHooks = {
    noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0, 
    noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0, 
  client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x629680, 
  curOffset = 133, spaceLeft = 1915}
(gdb)
(gdb) p *res->attDescs
$14 = {name = 0x6296c8 "recptr", tableid = 0, columnid = 0, format = 0, typid = 25, typlen = -1, atttypmod = 0}
(gdb) p *res->tuples
$15 = (PGresAttValue *) 0x6296d8
(gdb) p **res->tuples
$16 = {len = 10, value = 0x6296f8 "1/57000028"}
(gdb) p *res->tuples[2]
Cannot access memory at address 0x0
(gdb) p *res->tuples[0]
$17 = {len = 10, value = 0x6296f8 "1/57000028"}
(gdb) p *res->tuples[1]
Cannot access memory at address 0x15171
(gdb)

判断ntuples,获取WAL start位置


(gdb) n
1844        if (PQntuples(res) != 1)
(gdb) 
1852        strlcpy(xlogstart, PQgetvalue(res, 0, 0), sizeof(xlogstart));
(gdb) p PQgetvalue(res, 0, 0)
$18 = 0x6296f8 "1/57000028"
(gdb) p xlogstart
$19 = " `-\360\377\177\000\000\353\340\377\377\377\177\000\000\360\340a", '\000' , "\360\337\377\377\377\177", '\000' 
(gdb) n
1854        if (verbose)
(gdb) p xlogstart
$20 = "1/57000028\000\377\377\177\000\000\360\340a", '\000' , "\360\337\377\377\377\177", '\000' 
(gdb)

获取时间线timeline


(gdb) n
1862        if (PQnfields(res) >= 2)
(gdb) p PQnfields(res)
$21 = 2
(gdb) n
1863            starttli = atoi(PQgetvalue(res, 0, 1));
(gdb) 
1866        PQclear(res);
(gdb) p atoi(PQgetvalue(res, 0, 1))
$22 = 1
(gdb)  p res->tuples[1]
$23 = (PGresAttValue *) 0x15171
(gdb)  p res->tuples[0]
$24 = (PGresAttValue *) 0x6296d8
(gdb) 
(gdb) n
1867        MemSet(xlogend, 0, sizeof(xlogend));
(gdb) 
1869        if (verbose && includewal != NO_WAL)
(gdb) p xlogend
$25 = '\000' 

Get the header


(gdb) n
1876        res = PQgetResult(conn);
(gdb) n
1877        if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) p *res
$26 = {ntups = 1, numAttributes = 3, attDescs = 0x629688, tuples = 0x629e90, tupArrSize = 128, numParameters = 0, 
  paramDescs = 0x0, resultStatus = PGRES_TUPLES_OK, 
  cmdStatus = "SELECT\000\000\000\000\000\000\000\000\000\000\027\000\000\000\004\000\000\000\377\377\377\377:\226b", '\000' , "\031\000\000\000\377\377\377\377\377\377\377\377B\226b", binary = 0, noticeHooks = {
    noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0, 
    noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0, 
  client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x629680, 
  curOffset = 183, spaceLeft = 1865}

统计总大小,用于进度报告


(gdb) 
1892        totalsize = totaldone = 0;
(gdb) n
1893        tablespacecount = PQntuples(res);
(gdb) 
1894        for (i = 0; i < PQntuples(res); i++)
(gdb) p tablespacecount
$29 = 1
(gdb) n
1896            totalsize += atol(PQgetvalue(res, i, 2));
(gdb) p PQgetvalue(res, i, 2)
$30 = 0x629730 "445480"
(gdb) p atol(PQgetvalue(res, i, 2))
$31 = 445480
(gdb) n
1903            if (format == 'p' && !PQgetisnull(res, i, 1))
(gdb) 
1894        for (i = 0; i < PQntuples(res); i++)
(gdb) p res->tuples[0][0]
$33 = {len = -1, value = 0x629658 ""}
(gdb) p res->tuples[0][1]
$34 = {len = -1, value = 0x629658 ""}
(gdb) p res->tuples[0][2]
$35 = {len = 6, value = 0x629730 "445480"}
(gdb)

开始接收实际的数据chunks前,开始streaming session.


(gdb) n
1914        if (format == 't' && strcmp(basedir, "-") == 0 && PQntuples(res) > 1)
(gdb) 
1926        if (includewal == STREAM_WAL)
(gdb) 
1928            if (verbose)
(gdb) 
1931            StartLogStreamer(xlogstart, starttli, sysidentifier);
(gdb) n
Detaching after fork from child process 1511.
1937        for (i = 0; i < PQntuples(res); i++)
(gdb)

查看操作系统中的日志目录


[xdb@localhost backup]$ ll
total 0
drwx------. 3 xdb xdb 60 Mar 15 15:46 pg_wal
[xdb@localhost backup]$ ll ./pg_wal/
total 16384
-rw-------. 1 xdb xdb 16777216 Mar 15 15:46 000000010000000100000057
drwx------. 2 xdb xdb        6 Mar 15 15:46 archive_status
[xdb@localhost backup]$

Start receiving chunks,开始接收chunks


(gdb) n
Detaching after fork from child process 1511.
1937        for (i = 0; i < PQntuples(res); i++)
(gdb) n
1939            if (format == 't')
(gdb) 
1942                ReceiveAndUnpackTarFile(conn, res, i);
(gdb) 
193789/445489 kB
(gdb) for (i = 0; i < PQntuples(res); i++)

查看操作系统中的备份目录


[xdb@localhost backup]$ ll
total 56
-rw-------. 1 xdb xdb   226 Mar 15 15:47 backup_label
drwx------. 6 xdb xdb    58 Mar 15 15:48 base
drwx------. 2 xdb xdb  4096 Mar 15 15:48 global
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_commit_ts
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_dynshmem
-rw-------. 1 xdb xdb  4513 Mar 15 15:48 pg_hba.conf
-rw-------. 1 xdb xdb  1636 Mar 15 15:48 pg_ident.conf
drwx------. 4 xdb xdb    68 Mar 15 15:48 pg_logical
drwx------. 4 xdb xdb    36 Mar 15 15:47 pg_multixact
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_notify
drwx------. 2 xdb xdb     6 Mar 15 15:48 pg_replslot
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_serial
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_snapshots
drwx------. 2 xdb xdb     6 Mar 15 15:48 pg_stat
drwx------. 2 xdb xdb     6 Mar 15 15:48 pg_stat_tmp
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_subtrans
drwx------. 2 xdb xdb     6 Mar 15 15:48 pg_tblspc
drwx------. 2 xdb xdb     6 Mar 15 15:47 pg_twophase
-rw-------. 1 xdb xdb     3 Mar 15 15:48 PG_VERSION
drwx------. 3 xdb xdb    92 Mar 15 15:48 pg_wal
drwx------. 2 xdb xdb    18 Mar 15 15:48 pg_xact
-rw-------. 1 xdb xdb    88 Mar 15 15:48 postgresql.auto.conf
-rw-------. 1 xdb xdb 23812 Mar 15 15:48 postgresql.conf
-rw-------. 1 xdb xdb   183 Mar 15 15:48 recovery.conf
[xdb@localhost backup]$

显示进度


(gdb) n
1945        if (showprogress)
(gdb) 
1947            progress_report(PQntuples(res), NULL, true);
(gdb) 
194889/445489 kB (100%),if (isatty(fileno(stderr)))
(gdb) 
(gdb) n
1949                fprintf(stderr, "\n");  /* Need to move to next line */
(gdb) 
1952        PQclear(res);
(gdb)

Get the stop position


(gdb) 
1957        res = PQgetResult(conn);
(gdb) n
1958        if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) p *res
$36 = {ntups = 1, numAttributes = 2, attDescs = 0x6295a8, tuples = 0x629db0, tupArrSize = 128, numParameters = 0, 
  paramDescs = 0x0, resultStatus = PGRES_TUPLES_OK, 
  cmdStatus = "SELECT", '\000' , "\300!", , binary = 0, noticeHooks = {
    noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0, 
    noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0, 
  client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x6295a0, 
  curOffset = 133, spaceLeft = 1915}
(gdb) 
(gdb) n
1965        if (PQntuples(res) != 1)
(gdb) 
1972        strlcpy(xlogend, PQgetvalue(res, 0, 0), sizeof(xlogend));
(gdb) p xlogend
$37 = '\000' 
(gdb) p *xlogend
$38 = 0 '\000'
(gdb) n
1973        if (verbose && includewal != NO_WAL)
(gdb) 
1975        PQclear(res);
(gdb)

COMMAND is OK


(gdb) 
1977        res = PQgetResult(conn);
(gdb) 
1978        if (PQresultStatus(res) != PGRES_COMMAND_OK)
(gdb) p *res
$39 = {ntups = 0, numAttributes = 0, attDescs = 0x0, tuples = 0x0, tupArrSize = 0, numParameters = 0, paramDescs = 0x0, 
  resultStatus = PGRES_COMMAND_OK, cmdStatus = "SELECT", '\000' , "\300!", , 
  binary = 0, noticeHooks = {noticeRec = 0x7ffff7b9eab4 , noticeRecArg = 0x0, 
    noticeProc = 0x7ffff7b9eb09 , noticeProcArg = 0x0}, events = 0x0, nEvents = 0, 
  client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x0, curOffset = 0, 
  spaceLeft = 0}
(gdb)

善后工作,如在备份结束后,持久化数据在磁盘上等


(gdb) n
1997        if (bgchild > 0)
(gdb) 
2014            if (verbose)
(gdb) 
2019            if (write(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
(gdb) 
2028            r = waitpid(bgchild, &status, 0);
(gdb) p bgchild
$40 = 1511
(gdb) n
2029            if (r == -1)
(gdb) 
2035            if (r != bgchild)
(gdb) p r
$41 = 1511
(gdb) n
2041            if (!WIFEXITED(status))
(gdb) 
2047            if (WEXITSTATUS(status) != 0)
(gdb) 
2098        destroyPQExpBuffer(recoveryconfcontents);
(gdb) 
2103        PQclear(res);
(gdb) 
2104        PQfinish(conn);
(gdb) 
2113        if (do_sync)
(gdb) 
2115            if (format == 't')
(gdb) 
2122                (void) fsync_pgdata(basedir, progname, serverVersion);
(gdb) 
2126        if (verbose)
(gdb) 
2128    }
(gdb) 
main (argc=12, argv=0x7fffffffe4b8) at pg_basebackup.c:2534
2534        success = true;
(gdb)

再次启动跟踪,监控后台数据库的活动.
进入BaseBackup函数


Breakpoint 1, BaseBackup () at pg_basebackup.c:1740
1740        char       *maxrate_clause = NULL;
(gdb)

数据库活动


15:56:25 (xdb@[local]:5432)testdb=# select * from pg_stat_activity
[local] xdb@testdb-# where backend_type not in ('walwriter', 'checkpointer', 'background writer', 'logical replication launcher', 'autovacuum launcher') and query not like '%pg_stat_activity%';
-[ RECORD 1 ]----+------------------------------
datid            | 
datname          | 
pid              | 1566
usesysid         | 10
usename          | xdb
application_name | pg_basebackup
client_addr      | ::1
client_hostname  | 
client_port      | 51162
backend_start    | 2019-03-15 15:56:13.82013+08
xact_start       | 
query_start      | 
state_change     | 2019-03-15 15:56:13.821507+08
wait_event_type  | Client
wait_event       | ClientRead
state            | idle
backend_xid      | 
backend_xmin     | 
query            | 
backend_type     | walsender

开启WAL streaming


1931            StartLogStreamer(xlogstart, starttli, sysidentifier);
(gdb) 
Detaching after fork from child process 1602.
1937        for (i = 0; i < PQntuples(res); i++)
(gdb)

数据库活动


16:01:25 (xdb@[local]:5432)testdb=# select * from pg_stat_activity
where backend_type not in ('walwriter', 'checkpointer', 'background writer', 'logical replication launcher', 'autovacuum launcher') and query not like '%pg_stat_activity%';
-[ RECORD 1 ]----+------------------------------
datid            | 
datname          | 
pid              | 1566
usesysid         | 10
usename          | xdb
application_name | pg_basebackup
client_addr      | ::1
client_hostname  | 
client_port      | 51162
backend_start    | 2019-03-15 15:56:13.82013+08
xact_start       | 
query_start      | 
state_change     | 2019-03-15 16:00:47.345326+08
wait_event_type  | Client
wait_event       | ClientWrite
state            | active
backend_xid      | 
backend_xmin     | 
query            | 
backend_type     | walsender
-[ RECORD 2 ]----+------------------------------
datid            | 
datname          | 
pid              | 1601
usesysid         | 10
usename          | xdb
application_name | pg_basebackup
client_addr      | ::1
client_hostname  | 
client_port      | 51164
backend_start    | 2019-03-15 16:01:47.150434+08
xact_start       | 
query_start      | 
state_change     | 2019-03-15 16:01:47.159234+08
wait_event_type  | Activity
wait_event       | WalSenderMain
state            | active
backend_xid      | 
backend_xmin     | 
query            | 
backend_type     | walsender
16:01:56 (xdb@[local]:5432)testdb=# 
16:01:56 (xdb@[local]:5432)testdb=

拷贝数据


(gdb) 
1942                ReceiveAndUnpackTarFile(conn, res, i);
(gdb) 
193789/445489 kBfor (i = 0; i < PQntuples(res); i++)
(gdb) 
1945        if (showprogress)
(gdb)

数据库活动


...
-[ RECORD 3 ]----+------------------------------
datid            | 
datname          | 
pid              | 1352
usesysid         | 
usename          | 
application_name | 
client_addr      | 
client_hostname  | 
client_port      | 
backend_start    | 2019-03-15 15:03:01.923092+08
xact_start       | 
query_start      | 
state_change     | 
wait_event_type  | Activity
wait_event       | WalWriterMain
state            | 
backend_xid      | 
backend_xmin     | 
query            | 
backend_type     | walwriter

执行善后工作


2113        if (do_sync)
(gdb) 
2115            if (format == 't')
(gdb) 
2122                (void) fsync_pgdata(basedir, progname, serverVersion);
(gdb) 
2126        if (verbose)
(gdb) 
2128    }
(gdb)

数据库活动


16:05:01 (xdb@[local]:5432)testdb=# select * from pg_stat_activity
where backend_type not in ('checkpointer', 'background writer', 'logical replication launcher', 'autovacuum launcher') and query not like '%pg_stat_activity%';
-[ RECORD 1 ]----+------------------------------
datid            | 
datname          | 
pid              | 1352
usesysid         | 
usename          | 
application_name | 
client_addr      | 
client_hostname  | 
client_port      | 
backend_start    | 2019-03-15 15:03:01.923092+08
xact_start       | 
query_start      | 
state_change     | 
wait_event_type  | Activity
wait_event       | WalWriterMain
state            | 
backend_xid      | 
backend_xmin     | 
query            | 
backend_type     | walwriter

DONE!

四、参考资料

PG Source Code


当前文章:PostgreSQL源码解读(150)-PGTools#2(BaseBackup函数)
本文路径:http://csdahua.cn/article/jcjsjp.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流