一般网站的后台,海外制作网站,个人网站备案填写,淮北市重点工程建设局网站WindowAgg 窗口函数介绍WindowAgg理论层面源码层面WindowObjectData 结构体WindowStatePerFuncData 结构体WindowStatePerAggData 结构体eval_windowaggregates 函数update_frameheadpos 函数 声明#xff1a;本文的部分内容参考了他人的文章。在编写过程中#xff0c;我们尊… WindowAgg 窗口函数介绍WindowAgg理论层面源码层面WindowObjectData 结构体WindowStatePerFuncData 结构体WindowStatePerAggData 结构体eval_windowaggregates 函数update_frameheadpos 函数 声明本文的部分内容参考了他人的文章。在编写过程中我们尊重他人的知识产权和学术成果力求遵循合理使用原则并在适用的情况下注明引用来源。 本文主要参考了 postgresql-15.0 的开源代码和《PostgresSQL数据库内核分析》一书 窗口函数介绍 首先我将提供一个简单的 SQL 用例并逐步解读窗口函数的使用过程。假设我们有一个名为 sales 的销售数据表表结构如下
CREATE TABLE sales (id SERIAL PRIMARY KEY,salesperson_id INT,sale_date DATE,sale_amount NUMERIC
);假设 sales 表包含以下数据
idsalesperson_idsale_datesale_amount112024-01-011000212024-01-021200322024-01-01800422024-01-021100532024-01-011500632024-01-021300
SQL 用例使用窗口函数计算每个销售人员的累计销售金额 我们希望计算每个销售人员在每个销售记录的日期上的累计销售金额。为了实现这一目标我们可以使用 SUM() 函数它会对每个销售人员的数据进行累计。 SQL 查询如下
SELECT id,salesperson_id,sale_date,sale_amount,SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sales
FROM sales
ORDER BYsalesperson_id, sale_date;详细解读
SUM(sale_amount) 这是一个聚合函数通常用于对某个列的值进行汇总。在这个查询中SUM(sale_amount) 用于计算销售额的累计值。 OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) 这是一个窗口函数的关键部分指定了如何对结果进行分区、排序和聚合。具体来说 PARTITION BY salesperson_id这是窗口函数的分区操作将数据按 salesperson_id销售人员 ID分区。也就是说每个销售人员的数据将分别计算不同销售人员的累计销售是独立的。ORDER BY sale_date对每个分区内的数据按销售日期 (sale_date) 进行排序确保累计计算是按时间顺序进行的。ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW这是一个窗口帧的定义意味着每个分区的累计值从该分区的第一行开始计算一直到当前行。UNBOUNDED PRECEDING 表示从分区的第一行开始CURRENT ROW 表示包括当前行。 结果分析 查询结果将会返回每个销售人员的每笔销售记录并在 cumulative_sales 列显示该销售人员的累计销售金额。例如 idsalesperson_idsale_datesale_amountcumulative_sales112024-01-0110001000212024-01-0212002200322024-01-01800800422024-01-0211001900532024-01-0115001500632024-01-0213002800 对于销售人员 1第一个销售记录的累计销售金额为 1000第二个销售记录的累计销售金额为 1000 1200 2200。对于销售人员 2第一个销售记录的累计销售金额为 800第二个销售记录的累计销售金额为 800 1100 1900。对于销售人员 3第一个销售记录的累计销售金额为 1500第二个销售记录的累计销售金额为 1500 1300 2800。 窗口函数的工作机制
分区窗口函数首先会根据 PARTITION BY 子句将数据分成不同的分区。这里数据按 salesperson_id 分区每个销售人员的记录组成一个分区。排序在每个分区内数据会根据 ORDER BY 子句进行排序。在这个例子中按 sale_date 对每个销售人员的销售记录按时间顺序进行排序。累计ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW 确保了每个销售人员从分区的第一行开始直到当前行的所有销售记录都会被累加形成一个累积的结果。
更多详细的窗口函数使用教程可以参阅GaussDB(DWS) SQL进阶之SQL操作之窗口函数
WindowAgg
理论层面 下面我们来了解一下 WindowAgg 算子先看看书中的描述 书中详细描述了 WindowAgg 节点在 PostgreSQL 中处理窗口函数时的执行过程包括如何管理分区、排序、聚合等。通过 WindowAggState 和相关的数据结构窗口聚合可以高效地计算多个窗口函数同时保持对数据的完整性。性能优化方面窗口函数的排序和缓存机制也起到了关键作用帮助提升计算效率。
源码层面
WindowObjectData 结构体 WindowObjectData 结构体用于在窗口函数调用过程中保存与窗口聚合操作相关的状态信息。在 PostgreSQL 中窗口函数用于基于窗口进行计算而每个窗口函数可能需要不同的上下文状态来处理其数据。源码如下所示路径src\backend\executor\nodeWindowAgg.c
/** 所有窗口函数的API都通过这个对象进行调用该对象会作为fcinfo-context传递给窗口函数。*/
typedef struct WindowObjectData
{NodeTag type; /* 类型标识符用于区分不同的节点类型 */WindowAggState *winstate; /* 指向父级窗口聚合状态的指针用于获取窗口聚合的上下文状态 */List *argstates; /* 窗口函数参数的表达式状态树 */void *localmem; /* 当前窗口函数在执行过程中使用的局部内存由WinGetPartitionLocalMemory分配 */int markptr; /* 用于标记当前窗口函数状态的tuplestore标记指针 */int readptr; /* 读取指针指向当前正在处理的行位置 */int64 markpos; /* 标记指针所指向的行号 */int64 seekpos; /* 读取指针所指向的行号 */
} WindowObjectData;WindowStatePerFuncData 结构体 WindowStatePerFuncData 结构体用于存储与窗口函数和窗口聚合操作相关的工作状态和数据。它包含了窗口函数执行时需要的各种信息如参数数量、排序规则、结果类型、是否为聚合函数等。这些信息对于在窗口函数计算过程中正确管理和执行窗口函数非常重要。在 PostgreSQL 中窗口函数的执行涉及多次状态保存和计算而这个结构体便用于管理这些窗口函数的具体执行细节。源码如下所示路径src\backend\executor\nodeWindowAgg.c
/** 为每个由该节点处理的窗口函数和窗口聚合创建一个 WindowStatePerFunc 结构体。*/
typedef struct WindowStatePerFuncData
{/* 链接到与此工作状态相关的 WindowFunc 表达式和状态节点 */WindowFuncExprState *wfuncstate; /* 当前窗口函数的表达式状态 */WindowFunc *wfunc; /* 当前窗口函数的定义结构体 */int numArguments; /* 窗口函数的参数数量 */FmgrInfo flinfo; /* 用于窗口函数的 fmgr 查找数据存储有关函数的信息 */Oid winCollation; /* 窗口函数的排序规则由当前函数派生 *//** 我们需要窗口函数结果的长度和 byval 信息以便知道如何复制/删除值。*/int16 resulttypeLen; /* 窗口函数返回值类型的长度 */bool resulttypeByVal; /* 窗口函数返回值类型是否为按值传递 */bool plain_agg; /* 是否仅为普通的聚合函数 */int aggno; /* 如果是指明其对应的 WindowStatePerAggData 的索引 */WindowObject winobj; /* 用于窗口函数 API 的对象 */
} WindowStatePerFuncData;WindowStatePerAggData 结构体 WindowStatePerAggData 结构体主要用于保存窗口聚合过程中普通聚合函数的工作状态。它包含了有关过渡函数、最终函数、初始值、当前帧的聚合结果、过渡值等详细信息。通过这些信息系统可以正确地计算窗口聚合函数的结果处理每个聚合操作的中间状态确保聚合计算按预期执行。此外该结构体还考虑了内存管理和函数调用的效率使得聚合操作在处理大数据量时能够高效执行。源码如下所示路径src\backend\executor\nodeWindowAgg.c
/** 对于普通的聚合窗口函数我们也有一个这样的结构体。*/
typedef struct WindowStatePerAggData
{/* 聚合函数的过渡函数 OID */Oid transfn_oid; /* 聚合函数的过渡函数的 OID */Oid invtransfn_oid; /* 反向过渡函数的 OID可能是 InvalidOid */Oid finalfn_oid; /* 最终函数的 OID可能是 InvalidOid *//** 聚合过渡函数的 fmgr 查找数据 --- 只有当对应的 OID 不为 InvalidOid 时才有效。* 特别注意函数的 fn_strict 标志在这里保存。*/FmgrInfo transfn; /* 聚合函数的过渡函数的 fmgr 查找数据 */FmgrInfo invtransfn; /* 反向过渡函数的 fmgr 查找数据 */FmgrInfo finalfn; /* 最终函数的 fmgr 查找数据 */int numFinalArgs; /* 传递给最终函数的参数个数 *//** 来自 pg_aggregate 入口的初始值*/Datum initValue; /* 初始值 */bool initValueIsNull; /* 初始值是否为 NULL *//** 当前帧边界的缓存值*/Datum resultValue; /* 当前计算帧的结果值 */bool resultValueIsNull; /* 结果值是否为 NULL *//** 需要输入、结果和过渡数据类型的长度和 byval 信息* 以便知道如何复制/删除值。*/int16 inputtypeLen, /* 输入类型的长度 */resulttypeLen, /* 结果类型的长度 */transtypeLen; /* 过渡数据类型的长度 */bool inputtypeByVal, /* 输入类型是否按值传递 */resulttypeByVal, /* 结果类型是否按值传递 */transtypeByVal; /* 过渡数据类型是否按值传递 */int wfuncno; /* 关联的 WindowStatePerFuncData 的索引 *//* 持有过渡值和可能的其他附加数据的上下文 */MemoryContext aggcontext; /* 聚合上下文可能是私有的或 winstate-aggcontext *//* 当前的过渡值 */Datum transValue; /* 当前过渡值 */bool transValueIsNull; /* 过渡值是否为 NULL */int64 transValueCount; /* 当前聚合的行数 *//* eval_windowaggregates() 函数中使用的数据 */bool restart; /* 是否需要在本轮聚合中重新启动此聚合 */
} WindowStatePerAggData;eval_windowaggregates 函数 eval_windowaggregates 函数主要用于窗口聚合的计算特别是普通聚合函数如 SUM()、COUNT() 等。它在处理窗口时根据窗口帧的位置和聚合的需求优化了聚合操作。在帧起始位置为 UNBOUNDED_PRECEDING 时采用增量计算策略在窗口帧发生变化时使用反向过渡函数或重新聚合数据。同时它通过复用已计算的结果来提高性能在需要时重启聚合并重置相应的状态。 此外它还管理了不同聚合函数的上下文确保在窗口帧的不同部分对每个聚合函数都进行正确的计算并在计算结束后保存结果。源码如下所示路径src\backend\executor\nodeWindowAgg.c
/** eval_windowaggregates* 评估作为窗口函数的普通聚合函数** 这与 nodeAgg.c 不同的地方在于首先如果窗口的帧开始位置发生变化我们使用反向过渡函数如果存在从过渡值中删除行。其次我们希望在将更多数据聚合到同一过渡值后可以多次调用聚合最终函数。这是 nodeAgg.c 中不要求的行为。*/
static void
eval_windowaggregates(WindowAggState *winstate)
{WindowStatePerAgg peraggstate; /* 用于存储每个聚合函数的状态 */int wfuncno, /* 窗口函数的索引 */numaggs, /* 聚合函数的数量 */numaggs_restart, /* 需要重启的聚合函数数量 */i; /* 循环变量 */int64 aggregatedupto_nonrestarted; /* 尚未聚合的行数 */MemoryContext oldContext; /* 内存上下文的备份 */ExprContext *econtext; /* 当前表达式上下文 */WindowObject agg_winobj; /* 窗口函数对象 */TupleTableSlot *agg_row_slot; /* 用于存储聚合数据的行槽 */TupleTableSlot *temp_slot; /* 临时槽用于存储中间结果 */numaggs winstate-numaggs; /* 获取窗口聚合函数的数量 */if (numaggs 0)return; /* 如果没有聚合函数直接返回 *//* 获取执行上下文 */econtext winstate-ss.ps.ps_ExprContext;agg_winobj winstate-agg_winobj;agg_row_slot winstate-agg_row_slot;temp_slot winstate-temp_slot_1;/** 如果窗口的帧起始位置为 UNBOUNDED_PRECEDING 且没有排除子句* 那么窗口帧由从分区开始处向前延伸的一组连续的行组成随着当前行向前推进行只进入帧内而不会退出帧。* 这样就可以使用增量策略来计算聚合值我们为每个加入帧的行运行过渡函数并在需要时运行最终函数来获取当前聚合值。* 这种方法比每次处理当前行时都重新运行整个聚合计算更高效。前提是假设最终函数不会破坏正在运行的过渡值这一点在 nodeAgg.c 中也有类似的假设。** 如果帧起始位置有时会移动我们仍然可以优化相邻的行尽可能使用增量聚合策略但如果帧头超出了上一个头我们将尝试使用反向过渡函数删除这些行。* 反向过渡函数会恢复聚合的当前状态仿佛被移除的行从未被聚合过。如果反向过渡函数无法删除该行或者根本没有反向过渡函数我们需要重新计算所有位于新帧边界内的元组的聚合结果。** 如果存在排除子句我们可能需要在一个不连续的行集上聚合因此需要重新计算每行的聚合。*//** 更新帧头位置** 窗口的帧头位置不应该向后移动如果发生这种情况代码将无法处理因此在安全起见我们会检查并报告错误。*/update_frameheadpos(winstate);if (winstate-frameheadpos winstate-aggregatedbase)elog(ERROR, window frame head moved backward);/** 如果帧没有变化我们可以重用之前保存的结果值。* 如果帧结束模式是 UNBOUNDED FOLLOWING 或 CURRENT ROW 且没有排除子句并且当前行位于前一行的帧内那么当前帧和前一帧的结束位置必须重合。* 这意味着我们可以复用结果值。*/if (winstate-aggregatedbase winstate-frameheadpos (winstate-frameOptions (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |FRAMEOPTION_END_CURRENT_ROW)) !(winstate-frameOptions FRAMEOPTION_EXCLUSION) winstate-aggregatedbase winstate-currentpos winstate-aggregatedupto winstate-currentpos){for (i 0; i numaggs; i){peraggstate winstate-peragg[i];wfuncno peraggstate-wfuncno;econtext-ecxt_aggvalues[wfuncno] peraggstate-resultValue;econtext-ecxt_aggnulls[wfuncno] peraggstate-resultValueIsNull;}return;}/* 初始化重启标志 */numaggs_restart 0;for (i 0; i numaggs; i){peraggstate winstate-peragg[i];/* 判断是否需要重启聚合函数 */if (winstate-currentpos 0 ||(winstate-aggregatedbase ! winstate-frameheadpos !OidIsValid(peraggstate-invtransfn_oid)) ||(winstate-frameOptions FRAMEOPTION_EXCLUSION) ||winstate-aggregatedupto winstate-frameheadpos){peraggstate-restart true;numaggs_restart;}elseperaggstate-restart false;}/** 如果有任何可能需要移动的聚合函数尝试通过删除从帧顶部掉落的输入行来将 aggregatedbase 向前推进。* 如果失败即 advance_windowaggregate_base 返回 false则需要重启聚合。*/while (numaggs_restart numaggs winstate-aggregatedbase winstate-frameheadpos){/** 获取要删除的元组。这应该永远不会失败因为我们应该已经处理过这些行。*/if (!window_gettupleslot(agg_winobj, winstate-aggregatedbase,temp_slot))elog(ERROR, could not re-fetch previously fetched frame row);/* 设置元组上下文用于计算聚合函数的参数 */winstate-tmpcontext-ecxt_outertuple temp_slot;/** 为每个聚合函数执行反向过渡除非该聚合已经标记为需要重启。*/for (i 0; i numaggs; i){bool ok;peraggstate winstate-peragg[i];if (peraggstate-restart)continue;wfuncno peraggstate-wfuncno;ok advance_windowaggregate_base(winstate,winstate-perfunc[wfuncno],peraggstate);if (!ok){/* 如果反向过渡函数失败则需要重启聚合 */peraggstate-restart true;numaggs_restart;}}/* 重置每个输入元组的上下文 */ResetExprContext(winstate-tmpcontext);/* 进展到下一个聚合行 */winstate-aggregatedbase;ExecClearTuple(temp_slot);}/** 如果我们成功推进了所有聚合的基准行aggregatedbase 现在应该等于 frameheadpos* 如果失败了我们必须强制更新 aggregatedbase。*/winstate-aggregatedbase winstate-frameheadpos;/** 如果为聚合函数创建了标记指针则将其推进到帧头以便 tuplestore 可以丢弃不必要的行。*/if (agg_winobj-markptr 0)WinSetMarkPosition(agg_winobj, winstate-frameheadpos);/** 现在重启需要重启的聚合函数。** 如果任何聚合函数需要重启我们假设使用共享上下文的聚合函数也需要重启* 并且在这种情况下我们会清理共享的 aggcontext。*/if (numaggs_restart 0)MemoryContextResetAndDeleteChildren(winstate-aggcontext);for (i 0; i numaggs; i){peraggstate winstate-peragg[i];/* 如果共享上下文的聚合函数需要重启则重启所有需要重启的聚合 */Assert(peraggstate-aggcontext ! winstate-aggcontext ||numaggs_restart 0 ||peraggstate-restart);if (peraggstate-restart){wfuncno peraggstate-wfuncno;initialize_windowaggregate(winstate,winstate-perfunc[wfuncno],peraggstate);}else if (!peraggstate-resultValueIsNull){if (!peraggstate-resulttypeByVal)pfree(DatumGetPointer(peraggstate-resultValue));peraggstate-resultValue (Datum) 0;peraggstate-resultValueIsNull true;}}/** 非重启的聚合现在包含 aggregatedbase 和 aggregatedupto 之间的行* 而重启的聚合不包含任何行。如果有重启的聚合我们必须从 frameheadpos 开始重新聚合* 否则可以从 aggregatedupto 开始继续聚合。*/aggregatedupto_nonrestarted winstate-aggregatedupto;if (numaggs_restart 0 winstate-aggregatedupto ! winstate-frameheadpos){winstate-aggregatedupto winstate-frameheadpos;ExecClearTuple(agg_row_slot);}/** 继续聚合直到遇到帧外的行或分区结束。*/for (;;){int ret;/* 如果没有获取行获取下一行 */if (TupIsNull(agg_row_slot)){if (!window_gettupleslot(agg_winobj, winstate-aggregatedupto,agg_row_slot))break; /* 到达分区结束 */}/** 如果当前行不在帧内跳过聚合。*/ret row_is_in_frame(winstate, winstate-aggregatedupto, agg_row_slot);if (ret 0)break;if (ret 0)goto next_tuple;/* 设置元组上下文 */winstate-tmpcontext-ecxt_outertuple agg_row_slot;/* 将行累加到聚合中 */for (i 0; i numaggs; i){peraggstate winstate-peragg[i];/* 跳过未重启的聚合 */if (!peraggstate-restart winstate-aggregatedupto aggregatedupto_nonrestarted)continue;wfuncno peraggstate-wfuncno;advance_windowaggregate(winstate,winstate-perfunc[wfuncno],peraggstate);}next_tuple:/* 重置每个输入元组的上下文 */ResetExprContext(winstate-tmpcontext);/* 进展到下一个聚合行 */winstate-aggregatedupto;ExecClearTuple(agg_row_slot);}/* 确保帧的结束位置不会向后移动 */Assert(aggregatedupto_nonrestarted winstate-aggregatedupto);/** 最终化聚合并填充结果和空值字段*/for (i 0; i numaggs; i){Datum *result;bool *isnull;peraggstate winstate-peragg[i];wfuncno peraggstate-wfuncno;result econtext-ecxt_aggvalues[wfuncno];isnull econtext-ecxt_aggnulls[wfuncno];finalize_windowaggregate(winstate,winstate-perfunc[wfuncno],peraggstate,result, isnull);/** 如果下一个行共享同一帧保存结果值*/if (!peraggstate-resulttypeByVal !*isnull){oldContext MemoryContextSwitchTo(peraggstate-aggcontext);peraggstate-resultValue datumCopy(*result,peraggstate-resulttypeByVal,peraggstate-resulttypeLen);MemoryContextSwitchTo(oldContext);}else{peraggstate-resultValue *result;}peraggstate-resultValueIsNull *isnull;}
}让我们通过一个具体的例子来分析 eval_windowaggregates 函数的每一步操作。假设我们有一个销售数据表 sales包含以下数据
salesperson_idsale_datesale_amount12024-01-0110012024-01-0220012024-01-0330022024-01-0115022024-01-0225022024-01-03350 假设我们希望计算每个销售人员的累计销售额并且使用的是窗口聚合函数按日期顺序ORDER BY sale_date来计算累计销售额。我们的窗口框架将从 UNBOUNDED PRECEDING 开始直到当前行结束。
1. 初始化和设置 在开始时窗口函数会为每个聚合函数在这个例子中是 SUM(sale_amount)创建一个 WindowStatePerAggData 结构体来保存当前的聚合状态。假设我们有两个销售人员的销售数据。对于每个销售人员eval_windowaggregates 将会处理每个销售记录维护其当前的聚合状态。 初始化numaggs 1因为只有一个聚合函数 SUM(sale_amount)。aggregatedbase 和 aggregatedupto 变量分别用于跟踪当前已聚合和尚未聚合的行。 2. 更新帧头位置 在窗口聚合中frameheadpos 表示窗口帧的起始位置。update_frameheadpos 会根据窗口的当前状态更新这一位置。例如假设当前处理的销售人员是销售员 1并且当前销售记录是 2024-01-03。 帧头位置更新frameheadpos 会根据查询的 PARTITION BY 和 ORDER BY 规则进行调整。这里frameheadpos 会指向销售员 1 在 2024-01-03 的行。 3. 优化增量计算 如果当前的窗口帧没有发生变化我们就可以复用之前保存的聚合结果而不必重新计算。例如在销售员 1 的数据中假设前两天2024-01-01 和 2024-01-02已经聚合完成。 复用结果假设当前帧的结束位置是 2024-01-03且没有排除子句EXCLUSION那么程序会检查窗口帧是否变化。如果没有变化即当前行仍然在上一帧内则复用先前的聚合结果。 4. 处理帧的变化 如果窗口帧的头位置发生变化我们需要做以下几步 检查是否需要重启聚合如果帧的头移动或者窗口的范围发生变化例如加入了 EXCLUSION 子句我们就需要重新聚合数据。eval_windowaggregates 会为每个聚合函数设置重启标志。更新聚合函数的状态在此过程中advance_windowaggregate_base 函数会根据新的帧头位置和数据调整聚合的基准状态aggregatedbase。 例如如果帧的起始位置从 2024-01-01 移动到 2024-01-02eval_windowaggregates 将使用反向过渡函数invtransfn删除帧头之前的行。 5. 重新聚合数据 如果 advance_windowaggregate_base 无法成功移动聚合的基准行即删除掉帧头之前的行或者没有反向过渡函数系统就会重新开始聚合。例如在 2024-01-02 之后的帧头位置可能需要从新的帧开始重新计算聚合结果。 重启聚合如果需要重启聚合例如因为反向过渡失败restart 标志会被设置为 true然后聚合函数的状态会被重新初始化。 6. 计算新行的聚合结果 如果当前的聚合状态已经准备好且没有出现需要重启的情况eval_windowaggregates 会开始将新的一行数据添加到聚合中。 逐行聚合每次计算新的聚合值时advance_windowaggregate 函数会根据当前行的数据更新聚合结果。例如在 2024-01-03销售员 1 的累计销售额将是 100 200 300 600。 7. 最终化聚合结果 当所有的行都被处理完后finalize_windowaggregate 会被调用来计算窗口聚合的最终结果。例如计算销售员 1 和销售员 2 的最终累计销售额。 保存和返回结果最终eval_windowaggregates 会保存每个聚合函数的结果并更新相应的输出字段。如果存在共享上下文即多个聚合函数使用同一个上下文它会进行清理以确保没有内存泄漏。 8. 返回结果 函数会返回每个窗口聚合函数的在这里插入代码片最终结果在每一行的输出中返回正确的累计销售额。 示例执行 假设我们在销售员 1 上执行上述操作
初始时销售员 1 在 2024-01-01 的销售额为 100聚合值为 100。
接着销售员 1 在 2024-01-02 的销售额为 200聚合值为 100 200 300。
最后在 2024-01-03销售员 1 的销售额为 300最终累计值为 100 200 300 600。update_frameheadpos 函数 update_frameheadpos 函数的主要功能是更新窗口聚合的帧头位置 frameheadpos确保其对于当前行有效。帧头的位置是窗口聚合计算的关键因为它决定了每个窗口函数计算时所依据的数据范围。下面是详细的逐行注释和对每个步骤的解释。路径src\backend\executor\nodeWindowAgg.c
/** update_frameheadpos* 使 frameheadpos 对当前行有效** 注意frameheadpos 计算时不考虑任何窗口排除子句当前行和/或其同组行即使在后续需要被排除时也会被视为帧的一部分。** 可能会覆盖 winstate-temp_slot_2。*/
static void
update_frameheadpos(WindowAggState *winstate)
{WindowAgg *node (WindowAgg *) winstate-ss.ps.plan; /* 获取窗口聚合节点 */int frameOptions winstate-frameOptions; /* 获取当前的帧选项 */MemoryContext oldcontext; /* 保存当前的内存上下文 *//* 如果帧头已经有效则不需要更新直接返回 */if (winstate-framehead_valid)return;/* 可能会在短生命周期的上下文中被调用因此切换到合适的内存上下文 */oldcontext MemoryContextSwitchTo(winstate-ss.ps.ps_ExprContext-ecxt_per_query_memory);/* 根据帧的起始选项来计算帧头 */if (frameOptions FRAMEOPTION_START_UNBOUNDED_PRECEDING){/* 在 UNBOUNDED PRECEDING 模式下帧头始终是分区的第一行 */winstate-frameheadpos 0;winstate-framehead_valid true;}else if (frameOptions FRAMEOPTION_START_CURRENT_ROW){/* 如果是 CURRENT ROW 模式根据排序模式计算帧头 */if (frameOptions FRAMEOPTION_ROWS){/* 在 ROWS 模式下帧头与当前行相同 */winstate-frameheadpos winstate-currentpos;winstate-framehead_valid true;}else if (frameOptions (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)){/* 如果没有 ORDER BY所有行是同行的 */if (node-ordNumCols 0){winstate-frameheadpos 0;winstate-framehead_valid true;MemoryContextSwitchTo(oldcontext);return;}/** 在 RANGE 或 GROUPS START_CURRENT_ROW 模式下帧头是当前行的同组中的第一行。* 我们保持帧头的最后已知位置并根据需要前进。*/tuplestore_select_read_pointer(winstate-buffer, winstate-framehead_ptr);if (winstate-frameheadpos 0 TupIsNull(winstate-framehead_slot)){/* 如果尚未获取第一行则将其获取到 framehead_slot */if (!tuplestore_gettupleslot(winstate-buffer, true, true, winstate-framehead_slot))elog(ERROR, unexpected end of tuplestore);}/* 检查当前行是否是正确的帧头 */while (!TupIsNull(winstate-framehead_slot)){if (are_peers(winstate, winstate-framehead_slot, winstate-ss.ss_ScanTupleSlot))break; /* 该行是正确的帧头 *//* 即使获取失败仍然推进帧头位置 */winstate-frameheadpos;spool_tuples(winstate, winstate-frameheadpos);if (!tuplestore_gettupleslot(winstate-buffer, true, true, winstate-framehead_slot))break; /* 到达分区末尾 */}winstate-framehead_valid true;}elseAssert(false); /* 如果既不是 RANGE 也不是 GROUPS应该抛出异常 */}else if (frameOptions FRAMEOPTION_START_OFFSET){/* 在 OFFSET 模式下帧头相对于当前行的位置是通过偏移量来决定的 */if (frameOptions FRAMEOPTION_ROWS){int64 offset DatumGetInt64(winstate-startOffsetValue);if (frameOptions FRAMEOPTION_START_OFFSET_PRECEDING)offset -offset; /* 如果是 PRECEDING则是负偏移量 */winstate-frameheadpos winstate-currentpos offset;/* 帧头不能小于第一行 */if (winstate-frameheadpos 0)winstate-frameheadpos 0;/* 确保帧头不超出分区末尾 */else if (winstate-frameheadpos winstate-currentpos 1){spool_tuples(winstate, winstate-frameheadpos - 1);if (winstate-frameheadpos winstate-spooled_rows)winstate-frameheadpos winstate-spooled_rows;}winstate-framehead_valid true;}else if (frameOptions FRAMEOPTION_RANGE){/** 在 RANGE START_OFFSET 模式下帧头是满足范围约束的第一行。* 我们保持帧头的最后已知位置并根据需要推进。*/int sortCol node-ordColIdx[0];bool sub, less;/* 确保有排序列 */Assert(node-ordNumCols 1);/* 计算用于范围检查的标志 */if (frameOptions FRAMEOPTION_START_OFFSET_PRECEDING)sub true;elsesub false;less false; /* 通常帧头应满足 sum */if (!winstate-inRangeAsc){sub !sub;less true;}tuplestore_select_read_pointer(winstate-buffer, winstate-framehead_ptr);if (winstate-frameheadpos 0 TupIsNull(winstate-framehead_slot)){/* 如果尚未获取第一行则将其获取到 framehead_slot */if (!tuplestore_gettupleslot(winstate-buffer, true, true, winstate-framehead_slot))elog(ERROR, unexpected end of tuplestore);}/* 逐行检查直到找到满足范围条件的帧头行 */while (!TupIsNull(winstate-framehead_slot)){Datum headval, currval;bool headisnull, currisnull;headval slot_getattr(winstate-framehead_slot, sortCol, headisnull);currval slot_getattr(winstate-ss.ss_ScanTupleSlot, sortCol, currisnull);if (headisnull || currisnull){/* 如果其中一行的值为 NULL按照 nulls_first 设置推进帧头 */if (winstate-inRangeNullsFirst){if (!headisnull || currisnull)break;}else{if (headisnull || !currisnull)break;}}else{if (DatumGetBool(FunctionCall5Coll(winstate-startInRangeFunc,winstate-inRangeColl,headval,currval,winstate-startOffsetValue,BoolGetDatum(sub),BoolGetDatum(less))))break; /* 该行是正确的帧头 */}/* 即使获取失败仍然推进帧头位置 */winstate-frameheadpos;spool_tuples(winstate, winstate-frameheadpos);if (!tuplestore_gettupleslot(winstate-buffer, true, true, winstate-framehead_slot))break; /* 到达分区末尾 */}winstate-framehead_valid true;}else if (frameOptions FRAMEOPTION_GROUPS){/** 在 GROUPS START_OFFSET 模式下帧头是满足偏移量约束的第一组的第一行。*/int64 offset DatumGetInt64(winstate-startOffsetValue);int64 minheadgroup;if (frameOptions FRAMEOPTION_START_OFFSET_PRECEDING)minheadgroup winstate-currentgroup - offset;elseminheadgroup winstate-currentgroup offset;tuplestore_select_read_pointer(winstate-buffer, winstate-framehead_ptr);if (winstate-frameheadpos 0 TupIsNull(winstate-framehead_slot)){/* 如果尚未获取第一行则将其获取到 framehead_slot */if (!tuplestore_gettupleslot(winstate-buffer, true, true, winstate-framehead_slot))elog(ERROR, unexpected end of tuplestore);}/* 逐组推进帧头 */while (!TupIsNull(winstate-framehead_slot)){if (winstate-frameheadgroup minheadgroup)break; /* 找到满足条件的帧头行 */ExecCopySlot(winstate-temp_slot_2, winstate-framehead_slot);winstate-frameheadpos;spool_tuples(winstate, winstate-frameheadpos);if (!tuplestore_gettupleslot(winstate-buffer, true, true, winstate-framehead_slot))break; /* 到达分区末尾 */if (!are_peers(winstate, winstate-temp_slot_2, winstate-framehead_slot))winstate-frameheadgroup;}ExecClearTuple(winstate-temp_slot_2);winstate-framehead_valid true;}elseAssert(false);}elseAssert(false);/* 恢复原内存上下文 */MemoryContextSwitchTo(oldcontext);
}依旧通过一个具体的例子来分析该函数的具体执行过程案例参考函数eval_windowaggregates。 案例背景 我们希望计算每个销售员的累计销售额。使用窗口函数 SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)即每个销售员的累计销售额是从该销售员的第一个销售日期开始到当前行的销售额的累积。 SQL 查询
SELECT salesperson_id, sale_date, sale_amount,SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sales
FROM sales;这个查询会根据 sale_date 排序每个销售员的数据并为每一行计算累计销售额。为了计算窗口函数update_frameheadpos 会在内部被调用来更新每个窗口的帧头位置。
详细步骤和代码说明 假设我们正在处理销售员 1 的数据查询的当前行是 2024-01-02。
第一步更新帧头位置 当函数 update_frameheadpos 被调用时它的作用是更新 frameheadpos即计算当前帧的起始位置。帧头位置决定了窗口函数计算时应包括哪些行。
1. 检查是否已经计算了帧头位置
if (winstate-framehead_valid)return; /* 如果帧头已经有效直接返回 */如果帧头已经计算过了就跳过计算避免重复计算。
2. 切换到合适的内存上下文
oldcontext MemoryContextSwitchTo(winstate-ss.ps.ps_ExprContext-ecxt_per_query_memory);这里我们切换到合适的内存上下文以确保计算不会泄漏内存。
3. 计算帧头位置 接下来根据帧的选项 (frameOptions)我们来决定帧头的位置。
如果是 UNBOUNDED PRECEDING
if (frameOptions FRAMEOPTION_START_UNBOUNDED_PRECEDING)
{winstate-frameheadpos 0;winstate-framehead_valid true;
}这里UNBOUNDED PRECEDING 表示帧从分区的第一行开始。因此帧头位置就是 0即第一行。
如果是 CURRENT ROW
else if (frameOptions FRAMEOPTION_START_CURRENT_ROW)
{if (frameOptions FRAMEOPTION_ROWS){winstate-frameheadpos winstate-currentpos;winstate-framehead_valid true;}
}如果是 CURRENT ROW那么帧头就是当前行的位置。在我们的例子中假设当前行是 2024-01-02frameheadpos 就是当前行的位置。
第二步处理 RANGE 或 GROUPS 模式 如果窗口定义了 RANGE 或 GROUPS我们需要根据排序规则找到当前行所在的组并确定该组的第一行作为帧头。
4. 如果没有排序列ORDER BY
if (node-ordNumCols 0)
{winstate-frameheadpos 0;winstate-framehead_valid true;MemoryContextSwitchTo(oldcontext);return;
}如果没有定义排序列那么所有行被认为是同一组帧头位置就是 0即分区的第一行。
5. 如果有排序列 如果有排序列我们会根据当前行的值和分区内其他行的值找到与当前行同组的第一行作为帧头。例如如果是 2024-01-02 的数据程序会查找销售员 1 中销售额最早的那一行即 2024-01-01。
5. 查找同组的第一行
while (!TupIsNull(winstate-framehead_slot))
{if (are_peers(winstate, winstate-framehead_slot, winstate-ss.ss_ScanTupleSlot))break; /* 找到当前行同组的第一行作为帧头 */winstate-frameheadpos;spool_tuples(winstate, winstate-frameheadpos);
}这里我们通过检查每一行是否与当前行同组are_peers 函数找到属于同组的第一行作为帧头。
第三步更新帧头位置和返回
7. 设置帧头有效
winstate-framehead_valid true;一旦计算出帧头位置就将 framehead_valid 设置为 true表示帧头计算完成。
8. 恢复内存上下文
MemoryContextSwitchTo(oldcontext);最后恢复之前的内存上下文确保内存管理的正确性。
具体例子
假设当前行是 2024-01-02销售员 1。
查询的窗口帧使用的是 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW。
第一步frameheadpos 将被设置为 0即从 2024-01-01 开始。
第二步在 RANGE 模式下程序检查是否有排序列并找到销售员 1 在 2024-01-01 的销售额作为帧头。
第三步最终帧头位置 frameheadpos 被设置为 0并且标记为有效。因此当前行的累计销售额将从 2024-01-01 到 2024-01-02依此类推。 窗口模式通过不同的帧定义方式影响了窗口函数的计算范围从而决定了聚合计算的结果。 UNBOUNDED PRECEDING帧从分区的第一行开始适用于计算从分区开始到当前行的累计值。CURRENT ROW帧仅包含当前行适用于每行单独计算如排名。RANGE帧的起始位置是当前行所在同组的第一行适用于基于排序的聚合如销售排名。OFFSET帧的起始位置是当前行位置的偏移适用于计算行之间的偏移聚合。GROUPS帧的起始位置是当前行所在组的第一行适用于按组聚合。