【PostgreSQL内核学习(十三)—— (PortalRun)】

news/2024/7/9 20:12:47 标签: postgresql, 数据库

PortalRun

  • 概述
  • PortalRun 函数
    • MarkPortalActive 函数
    • PotalSetIoState 函数
    • FillPortalStore 函数
    • DoPortalRunFetch 函数
    • PortalRunSelect 函数
    • PortalRunMulti 函数
    • MarkPortalDone 函数

声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了《PostgresSQL数据库内核分析》一书,OpenGauss1.1.0 的开源代码和《OpenGauss数据库源码解析》一书以及OpenGauss社区学习文档

概述

  在【PostgreSQL内核学习(八)—— 查询执行(查询执行策略)】中我们了解到,Portal 的执行过程为:CreatePortal —> PortalDefineQuery —> PortalStart —> PortalRun —> PortalDrop 。随后,我们在【PostgreSQL内核学习(十一)—— (CreatePortal)】一文中介绍了 CreatePortal 函数的执行过程。又在【PostgreSQL内核学习(十二)—— (PortalStart)】一文中学习了 PortalStart 的执行过程。本文着重来继续学习 PortalRun 函数。

PortalRun 函数

   PortalRun 函数,用于执行一个 portal查询一系列查询,其主要作用是根据传入的 portal 和参数执行查询,支持不同的查询策略,并进行性能统计和异常处理。函数首先检查 portal执行策略,然后执行相应的操作,包括运行查询填充结果集获取命令完成标签等。如果执行过程中发生错误,函数会捕获异常并标记 portal 为失败状态。最后,函数还会记录执行时间、更新唯一 SQL 统计信息等。其函数源码如下:(路径:src/gausskernel/process/tcop/pquery.cpp

PortalRun 函数的入参如下

  • portal:要执行的 portal 对象,包含了待执行的查询、执行策略等信息
  • count指定要执行的行数或查询次数,如果为负数或 0,则表示不执行查询;如果为 FETCH_ALL,则表示执行所有行。
  • isTopLevel:一个布尔值,表示是否在顶层执行查询,通常来自客户端命令消息。
  • dest:用于接收主要(可以设置标签)查询结果的目标接收器DestReceiver)。
  • altdest:用于接收非主要查询结果的目标接收器,通常用于处理 EXPLAIN 查询。
  • completionTag:一个缓冲区,用于存储命令完成状态字符串的指针,可以为 NULL,如果不为 NULL,则会将命令完成状态存储在其中。
/*
 * PortalRun
 *		运行一个 portal 的查询或查询系列。
 *
 * count <= 0 被解释为无操作:启动并关闭目标,但不执行任何其他操作。
 * 同样,count == FETCH_ALL 被解释为“所有行”。注意,在多查询情况下,我们总是运行 portal 完成。
 *
 * isTopLevel: 如果查询在后端的“顶层”执行(即,直接来自客户端命令消息),则为 true。
 *
 * dest: 主要(可以设置标签)查询输出的目标接收器。
 *
 * altdest: 非主要查询输出的目标接收器。
 *
 * completionTag: 指向一个大小为 COMPLETION_TAG_BUFSIZE 的缓冲区,用于存储命令完成状态字符串。
 *			   如果调用者不需要状态字符串,则可以为 NULL。
 *
 * 返回值为 TRUE 表示 portal 的执行已完成,FALSE 表示因为 count 参数耗尽而暂停执行。
 */
bool PortalRun(Portal portal, long count, bool isTopLevel, DestReceiver* dest, DestReceiver* altdest, char* completionTag)
{
    gstrace_entry(GS_TRC_ID_PortalRun); // 进行性能跟踪
    increase_instr_portal_nesting_level(); // 增加 portal 嵌套级别计数

    bool result = false; // 存储函数的返回值,默认为 false
    uint64 nprocessed; // 存储处理的行数
    ResourceOwner saveTopTransactionResourceOwner; // 保存顶层事务资源拥有者
    MemoryContext saveTopTransactionContext; // 保存顶层事务内存上下文
    Portal saveActivePortal; // 保存当前活动的 portal
    ResourceOwner saveResourceOwner; // 保存当前资源拥有者
    MemoryContext savePortalContext; // 保存 portal 内存上下文
    MemoryContext saveMemoryContext; // 保存当前内存上下文
    errno_t errorno = EOK; // 错误号

    AssertArg(PortalIsValid(portal)); // 检查传入的 portal 参数是否有效
    AssertArg(PointerIsValid(portal->commandTag)); // 检查命令标签是否有效

    char* old_stmt_name = u_sess->pcache_cxt.cur_stmt_name; // 保存旧的语句名称
    u_sess->pcache_cxt.cur_stmt_name = (char*)portal->prepStmtName; // 设置当前语句名称为 portal 的预处理语句名称
    CmdType cmdType = CMD_UNKNOWN; // 初始化命令类型
    CmdType queryType = CMD_UNKNOWN; // 初始化查询类型

    // 在进行性能统计时,匹配 portal->commandTag 与 CmdType
    if (isTopLevel && u_sess->attr.attr_common.pgstat_track_activities &&
        u_sess->attr.attr_common.pgstat_track_sql_count && !u_sess->attr.attr_sql.enable_cluster_resize) {
        cmdType = set_cmd_type(portal->commandTag);
        queryType = set_command_type_by_commandTag(portal->commandTag);
    }

    PGSTAT_INIT_TIME_RECORD(); // 初始化性能统计时间记录

    TRACE_POSTGRESQL_QUERY_EXECUTE_START(); // 记录查询执行开始

    // 初始化完成标签为空字符串
    if (completionTag != NULL)
        completionTag[0] = '\0';

    if (portal->strategy != PORTAL_MULTI_QUERY) { // 根据 portal 的策略选择执行方式
        if (u_sess->attr.attr_common.log_executor_stats) {
            elog(DEBUG3, "PortalRun");
            /* PORTAL_MULTI_QUERY 会记录各自查询的统计信息 */
            ResetUsage();
        }
        PGSTAT_START_TIME_RECORD(); // 记录查询开始时间
    }

    // 检查 portal 的合法性,并标记 portal 为活动状态
    MarkPortalActive(portal);

    QueryDesc* queryDesc = portal->queryDesc; // 获取 portal 的查询描述

    // 增加资源池计数
    if (IS_PGXC_DATANODE && queryDesc != NULL && (queryDesc->plannedstmt) != NULL &&
        queryDesc->plannedstmt->has_obsrel) {
        increase_rp_number();
    }

    /*
     * 设置全局 portal 上下文指针。
     *
     * 我们必须在这里做一个特殊的处理来支持像 VACUUM 和 CLUSTER 这样的实用程序命令,
     * 这些命令在内部启动和提交事务。当我们被调用来执行这样一个命令时,
     * CurrentResourceOwner 将指向 TopTransactionResourceOwner,
     * 但 TopTransactionResourceOwner 在内部提交和重启的过程中会被销毁和替换。
     * 因此,我们需要准备将其还原为指向退出时的 TopTransactionResourceOwner。
     * (这个内部启动新事务的想法不太好。)
     * CurrentMemoryContext 也存在类似的问题,但我们保存的其他指针要么是 NULL,
     * 要么指向寿命较长的对象。
     */
    saveTopTransactionResourceOwner = t_thrd.utils_cxt.TopTransactionResourceOwner;
    saveTopTransactionContext = u_sess->top_transaction_mem_cxt;
    saveActivePortal = ActivePortal;
    saveResourceOwner = t_thrd.utils_cxt.CurrentResourceOwner;
    savePortalContext = t_thrd.mem_cxt.portal_mem_cxt;
    saveMemoryContext = CurrentMemoryContext;

    PotalSetIoState(portal); // 设置 portal 的 I/O 状态

    /* 工作负载客户端管理 */
    if (ENABLE_WORKLOAD_CONTROL && queryDesc != NULL) {
        WLMTopSQLReady(queryDesc);
        WLMInitQueryPlan(queryDesc);
        dywlm_client_manager(queryDesc);
    }

    PG_TRY(); // 开始捕获异常
    {
        ActivePortal = portal; // 设置当前活动的 portal
        t_thrd.utils_cxt.CurrentResourceOwner = portal->resowner; // 设置当前资源拥有者
        t_thrd.mem_cxt.portal_mem_cxt = PortalGetHeapMemory(portal); // 设置 portal 内存上下文

        MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt); // 切换到 portal 内存上下文

        switch (portal->strategy) {
            case PORTAL_ONE_SELECT:
            case PORTAL_ONE_RETURNING:
            case PORTAL_ONE_MOD_WITH:
            case PORTAL_UTIL_SELECT:

                /*
                 * 如果我们尚未运行命令,则运行命令并将其结果存储在 portal 的 tuplestore 中。
                 * 但对于 PORTAL_ONE_SELECT 情况,我们不这样做。
                 */
                if (portal->strategy != PORTAL_ONE_SELECT && !portal->holdStore) {
                    /* DestRemoteExecute 不能自动发送 T 消息 */
                    if (strcmp(portal->commandTag, "EXPLAIN") == 0 && dest->mydest != DestRemote)
                        t_thrd.explain_cxt.explain_perf_mode = EXPLAIN_NORMAL;
                    FillPortalStore(portal, isTopLevel);
                }

                /*
                 * 现在获取所需数量的结果。
                 */
                nprocessed = PortalRunSelect(portal, true, count, dest);

                /*
                 * 如果 portal 结果包含命令标签并且调用者提供了一个指针来存储它,
                 * 则将命令标签复制到 completionTag 中。
                 * 修改 "SELECT" 标签以提供行数。
                 */
                if (completionTag != NULL) {
                    if (strcmp(portal->commandTag, "SELECT") == 0) {
                        errorno = snprintf_s(completionTag,
                            COMPLETION_TAG_BUFSIZE,
                            COMPLETION_TAG_BUFSIZE - 1,
                            "SELECT %lu",
                            nprocessed);
                        securec_check_ss(errorno, "\0", "\0");
                    } else {
                        errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE,
                            portal->commandTag);
                        securec_check(errorno, "\0", "\0");
                    }
                }

                /* 标记 portal 为非活动状态 */
                portal->status = PORTAL_READY;

                /*
                 * 由于这是前向提取,如果 atEnd 现在为真,则表示完成。
                 */
                result = portal->atEnd;
                break;

            case PORTAL_MULTI_QUERY:
                PortalRunMulti(portal, isTopLevel, dest, altdest, completionTag);

                /* 防止 portal 的命令被重新执行 */
                MarkPortalDone(portal);
                /* RunMulti 总是在结束时完成 */
                result = true;
                break;

            default:
                ereport(ERROR,
                    (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE),
                        errmodule(MOD_EXECUTOR),
                        errmsg("Unrecognized portal strategy: %d", (int)portal->strategy)));
                result = false; /* 保持编译器安静 */
                break;
        }
    }
    PG_CATCH(); // 捕获异常
    {
        /* 在执行 portal 时未捕获的错误:标记为失败 */
        MarkPortalFailed(portal);

        /* 还原全局变量并传播错误 */
        if (saveMemoryContext == saveTopTransactionContext)
            MemoryContextSwitchTo(u_sess->top_transaction_mem_cxt);
        else
            MemoryContextSwitchTo(saveMemoryContext);
        ActivePortal = saveActivePortal;
        if (saveResourceOwner == saveTopTransactionResourceOwner)
            t_thrd.utils_cxt.CurrentResourceOwner = t_thrd.utils_cxt.TopTransactionResourceOwner;
        else
            t_thrd.utils_cxt.CurrentResourceOwner = saveResourceOwner;
        t_thrd.mem_cxt.portal_mem_cxt = savePortalContext;

        if (ENABLE_WORKLOAD_CONTROL) {
            /* 保存错误信息到历史信息 */
            save_error_message();
            if (g_instance.wlm_cxt->dynamic_workload_inited) {
                t_thrd.wlm_cxt.parctl_state.errjmp = 1;
                if (t_thrd.wlm_cxt.parctl_state.simple == 0)
                    dywlm_client_release(&t_thrd.wlm_cxt.parctl_state);
                else
                    WLMReleaseGroupActiveStatement();
                dywlm_client_max_release(&t_thrd.wlm_cxt.parctl_state);
            } else
                WLMParctlRelease(&t_thrd.wlm_cxt.parctl_state);

            if (IS_PGXC_COORDINATOR && t_thrd.wlm_cxt.collect_info->sdetail.msg) {
                pfree_ext(t_thrd.wlm_cxt.collect_info->sdetail.msg);
            }
        }

        PG_RE_THROW(); // 重新抛出异常
    }
    PG_END_TRY(); // 结束异常捕获

    if (ENABLE_WORKLOAD_CONTROL) {
        t_thrd.wlm_cxt.parctl_state.except = 0;

        if (g_instance.wlm_cxt->dynamic_workload_inited && (t_thrd.wlm_cxt.parctl_state.simple == 0)) {
            dywlm_client_release(&t_thrd.wlm_cxt.parctl_state);
        } else {
            // 只释放资源池计数
            if (IS_PGXC_COORDINATOR && !IsConnFromCoord() &&
                (u_sess->wlm_cxt->parctl_state_exit || IsQueuedSubquery())) {
                WLMReleaseGroupActiveStatement();
            }
        }
    }

    if (saveMemoryContext == saveTopTransactionContext)
        MemoryContextSwitchTo(u_sess->top_transaction_mem_cxt);
    else
        MemoryContextSwitchTo(saveMemoryContext);
    ActivePortal = saveActivePortal;
    if (saveResourceOwner == saveTopTransactionResourceOwner)
        t_thrd.utils_cxt.CurrentResourceOwner = t_thrd.utils_cxt.TopTransactionResourceOwner;
    else
        t_thrd.utils_cxt.CurrentResourceOwner = saveResourceOwner;
    t_thrd.mem_cxt.portal_mem_cxt = savePortalContext;

    if (portal->strategy != PORTAL_MULTI_QUERY) {
        PGSTAT_END_TIME_RECORD(EXECUTION_TIME); // 记录执行时间

        if (u_sess->attr.attr_common.log_executor_stats)
            ShowUsage("EXECUTOR STATISTICS"); // 显示执行器统计信息
    }
    TRACE_POSTGRESQL_QUERY_EXECUTE_DONE(); // 记录查询执行完成

    /* 根据 cmdType 进行 SQL 计数 */
    if (cmdType != CMD_UNKNOWN || queryType != CMD_UNKNOWN) {
        report_qps_type(cmdType);
        report_qps_type(queryType);
    }

    /* 更新唯一 SQL 统计信息 */
    if (is_instr_top_portal() && is_unique_sql_enabled() && is_local_unique_sql()) {
        /* 唯一 SQL 返回行数 */
        if (portal->queryDesc != NULL && portal->queryDesc->estate && portal->queryDesc->estate->es_plannedstmt &&
            portal->queryDesc->estate->es_plannedstmt->commandType == CMD_SELECT) {
            ereport(DEBUG1,
                (errmodule(MOD_INSTR),
                    errmsg("[UniqueSQL]"
                           "unique id: %lu , select returned rows: %lu",
                        u_sess->unique_sql_cxt.unique_sql_id,
                        portal->queryDesc->estate->es_processed)));
            UniqueSQLStatCountReturnedRows(portal->queryDesc->estate->es_processed);
        }

        /* 使用 unique_sql_start_time 作为唯一 SQL 开始时间 */
        if (IsNeedUpdateUniqueSQLStat(portal) && IS_UNIQUE_SQL_TRACK_TOP && IsTopUniqueSQL()) {
            UpdateUniqueSQLStat(NULL, NULL, u_sess->unique_sql_cxt.unique_sql_start_time);
        }

        if (u_sess->unique_sql_cxt.unique_sql_start_time != 0) {
            int64 duration = GetCurrentTimestamp() - u_sess->unique_sql_cxt.unique_sql_start_time;
            if (IS_SINGLE_NODE) {
                pgstat_update_responstime_singlenode(
                    u_sess->unique_sql_cxt.unique_sql_id, u_sess->unique_sql_cxt.unique_sql_start_time, duration);
            } else {
                pgstat_report_sql_rt(
                    u_sess->unique_sql_cxt.unique_sql_id, u_sess->unique_sql_cxt.unique_sql_start_time, duration);
            }
        }
    }
    decrease_instr_portal_nesting_level(); // 减少 portal 嵌套级别计数
    gstrace_exit(GS_TRC_ID_PortalRun); // 结束性能跟踪
    u_sess->pcache_cxt.cur_stmt_name = old_stmt_name; // 恢复旧的语句名称
    return result; // 返回结果
}

MarkPortalActive 函数

  MarkPortalActive 函数用于检查 portal 的合法性,并标记 portal 为活动状态。具体来说,MarkPortalActive 函数用于将一个 portal 从 “READY” 状态标记为 “ACTIVE” 状态。portal 在 “READY” 状态时表示可以运行,而在 “ACTIVE” 状态时表示正在执行中。函数的主要作用是确保 portal 处于 “READY” 状态才能被标记为 “ACTIVE” 状态,否则会抛出错误。

具体函数行为如下:

  1. 函数首先检查传入的 portal 的状态,确保它当前处于 “READY” 状态。如果 portal 不在 “READY” 状态,将会抛出一个错误,表明该 portal 不能被运行。
  2. 如果 portal 确实处于 “READY” 状态,那么函数将执行状态转换,将其标记为 “ACTIVE” 状态。
  3. 函数还记录了 portal 进入 “ACTIVE” 状态时的当前子事务 ID(SubTransactionId),以便跟踪 portal 在事务中的活动状态。

  MarkPortalActive 函数源码如下:(路径:src/common/backend/utils/mmgr/portalmem.cpp

/*
 * MarkPortalActive
 * 将一个 portal 从 READY 状态转换为 ACTIVE 状态。
 *
 * 注意:不要直接将 portal->status = PORTAL_ACTIVE;而是使用该函数。
 */
void MarkPortalActive(Portal portal)
{
    /* 为了安全性,这是一个在运行时进行测试而不仅仅是一个断言 */
    if (portal->status != PORTAL_READY)
        ereport(ERROR,
            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("无法运行 portal \"%s\"", portal->name)));
    /* 执行状态转换 */
    portal->status = PORTAL_ACTIVE;
    portal->activeSubid = GetCurrentSubTransactionId();
}

PotalSetIoState 函数

  PotalSetIoState 函数用于根据 portal 的命令标签(commandTag)来设置数据库IO 状态和后端状态。这些设置对于统计数据库的 IO 活动跟踪后端执行的读写操作非常有用,可以用于性能分析和监视

具体来说,它检查 portal 的命令标签,然后根据不同的命令标签来设置数据库的 IO 状态和后端状态

  1. 如果命令标签是 “COPY”、“UPDATE”、“INSERT”、“CREATE TABLE”、“ALTER TABLE”、“CREATE INDEX”、“REINDEX INDEX” 或 “CLUSTER”,则将数据库IO 状态设置为写(IOSTATE_WRITE)。
  2. 如果命令标签是 “VACUUM”,则将数据库IO 状态设置为 VACUUMIOSTATE_VACUUM)。
  3. 如果命令标签是 “ANALYZE”,则将数据库IO 状态设置为读(IOSTATE_READ)。
  4. 最后,它根据是否是 “INSERT”、“UPDATE”、“CREATE TABLE AS”、“CREATE INDEX”、“ALTER TABLE” 或 “CLUSTER” 命令来设置后端状态STMTTAG_WRITE),以便后续检查默认事务的只读属性

PotalSetIoState 函数源码如下:(路径:src/gausskernel/process/tcop/pquery.cpp

void PotalSetIoState(Portal portal)
{
    // 如果命令标签是 "COPY"
    if (strcmp(portal->commandTag, "COPY") == 0)
        // 将数据库的IO状态设置为写(IOSTATE_WRITE)
        pgstat_set_io_state(IOSTATE_WRITE);

    // 如果命令标签是 "VACUUM"
    if (strcmp(portal->commandTag, "VACUUM") == 0)
        // 将数据库的IO状态设置为 VACUUM(IOSTATE_VACUUM)
        pgstat_set_io_state(IOSTATE_VACUUM);

    // 如果命令标签是 "UPDATE"
    if (strcmp(portal->commandTag, "UPDATE") == 0)
        // 将数据库的IO状态设置为写(IOSTATE_WRITE)
        pgstat_set_io_state(IOSTATE_WRITE);

    // 如果命令标签是 "INSERT"
    if (strcmp(portal->commandTag, "INSERT") == 0)
        // 将数据库的IO状态设置为写(IOSTATE_WRITE)
        pgstat_set_io_state(IOSTATE_WRITE);

    // 如果命令标签是 "CREATE TABLE"
    if (strcmp(portal->commandTag, "CREATE TABLE") == 0)
        // 将数据库的IO状态设置为写(IOSTATE_WRITE)
        pgstat_set_io_state(IOSTATE_WRITE);

    // 如果命令标签是 "ALTER TABLE"
    if (strcmp(portal->commandTag, "ALTER TABLE") == 0)
        // 将数据库的IO状态设置为写(IOSTATE_WRITE)
        pgstat_set_io_state(IOSTATE_WRITE);

    // 如果命令标签是 "CREATE INDEX"
    if (strcmp(portal->commandTag, "CREATE INDEX") == 0)
        // 将数据库的IO状态设置为写(IOSTATE_WRITE)
        pgstat_set_io_state(IOSTATE_WRITE);

    // 如果命令标签是 "REINDEX INDEX"
    if (strcmp(portal->commandTag, "REINDEX INDEX") == 0)
        // 将数据库的IO状态设置为写(IOSTATE_WRITE)
        pgstat_set_io_state(IOSTATE_WRITE);

    // 如果命令标签是 "CLUSTER"
    if (strcmp(portal->commandTag, "CLUSTER") == 0)
        // 将数据库的IO状态设置为写(IOSTATE_WRITE)
        pgstat_set_io_state(IOSTATE_WRITE);

    // 如果命令标签是 "ANALYZE"
    if (strcmp(portal->commandTag, "ANALYZE") == 0)
        // 将数据库的IO状态设置为读(IOSTATE_READ)
        pgstat_set_io_state(IOSTATE_READ);

    /* 设置后端状态(用于检查默认事务的只读属性) */

    // 首先将后端状态设置为无(STMTTAG_NONE)
    pgstat_set_stmt_tag(STMTTAG_NONE);

    // 如果命令标签是 "INSERT"、"UPDATE"、"CREATE TABLE AS"、"CREATE INDEX"、"ALTER TABLE" 或 "CLUSTER"
    if (strcmp(portal->commandTag, "INSERT") == 0 || strcmp(portal->commandTag, "UPDATE") == 0 ||
        strcmp(portal->commandTag, "CREATE TABLE AS") == 0 || strcmp(portal->commandTag, "CREATE INDEX") == 0 ||
        strcmp(portal->commandTag, "ALTER TABLE") == 0 || strcmp(portal->commandTag, "CLUSTER") == 0)
    {
        // 将后端状态设置为写(STMTTAG_WRITE)
        pgstat_set_stmt_tag(STMTTAG_WRITE);
    }
}

FillPortalStore 函数

  FillPortalStore 函数用于执行查询并将其结果加载到 portal 的元组存储区域中。它支持以下情况:

  • PORTAL_ONE_RETURNINGPORTAL_ONE_MOD_WITH:运行 portal 中的查询并将主查询的输出发送到 tuple store 中,辅助查询的输出将被丢弃。
  • PORTAL_UTIL_SELECT:运行实用程序查询并将其输出发送到 tuple store 中。

  在每种情况下,都会创建一个目标接收器DestReceiver)来处理结果,并将结果存储在 portalholdStore 中。完成标记(completionTag)用于指示查询的执行状态,最后,将实际命令结果覆盖 portal 的命令标签。如果不支持的策略被传入,函数将抛出一个错误。其函数源码如下:(路径:src/gausskernel/process/tcop/pquery.cpp

/*
 * FillPortalStore
 *		运行查询并将结果元组加载到portal的元组存储区域(tuple store)中。
 *
 * 仅用于 PORTAL_ONE_RETURNING、PORTAL_ONE_MOD_WITH 和 PORTAL_UTIL_SELECT 情况。
 */
static void FillPortalStore(Portal portal, bool isTopLevel)
{
    DestReceiver* treceiver = NULL;
    char completionTag[COMPLETION_TAG_BUFSIZE];

    // 创建一个用于保存结果的 tuple store
    PortalCreateHoldStore(portal);
    // 创建一个目标接收器用于接收查询结果
    treceiver = CreateDestReceiver(DestTuplestore);
    // 设置目标接收器参数,将结果发送到 portal 的 holdStore 中
    SetTuplestoreDestReceiverParams(treceiver, portal->holdStore, portal->holdContext, false);

    // 初始化完成标记为空字符串
    completionTag[0] = '\0';

    switch (portal->strategy) {
        case PORTAL_ONE_RETURNING:
        case PORTAL_ONE_MOD_WITH:

            /*
             * 就像默认的 MULTI_QUERY 情况一样,运行 portal 以完成操作,但将主查询的输出发送到 tuple store 中,
             * 辅助查询的输出将被丢弃。
             */
            PortalRunMulti(portal, isTopLevel, treceiver, None_Receiver, completionTag);
            break;

        case PORTAL_UTIL_SELECT:
            // 运行实用程序查询,将输出发送到 tuple store 中
            PortalRunUtility(portal, (Node*)linitial(portal->stmts), isTopLevel, treceiver, completionTag);
            break;

        default:
            ereport(ERROR,
                (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE),
                    errmodule(MOD_EXECUTOR),
                    errmsg("unsupported portal strategy: %d", (int)portal->strategy)));
            break;
    }

    // 使用实际命令结果覆盖默认的完成标记
    if (completionTag[0] != '\0')
        portal->commandTag = pstrdup(completionTag);

    // 销毁目标接收器
    (*treceiver->rDestroy)(treceiver);
}

注:

  1. PortalRunMulti 函数
  • 用于运行 portal 中的多个查询。
  • 它通常用于处理 PORTAL_MULTI_QUERY 策略,该策略允许一个 portal 包含多个查询,并按顺序执行它们。
  • FillPortalStore 函数中,当 portal->strategy 的值为PORTAL_ONE_RETURNINGPORTAL_ONE_MOD_WITH 时,会调用 PortalRunMulti 来执行查询并将主查询的输出发送到 tuple store 中。
  1. PortalRunUtility 函数
  • 用于运行实用程序查询,如 COPY、VACUUM、CREATE TABLE 等。
  • 它接收一个查询节点Node*)作为参数,并运行该查询。
  • FillPortalStore 函数中,当 portal->strategy 的值为PORTAL_UTIL_SELECT 时,会调用 PortalRunUtility 来运行实用程序查询,并将其输出发送到 tuple store 中。

DoPortalRunFetch 函数

  DoPortalRunFetch 函数是用于执行 portal 数据抓取操作的核心函数,根据不同的抓取方向和行数,执行相应的查询并返回抓取的行数。函数返回一个 long 类型的值,表示处理的行数,通常用于结果标签。

函数参数解释

  1. portal:要执行抓取操作的 portal
  2. fdirection:抓取的方向,可以是 FETCH_FORWARD(向前抓取)、FETCH_BACKWARD(向后抓取)、FETCH_ABSOLUTE(绝对抓取)或 FETCH_RELATIVE(相对抓取)。
  3. count:抓取的行数或位置。具体的意义取决于抓取方向。
  4. dest:目标接收器,用于指定抓取的结果输出方式。

  函数首先对参数进行了一些合法性检查初始化操作,然后根据不同的抓取方向行数执行相应的抓取操作。以下是不同抓取方向和行数的处理方式:

标 志解 释
FETCH_FORWARD / FETCH_BACKWARD根据指定的方向抓取指定数量的行
FETCH_ABSOLUTE根据绝对位置抓取指定位置的行
FETCH_RELATIVE根据相对位置抓取指定位置的行

  count 为正数时表示向前抓取,为负数时表示向后抓取,为零时表示重新抓取当前行
  函数源码如下所示:(路径:src/gausskernel/process/tcop/pquery.cpp

/*
 * DoPortalRunFetch
 *		Guts of PortalRunFetch --- the portal context is already set up
 *
 * Returns number of rows processed (suitable for use in result tag)
 */
static long DoPortalRunFetch(Portal portal, FetchDirection fdirection, long count, DestReceiver* dest)
{
    bool forward = false;

    // 断言:portal 策略为 PORTAL_ONE_SELECT、PORTAL_ONE_RETURNING、PORTAL_ONE_MOD_WITH、PORTAL_UTIL_SELECT 之一
    AssertEreport(portal->strategy == PORTAL_ONE_SELECT || portal->strategy == PORTAL_ONE_RETURNING ||
                      portal->strategy == PORTAL_ONE_MOD_WITH || portal->strategy == PORTAL_UTIL_SELECT,
        MOD_EXECUTOR,
        "portal strategy is not select, returning, mod_with, or util select");

    /* workload client manager */
    // 如果启用了工作负载管理并且 portal->queryDesc 尚未执行,则进行资源跟踪初始化
    if (ENABLE_WORKLOAD_CONTROL && portal->queryDesc && !portal->queryDesc->executed) {
        if (IS_PGXC_COORDINATOR || IS_SINGLE_NODE) {
            // 检查是否需要跟踪资源
            u_sess->exec_cxt.need_track_resource = WLMNeedTrackResource(portal->queryDesc);

            // 如果需要跟踪资源且当前没有游标记录,则添加游标记录
            if (u_sess->exec_cxt.need_track_resource && t_thrd.wlm_cxt.collect_info->sdetail.statement &&
                portal->queryDesc->sourceText && !t_thrd.wlm_cxt.has_cursor_record) {
                USE_MEMORY_CONTEXT(g_instance.wlm_cxt->query_resource_track_mcxt);

                // 设置 I/O 状态为读
                pgstat_set_io_state(IOSTATE_READ);

                // 构造完整的查询字符串
                uint32 query_str_len = strlen(t_thrd.wlm_cxt.collect_info->sdetail.statement) +
                                       strlen(portal->queryDesc->sourceText) + 3; /* 3 是 "()" 和 '\0' 的长度 */
                char* query_str = (char*)palloc0(query_str_len);
                int rc = snprintf_s(query_str,
                    query_str_len,
                    query_str_len - 1,
                    "%s(%s)",
                    t_thrd.wlm_cxt.collect_info->sdetail.statement,
                    portal->queryDesc->sourceText);
                securec_check_ss(rc, "\0", "\0");

                pfree_ext(t_thrd.wlm_cxt.collect_info->sdetail.statement);
                t_thrd.wlm_cxt.collect_info->sdetail.statement = query_str;

                uint32 hashCode = WLMHashCode(&u_sess->wlm_cxt->wlm_params.qid, sizeof(Qid));
                LockSessRealTHashPartition(hashCode, LW_EXCLUSIVE);
                WLMDNodeInfo* info = (WLMDNodeInfo*)hash_search(g_instance.wlm_cxt->stat_manager.collect_info_hashtbl,
                    &u_sess->wlm_cxt->wlm_params.qid,
                    HASH_FIND,
                    NULL);
                if (info != NULL) {
                    pfree_ext(info->statement);
                    info->statement = pstrdup(t_thrd.wlm_cxt.collect_info->sdetail.statement);
                    t_thrd.wlm_cxt.has_cursor_record = true;
                }

                UnLockSessRealTHashPartition(hashCode);
            }
        }

        // 初始化查询计划和启用动态工作负载管理
        WLMInitQueryPlan(portal->queryDesc);
        dywlm_client_manager(portal->queryDesc);
    }

    switch (fdirection) {
        case FETCH_FORWARD:
            if (count < 0) {
                fdirection = FETCH_BACKWARD;
                count = -count;
            }
            /* 从 switch 语句中跳出,与 FETCH_BACKWARD 共享代码 */
            break;
        case FETCH_BACKWARD:
            if (count < 0) {
                fdirection = FETCH_FORWARD;
                count = -count;
            }
            /* 从 switch 语句中跳出,与 FETCH_FORWARD 共享代码 */
            break;
        case FETCH_ABSOLUTE:
            if (count > 0) {
                /*
                 * 定义:倒回到起始位置,前进 count-1 行,返回下一行(如果存在)。
                 * 如果目标位置小于 portalPos,则需要回到起始位置,否则可以向前抓取目标行。
                 */
                if (portal->posOverflow || portal->portalPos == LONG_MAX || count - 1 < portal->portalPos) {
                    DoPortalRewind(portal);
                    if (count > 1)
                        (void)PortalRunSelect(portal, true, count - 1, None_Receiver);
                } else {
                    long pos = portal->portalPos;

                    if (portal->atEnd)
                        pos++; /* 如果已经在结束位置,需要额外抓取一行 */
                    if (count <= pos)
                        (void)PortalRunSelect(portal, false, pos - count + 1, None_Receiver);
                    else if (count > pos + 1)
                        (void)PortalRunSelect(portal, true, count - pos - 1, None_Receiver);
                }
                return PortalRunSelect(portal, true, 1L, dest);
            } else if (count < 0) {
                /*
                 * 定义:前进到末尾,倒退 abs(count)-1 行,返回前一行(如果存在)。
                 * 如果我们事先不知道末尾在哪里,我们就无法进行优化(考虑 count > 查询大小的一半的情况?我们可以在知道大小后进行回绕...)。
                 */
                (void)PortalRunSelect(portal, true, FETCH_ALL, None_Receiver);
                if (count < -1)
                    (void)PortalRunSelect(portal, false, -count - 1, None_Receiver);
                return PortalRunSelect(portal, false, 1L, dest);
            } else {
                /* 回到起始位置,不返回任何行 */
                DoPortalRewind(portal);
                return PortalRunSelect(portal, true, 0L, dest);
            }
            break;
        case FETCH_RELATIVE:
            if (count > 0) {
                /*
                 * 定义:前进 count-1 行,返回下一行(如果存在)。
                 */
                if (count > 1)
                    (void)PortalRunSelect(portal, true, count - 1, None_Receiver);
                return PortalRunSelect(portal, true, 1L, dest);
            } else if (count < 0) {
                /*
                 * 定义:倒退 abs(count)-1 行,返回前一行(如果存在)。
                 */
                if (count < -1)
                    (void)PortalRunSelect(portal, false, -count - 1, None_Receiver);
                return PortalRunSelect(portal, false, 1L, dest);
            } else {
                /* 与 FETCH FORWARD 0 相同,从 switch 语句中跳出 */
                fdirection = FETCH_FORWARD;
            }
            break;
        default:
            ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                    errmodule(MOD_EXECUTOR),
                    errmsg("bogus direction")));
            break;
    }

    /*
     * 到达此处时,fdirection 为 FETCH_FORWARD 或 FETCH_BACKWARD,count >= 0。
     */
    forward = (fdirection == FETCH_FORWARD);

    /*
     * 当 count 为零时,表示重新抓取当前行(符合 SQL92 的定义)
     */
    if (count == 0) {
        bool on_row = false;

        /* 当前是否处于某一行上 */
        on_row = (!portal->atStart && !portal->atEnd);

        if (dest->mydest == DestNone) {
            /* MOVE 0 根据 FETCH 0 是否返回行来返回 0/1 */
            return on_row ? 1L : 0L;
        } else {
            /*
             * 如果当前处于某一行上,倒退一行以重新抓取该行。
             * 如果当前没有处于某一行上,我们仍然需要启动和关闭执行器,以确保目标被正确初始化和关闭。
             * 对于 PortalRunSelect,count == 0 表示我们不会检索任何行。
             */
            if (on_row) {
                (void)PortalRunSelect(portal, false, 1L, None_Receiver);
                /* 设置为前进一行以抓取一行 */
                count = 1;
                forward = true;
            }
        }
    }

    /*
     * 优化 MOVE BACKWARD ALL 为 Rewind 操作。
     */
    if (!forward && count == FETCH_ALL && dest->mydest == DestNone) {
        long result = portal->portalPos;

        if (result > 0 && !portal->atEnd)
            result--;
        DoPortalRewind(portal);
        /* 如果 pos 已溢出,result 是不准确的,但这是我们能做到的最好的 */
        return result;
    }

    return PortalRunSelect(portal, forward, count, dest);
}

PortalRunSelect 函数

  PortalRunSelect 函数根据传入的参数,在指定的 portal 中执行查询,按照指定的方向和行数获取结果集的一部分,并返回实际处理的行数。具体来说,PortalRunSelect 函数是在指定的 portal执行查询,并根据参数指定的方向行数获取结果集的一部分。它主要用于在PORTAL_ONE_SELECT模式下执行查询,或者从已完成的 “holdStore” 中获取结果行,也支持在结果集中按指定的方向前进后退指定数量的行数。函数内部会根据查询方向、行数以及 “holdStore是否可用来调用执行器,并返回实际处理的行数
  其函数源码如下:(路径:src/gausskernel/process/tcop/pquery.cpp

/*
 * PortalRunSelect
 *		在 PORTAL_ONE_SELECT 模式下执行一个 portal 的查询,也用于从已完成的 holdStore
 *		中获取结果行,对于更复杂的非顺序访问 portal,参见 PortalRunFetch。
 *
 * count <= 0 被解释为无操作:目标会启动和关闭,但没有其他操作。
 * 同样,count == FETCH_ALL 被解释为“所有行”。
 *
 * 调用者必须已经验证了 Portal 并进行了适当的设置(参见 PortalRun)。
 *
 * 返回处理的行数(适用于结果标签)
 */
static uint64 PortalRunSelect(Portal portal, bool forward, long count, DestReceiver* dest)
{
    QueryDesc* queryDesc = NULL;
    ScanDirection direction;
    uint64 nprocessed;

    /*
     * 如果我们从已完成的光标或已完成的实用程序查询中获取数据,那么 queryDesc 将为 NULL;在该路径中不能使用它。
     */
    queryDesc = PortalGetQueryDesc(portal);

    /* 如果没有准备好的查询和已保留的数据,那么调用者搞错了 */
    AssertEreport(queryDesc || portal->holdStore, MOD_EXECUTOR, "have no ready query or held data");

    /*
     * 强制将 queryDesc 的目标设置为正确的值。这支持 MOVE,例如,将传入 dest = DestNone。只要我们在每次获取时进行,就可以更改这一点。
     * (执行器不得假定 dest 从不更改。)
     */
    if (queryDesc != NULL)
        queryDesc->dest = dest;

    /*
     * 确定前进的方向,并检查我们是否已经处于该方向上可用元组的末尾。
     * 如果是这样,将方向设置为 NoMovementScanDirection,以避免尝试获取任何元组。(此检查存在是因为并非所有计划节点类型都能够在已经返回 NULL 一次的情况下再次被调用。)
     * 然后调用执行器(我们不能跳过这一步,因为目标需要查看设置和关闭,即使没有可用的元组)。
     * 最后,根据检索到的元组数量更新 portal 位置状态。
     */
    if (forward) {
        if (portal->atEnd || count <= 0)
            direction = NoMovementScanDirection;
        else
            direction = ForwardScanDirection;

        /* 在执行器中,零 count 处理所有行 */
        if (count == FETCH_ALL)
            count = 0;

        if (portal->holdStore) {
            /* 如果是解释计划语句,我们已经在 ExplainQuery 中更改了标记 */
            if (strcmp(portal->commandTag, "EXPLAIN") == 0 || strcmp(portal->commandTag, "EXPLAIN SUCCESS") == 0)
                nprocessed = RunFromExplainStore(portal, direction, dest);
            else
                nprocessed = RunFromStore(portal, direction, count, dest);
        } else {
#ifdef ENABLE_MOT
            if (!(portal->cplan != NULL && portal->cplan->storageEngineType == SE_TYPE_MOT)) {
#endif
                PushActiveSnapshot(queryDesc->snapshot);
#ifdef ENABLE_MOT
            }
#endif

#ifdef PGXC
            if (portal->name != NULL && portal->name[0] != '\0' && IsA(queryDesc->planstate, RemoteQueryState)) {
                /*
                 * queryDesc 中的快照包含创建游标的命令的命令标识。
                 * 我们将该快照复制到 RemoteQueryState 中,以便在向对应的远程节点发送 SELECT(来自获取的结果)时,
                 * 使用创建游标的命令的命令标识。
                 */
                RemoteQueryState* rqs = (RemoteQueryState*)queryDesc->planstate;

                // 获取 portal 中的缓存扫描描述符
                rqs->ss.ss_currentScanDesc = (TableScanDesc)portal->scanDesc;
                // 复制快照到扫描描述符
                portal->scanDesc->rs_snapshot = queryDesc->snapshot;
                rqs->cursor = (char*)portal->name;
            }
#endif

            ExecutorRun(queryDesc, direction, count);

            /*
             * 如果我们在 DWS CN 上,且 queryDesc->plannedstmt->has_obsrel 为真,则是 <<IS_PGXC_COORDINATOR && !StreamTopConsumerAmI()>>。
             */
            if (IS_PGXC_COORDINATOR && !StreamTopConsumerAmI() && queryDesc->plannedstmt->has_obsrel &&
                u_sess->instr_cxt.obs_instr) {
                u_sess->instr_cxt.obs_instr->insertData(queryDesc->plannedstmt->queryId);
            }

            nprocessed = queryDesc->estate->es_processed;
#ifdef ENABLE_MOT
            if (!(portal->cplan != NULL && portal->cplan->storageEngineType == SE_TYPE_MOT)) {
#endif
                PopActiveSnapshot();
#ifdef ENABLE_MOT
            }
#endif
        }

        if (!ScanDirectionIsNoMovement(direction)) {
            long oldPos;

            if (nprocessed > 0)
                portal->atStart = false; /* 可以向后移动了 */
            if (count == 0 || (unsigned long)nprocessed < (unsigned long)count)
                portal->atEnd = true; /* 我们检索了所有行 */
            oldPos = portal->portalPos;
            portal->portalPos += nprocessed;
            /* 当我们离开末尾时,portalPos 不会前进 */
            if (portal->portalPos < oldPos)
                portal->posOverflow = true;
        }
    } else {
        if (portal->cursorOptions & CURSOR_OPT_NO_SCROLL)
            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Cursor can only scan forward")));

        if (portal->atStart || count <= 0)
            direction = NoMovementScanDirection;
        else
            direction = BackwardScanDirection;

        /* 在执行器中,零 count 处理所有行 */
        if (count == FETCH_ALL)
            count = 0;

        if (portal->holdStore)
            nprocessed = RunFromStore(portal, direction, count, dest);
        else {
#ifdef ENABLE_MOT
            if (!(portal->cplan != NULL && portal->cplan->storageEngineType == SE_TYPE_MOT)) {
#endif
                PushActiveSnapshot(queryDesc->snapshot);
#ifdef ENABLE_MOT
            }
#endif
            ExecutorRun(queryDesc, direction, count);
            nprocessed = queryDesc->estate->es_processed;
#ifdef ENABLE_MOT
            if (!(portal->cplan != NULL && portal->cplan->storageEngineType == SE_TYPE_MOT)) {
#endif
                PopActiveSnapshot();
#ifdef ENABLE_MOT
            }
#endif
        }

        if (!ScanDirectionIsNoMovement(direction)) {
            if (nprocessed > 0 && portal->atEnd) {
                portal->atEnd = false; /* 可以向前移动了 */
                portal->portalPos++;   /* 调整终点的情况 */
            }
            if (count == 0 || (unsigned long)nprocessed < (unsigned long)count) {
                portal->atStart = true; /* 我们检索了所有行 */
                portal->portalPos = 0;
                portal->posOverflow = false;
            } else {
                long oldPos;

                oldPos = portal->portalPos;
                portal->portalPos -= nprocessed;
                if (portal->portalPos > oldPos || portal->portalPos <= 0)
                    portal->posOverflow = true;
            }
        }
    }

    return nprocessed;
}

PortalRunMulti 函数

  PortalRunMulti 函数用于执行一个 portal 中包含的多个查询或者非查询语句(例如创建表、删除表等)。它通过循环处理 portal 中的每个查询或语句,并逐一执行它们。对于查询语句,它会为每个查询创建一个新的快照snapshot),然后执行查询,获取结果或执行相应的操作。对于非查询语句,它会执行相应的操作。总的来说,PortalRunMulti 函数的主要作用是将一个 portal 中的多个查询非查询语句执行完毕,并返回执行结果的状态或标签。如果有多个语句,它会逐一执行它们,确保每个语句都被执行。

函数入参解释如下

  1. Portal portal:要执行的Portal对象,它包含了待执行的查询计划参数命令标签等信息。
  2. bool isTopLevel:一个布尔值,表示当前执行是否在顶层。如果是顶层执行,则为true,否则为false。这通常用于确定是否应该设置新的活动快照。
  3. DestReceiver* dest:一个指向目标接收器Destination Receiver)的指针,用于接收查询结果的输出。这是执行查询时结果输出的目标。
  4. DestReceiver* altdest:一个指向备用目标接收器的指针,用于处理特殊情况下的结果输出,通常用于处理不可计划的语句。
  5. char* completionTag:一个指向字符数组的指针,用于存储命令完成标签。在执行查询后,该参数用于返回命令执行的结果标签

  函数源码如下:(路径:src/gausskernel/process/tcop/pquery.cpp

/*
 * PortalRunMulti
 *		Execute a portal's queries in the general case (multi queries
 *		or non-SELECT-like queries)
 */
static void PortalRunMulti(
    Portal portal, bool isTopLevel, DestReceiver* dest, DestReceiver* altdest, char* completionTag)
{
    bool active_snapshot_set = false; // 是否已经设置了活动快照
    ListCell* stmtlist_item = NULL;   // 声明一个列表项迭代器指针,用于遍历portal中的语句列表
    PGSTAT_INIT_TIME_RECORD();        // 初始化统计信息记录

#ifdef PGXC
    CombineTag combine;   // PGXC模式下,用于合并标签的结构体
    combine.cmdType = CMD_UNKNOWN;
    combine.data[0] = '\0';
#endif

    bool force_local_snapshot = false;

    if ((portal != NULL) && (portal->cplan != NULL)) {
        /* 将single_shard_stmt从portal的计划中复制到本地变量force_local_snapshot */
        force_local_snapshot = portal->cplan->single_shard_stmt;
    }

    /*
     * 如果目标是DestRemoteExecute,将其更改为DestNone。
     * 原因是客户端不会期望任何元组,事实上也无法知道它们是什么,
     * 因为当这种portal执行策略生效时,Describe没有提供RowDescription消息的机制。
     * 目前,这只会影响被重写规则添加到非SELECT查询中的SELECT命令:这些命令将被执行,
     * 但结果将被丢弃,除非使用"simple Query"协议。
     */
    if (dest->mydest == DestRemoteExecute)
        dest = None_Receiver;
    if (altdest->mydest == DestRemoteExecute)
        altdest = None_Receiver;

    /* SQL活动特性:处理CREATE TABLE AS语句的情况 */
    uint32 instrument_option = 0;
    if (IS_PGXC_COORDINATOR && u_sess->attr.attr_resource.resource_track_level == RESOURCE_TRACK_OPERATOR &&
        IS_STREAM && u_sess->attr.attr_resource.use_workload_manager &&
        t_thrd.wlm_cxt.collect_info->status != WLM_STATUS_RUNNING && IsSupportExplain(portal->commandTag) &&
        !u_sess->attr.attr_sql.enable_cluster_resize) {
        instrument_option |= INSTRUMENT_TIMER;
        instrument_option |= INSTRUMENT_BUFFERS;
    }

    /*
     * 循环处理从分析和重写生成的单个解析树中生成的各个查询。
     */
    foreach (stmtlist_item, portal->stmts) {
        Node* stmt = (Node*)lfirst(stmtlist_item); // 获取当前语句

#ifdef ENABLE_MOT
        bool isMOTTable = false;
        JitExec::JitContext* mot_jit_context = nullptr;
#endif

        /*
         * 如果在之前的命令中接收到了取消信号,则退出
         */
        CHECK_FOR_INTERRUPTS();

        if (IsA(stmt, PlannedStmt) && ((PlannedStmt*)stmt)->utilityStmt == NULL) {
            /*
             * 处理可计划查询。
             */
            PlannedStmt* pstmt = (PlannedStmt*)stmt;

            TRACE_POSTGRESQL_QUERY_EXECUTE_START(); // 开始追踪执行

            if (u_sess->attr.attr_common.log_executor_stats)
                ResetUsage(); // 重置执行统计信息

            PGSTAT_START_TIME_RECORD(); // 开始记录执行时间

            /*
             * 对于可计划查询,始终必须有一个快照,除非它是一个MOT查询。
             * 第一次执行时,创建一个新的快照;对于同一portal中的后续查询,只需更新快照的命令计数器。
             */
#ifdef ENABLE_MOT
            if ((portal->cplan != NULL && portal->cplan->storageEngineType == SE_TYPE_MOT)) {
                isMOTTable = true;
                mot_jit_context = portal->cplan->mot_jit_context;
            }
            if (!isMOTTable) {
#endif
                if (!active_snapshot_set) {
                    PushActiveSnapshot(GetTransactionSnapshot(force_local_snapshot));
                    active_snapshot_set = true;
                } else
                    UpdateActiveSnapshotCommandId();
#ifdef ENABLE_MOT
            }
#endif

            if (IS_PGXC_COORDINATOR || IS_SINGLE_NODE)
                pstmt->instrument_option = instrument_option;

            if (pstmt->canSetTag) {
                /* 语句可以设置标签字符串 */
#ifdef ENABLE_MOT
                ProcessQuery(pstmt, portal->sourceText, portal->portalParams,
                    isMOTTable, mot_jit_context, dest, completionTag);
#else
                ProcessQuery(pstmt, portal->sourceText, portal->portalParams, dest, completionTag);
#endif
#ifdef PGXC
                /* 用于处理INSERT的特殊情况 */
                if (IS_PGXC_COORDINATOR && pstmt->commandType == CMD_INSERT)
                    HandleCmdComplete(pstmt->commandType, &combine, completionTag, strlen(completionTag));
#endif
            } else {
                /* 由重写添加的语句无法设置标签 */
#ifdef ENABLE_MOT
                ProcessQuery(pstmt, portal->sourceText, portal->portalParams,
                    isMOTTable, mot_jit_context, altdest, NULL);
#else
                ProcessQuery(pstmt, portal->sourceText, portal->portalParams, altdest, NULL);
#endif
            }

            PGSTAT_END_TIME_RECORD(EXECUTION_TIME); // 结束执行时间记录

            if (u_sess->attr.attr_common.log_executor_stats)
                ShowUsage("EXECUTOR STATISTICS"); // 显示执行统计信息

            TRACE_POSTGRESQL_QUERY_EXECUTE_DONE(); // 结束追踪执行
        } else {
            /*
             * 处理实用程序函数(创建、销毁等)
             *
             * 假设这些实用程序函数可以设置标签,如果它们是portal中唯一的语句。
             *
             * 我们不能在此处为实用程序命令设置快照(如果需要快照,PortalRunUtility将会设置)。
             * 如果实用程序命令是portal中较长列表的一部分,那么唯一的情况是规则允许包含NotifyStmt。
             * NotifyStmt不关心是否有快照,所以如果有快照,我们就保留当前快照不变。
             */
            if (list_length(portal->stmts) == 1) {
                AssertEreport(!active_snapshot_set, MOD_EXECUTOR, "No active snapshot for utility commands");
                /* 语句可以设置标签字符串 */
                PortalRunUtility(portal, stmt, isTopLevel, dest, completionTag);
            } else if (IsA(stmt, AlterTableStmt) || IsA(stmt, ViewStmt) || IsA(stmt, RuleStmt)) {
                AssertEreport(!active_snapshot_set, MOD_EXECUTOR, "No active snapshot for utility commands");
                /* 语句可以设置标签字符串 */
                PortalRunUtility(portal, stmt, isTopLevel, dest, NULL);
            } else {
                AssertEreport(IsA(stmt, NotifyStmt), MOD_EXECUTOR, "Not a NotifyStmt");
                /* 由重写添加的语句无法设置标签 */
                PortalRunUtility(portal, stmt, isTopLevel, altdest, NULL);
            }
        }

        /*
         * 在查询之间递增命令计数器,但在最后一个查询之后不递增。
         */
        if (lnext(stmtlist_item) != NULL)
            CommandCounterIncrement();

        /*
         * 清除子上下文以释放临时内存。
         */
        AssertEreport(
            PortalGetHeapMemory(portal) == CurrentMemoryContext, MOD_EXECUTOR, "Memory context is not consistant");
        MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
    }

    /* 如果我们推送了快照,则弹出快照。 */
    if (active_snapshot_set)
        PopActiveSnapshot();

    /*
     * 如果提供了命令完成标签,则使用它。否则,使用portal的commandTag作为默认的完成标签。
     *
     * 例外情况:客户端期望INSERT/UPDATE/DELETE标签具有计数,因此我们用零来伪造它们。
     * 如果没有与原始命令相同类型的替代查询,则这可能会发生在DO INSTEAD规则中。
     * 这里打印"0 0"是因为从技术上讲,没有匹配标签类型的查询,如果一个更新了一行,则为其他查询类型打印非零计数似乎是错误的。
     * 例如,如果INSERT执行UPDATE,则如果更新了一行,INSERT将不会打印"0 1"。
     * 有关详细信息,请参阅QueryRewrite(),第3步。
     */
    errno_t errorno = EOK;
#ifdef PGXC
    if (IS_PGXC_COORDINATOR && completionTag != NULL && combine.data[0] != '\0') {
        errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, combine.data);
        securec_check(errorno, "\0", "\0");
    }
#endif

    if (completionTag != NULL && completionTag[0] == '\0') {
        if (portal->commandTag) {
            errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, portal->commandTag);
            securec_check(errorno, "\0", "\0");
        }
        if (strcmp(completionTag, "SELECT") == 0) {
            errorno = sprintf_s(completionTag, COMPLETION_TAG_BUFSIZE, "SELECT 0 0");
            securec_check_ss(errorno, "\0", "\0");
        } else if (strcmp(completionTag, "INSERT") == 0) {
            errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, "INSERT 0 0");
            securec_check(errorno, "\0", "\0");
        } else if (strcmp(completionTag, "UPDATE") == 0) {
            errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, "UPDATE 0");
            securec_check(errorno, "\0", "\0");
        } else if (strcmp(completionTag, "DELETE") == 0) {
            errorno = strcpy_s(completionTag, COMPLETION_TAG_BUFSIZE, "DELETE 0");
            securec_check(errorno, "\0", "\0");
        }
    }
}

MarkPortalDone 函数

  MarkPortalDone函数的主要作用是将一个 PortalACTIVE 状态转换为 DONE 状态。并进行必要的清理工作,以确保资源得到正确释放。需要注意的是,不应该直接将portal->status 设置为 PORTAL_DONE ,而是应该调用这个函数来执行状态转换。这里我认为是因为将状态转换的逻辑封装在函数中有助于代码的可维护性。如果以后需要修改状态转换的方式或添加其他处理步骤,只需修改 MarkPortalDone 函数而不是在多个地方修改状态。这样可以降低出现错误或漏掉某些清理步骤的风险。

函数的主要步骤如下:

  1. 首先,函数会断言当前 Portal 的状态必须是 PORTAL_ACTIVE ,如果不是,就会触发断言失败,表示状态转换出现问题。
  2. 接着,函数会将 Portal 的状态从 PORTAL_ACTIVE 修改为 PORTAL_DONE ,完成了状态的转换。
  3. 最后,函数会调用 portalcmds.c 中的清理函数(如果存在),以便清理已知的 Portal 状态。这是因为在一些情况下,例如在已中止事务中执行 ROLLBACK 命令时,清理函数的执行可以防止在 AtCleanup_Portals 时出现断言失败。

  函数源码如下所示:(路径:src/common/backend/utils/mmgr/portalmem.cpp

/*
 * MarkPortalDone
 *		将一个 portal 从 ACTIVE 状态转换为 DONE 状态。
 *
 * 注意:永远不要直接设置 portal->status = PORTAL_DONE;应该调用这个函数来执行状态转换。
 */
void MarkPortalDone(Portal portal)
{
    /* 执行状态转换 */
    Assert(portal->status == PORTAL_ACTIVE);
    portal->status = PORTAL_DONE;

    /*
     * 允许 portalcmds.c 清理其了解的状态。我们现在可以执行这个操作,因为无法再执行 portal 了。
     *
     * 在某些情况下,例如在已中止事务中执行 ROLLBACK 命令,这可以防止由于在 AtCleanup_Portals 时清理钩子仍未执行而导致的断言失败。
     */
    if (PointerIsValid(portal->cleanup)) {
        (*portal->cleanup)(portal);
        portal->cleanup = NULL;
    }
}

http://www.niftyadmin.cn/n/5043780.html

相关文章

大数据Flink(八十七):DML:Joins之Regular Join

文章目录 DML:Joins之Regular Join DML:Joins之Regular Join Flink 也支持了非常多的数据 Join 方式,主要包括以下三种: 动态表(流)与动态表(流)的 Join动态表(流)与外部维表(比如 Redis)的 Join动态表字段的列转行(一种特殊的 Join)细分 Flink SQL 支持的

python 小案例87

下面是一个简单的动态网页案例&#xff0c;展示了如何创建一个基本的动态网页应用&#xff1a; 步骤1: 创建HTML文件&#xff08;index.html&#xff09; <!DOCTYPE html> <html> <head><title>动态网页案例</title><link rel"styleshee…

SkyWalking快速上手(八)——sleuth+zipkin和SkyWalking的区别

文章目录 使用Sleuth和Zipkin进行分布式链路追踪1. 介绍2. 准备工作3. 配置Sleuth和Zipkin4. 编写代码5. 查看链路追踪数据结论 Sleuth Zipkin vs SkyWalking目录介绍Sleuth ZipkinSleuthZipkin SkyWalking架构特点 对比结论 使用Sleuth和Zipkin进行分布式链路追踪 1. 介绍 …

PCIe设备的动态管理:移除与重新扫描

简介&#xff1a;在日常工作中&#xff0c;可能会遇到需要动态地管理PCIe设备的情况&#xff0c;例如&#xff0c;硬件调试、驱动程序开发或特定的系统维护任务。本文将详细介绍如何在Linux环境中&#xff0c;使用shell脚本实现PCIe设备的动态移除和重新扫描。 PCIe设备简介&a…

MyBatis友人帐之一对多和多对一处理

一、概念 多对一的理解&#xff1a; 多个学生对应一个老师 如果对于学生这边&#xff0c;就是一个多对一的现象&#xff0c;即从学生这边关联一个老师&#xff01; 1.1数据库设计 CREATE TABLE teacher ( id INT(10) NOT NULL, name VARCHAR(30) DEFAULT NULL, PRIMARY KE…

《从菜鸟到大师之路 ElasticSearch 篇》

《从菜鸟到大师之路 ElasticSearch 篇》 &#xff08;一&#xff09;&#xff1a;ElasticSearch 基础概念、生态和应用场景 为什么需要学习 ElasticSearch 根据 DB Engine 的排名显示&#xff0c; ElasticSearch 是最受欢迎的 企业级搜索引擎 。下图红色勾选的是我们前面的系…

基于开源模型的实时人脸识别系统(九):软件说明

续 人脸识别_CodingInCV的博客-CSDN博客 文章目录 前言简介模型选择的要求总体流程图人脸检测人脸跟踪人脸质量人脸关键点人脸识别代码结构人脸识别的逻辑高阶设置 前言 前面的文章我们介绍了整个系统里的关键步骤&#xff0c;基于这些步骤我们就可以搭建出属于自己的人脸识别…

窜货采买第三方怎么选择

窜货溯源服务听起来并不难&#xff0c;无非就是买货&#xff0c;但是否能买到货&#xff0c;同时在买到之后能否顺利完成溯源工作&#xff0c;也是非常有学问的&#xff0c;很多品牌会选择第三方服务商进行采买合作&#xff0c;这样可以规避品牌自己操作时的不合规性&#xff0…