本篇内容介绍了“怎么使用PostgreSQL ExecAgg函数”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体
/* ---------------------
* AggState information
*
* ss.ss_ScanTupleSlot refers to output of underlying plan.
* ss.ss_ScanTupleSlot指的是基础计划的输出.
* (ss = ScanState,ps = PlanState)
*
* Note: ss.ps.ps_ExprContext contains ecxt_aggvalues and
* ecxt_aggnulls arrays, which hold the computed agg values for the current
* input group during evaluation of an Agg node's output tuple(s). We
* create a second ExprContext, tmpcontext, in which to evaluate input
* expressions and run the aggregate transition functions.
* 注意:ss.ps.ps_ExprContext包含了ecxt_aggvalues和ecxt_aggnulls数组,
* 这两个数组保存了在计算agg节点的输出元组时当前输入组已计算的agg值.
* ---------------------
*/
/* these structs are private in nodeAgg.c: */
//在nodeAgg.c中私有的结构体
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;
typedef struct AggState
{
//第一个字段是NodeTag(继承自ScanState)
ScanState ss; /* its first field is NodeTag */
//targetlist和quals中所有的Aggref
List *aggs; /* all Aggref nodes in targetlist & quals */
//链表的大小(可以为0)
int numaggs; /* length of list (could be zero!) */
//pertrans条目大小
int numtrans; /* number of pertrans items */
//Agg策略模式
AggStrategy aggstrategy; /* strategy mode */
//agg-splitting模式,参见nodes.h
AggSplit aggsplit; /* agg-splitting mode, see nodes.h */
//指向当前步骤数据的指针
AggStatePerPhase phase; /* pointer to current phase data */
//步骤数(包括0)
int numphases; /* number of phases (including phase 0) */
//当前步骤
int current_phase; /* current phase number */
//per-Aggref信息
AggStatePerAgg peragg; /* per-Aggref information */
//per-Trans状态信息
AggStatePerTrans pertrans; /* per-Trans state information */
//长生命周期数据的ExprContexts(hashtable)
ExprContext *hashcontext; /* econtexts for long-lived data (hashtable) */
////长生命周期数据的ExprContexts(每一个GS使用)
ExprContext **aggcontexts; /* econtexts for long-lived data (per GS) */
//输入表达式的ExprContext
ExprContext *tmpcontext; /* econtext for input expressions */
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
//当前活跃的aggcontext
ExprContext *curaggcontext; /* currently active aggcontext */
//当前活跃的aggregate(如存在)
AggStatePerAgg curperagg; /* currently active aggregate, if any */
#define FIELDNO_AGGSTATE_CURPERTRANS 16
//当前活跃的trans state
AggStatePerTrans curpertrans; /* currently active trans state, if any */
//输入结束?
bool input_done; /* indicates end of input */
//Agg扫描结束?
bool agg_done; /* indicates completion of Agg scan */
//最后一个grouping set
int projected_set; /* The last projected grouping set */
#define FIELDNO_AGGSTATE_CURRENT_SET 20
//将要解析的当前grouping set
int current_set; /* The current grouping set being evaluated */
//当前投影操作的分组列
Bitmapset *grouped_cols; /* grouped cols in current projection */
//倒序的分组列链表
List *all_grouped_cols; /* list of all grouped cols in DESC order */
/* These fields are for grouping set phase data */
//-------- 下面的列用于grouping set步骤数据
//所有步骤中最大的sets大小
int maxsets; /* The max number of sets in any phase */
//所有步骤的数组
AggStatePerPhase phases; /* array of all phases */
//对于phases > 1,已排序的输入信息
Tuplesortstate *sort_in; /* sorted input to phases > 1 */
//对于下一个步骤,输入已拷贝
Tuplesortstate *sort_out; /* input is copied here for next phase */
//排序结果的slot
TupleTableSlot *sort_slot; /* slot for sort results */
/* these fields are used in AGG_PLAIN and AGG_SORTED modes: */
//------- 下面的列用于AGG_PLAIN和AGG_SORTED模式:
//per-group指针的grouping set编号数组
AggStatePerGroup *pergroups; /* grouping set indexed array of per-group
* pointers */
//当前组的第一个元组拷贝
HeapTuple grp_firstTuple; /* copy of first tuple of current group */
/* these fields are used in AGG_HASHED and AGG_MIXED modes: */
//--------- 下面的列用于AGG_HASHED和AGG_MIXED模式:
//是否已填充hash表?
bool table_filled; /* hash table filled yet? */
//hash桶数?
int num_hashes;
//相应的哈希表数据数组
AggStatePerHash perhash; /* array of per-hashtable data */
//per-group指针的grouping set编号数组
AggStatePerGroup *hash_pergroup; /* grouping set indexed array of
* per-group pointers */
/* support for evaluation of agg input expressions: */
//---------- agg输入表达式解析支持
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
//首先是->pergroups,然后是hash_pergroup
AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than
* ->hash_pergroup */
//投影实现机制
ProjectionInfo *combinedproj; /* projection machinery */
} AggState;
/* Primitive options supported by nodeAgg.c: */
//nodeag .c支持的基本选项
#define AGGSPLITOP_COMBINE 0x01 /* substitute combinefn for transfn */
#define AGGSPLITOP_SKIPFINAL 0x02 /* skip finalfn, return state as-is */
#define AGGSPLITOP_SERIALIZE 0x04 /* apply serializefn to output */
#define AGGSPLITOP_DESERIALIZE 0x08 /* apply deserializefn to input */
/* Supported operating modes (i.e., useful combinations of these options): */
//支持的操作模式
typedef enum AggSplit
{
/* Basic, non-split aggregation: */
//基本 : 非split聚合
AGGSPLIT_SIMPLE = 0,
/* Initial phase of partial aggregation, with serialization: */
//部分聚合的初始步骤,序列化
AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
/* Final phase of partial aggregation, with deserialization: */
//部分聚合的最终步骤,反序列化
AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;
/* Test whether an AggSplit value selects each primitive option: */
//测试AggSplit选择了哪些基本选项
#define DO_AGGSPLIT_COMBINE(as) (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as) (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as) (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)
ExecAgg函数,首先获取AggState运行状态,然后根据各个阶段(aggstate->phase)的策略(aggstrategy)执行相应的逻辑.如使用Hash聚合,则只有一个节点,但有两个策略,首先是AGG_HASHED,该策略对输入元组按照分组列值进行Hash,同时执行转换函数计算中间结果值,缓存到哈希表中;然后执行AGG_MIXED策略,从Hash表中获取结果元组并返回结果元组(每一result为一个结果行).
/*
* ExecAgg -
*
* ExecAgg receives tuples from its outer subplan and aggregates over
* the appropriate attribute for each aggregate function use (Aggref
* node) appearing in the targetlist or qual of the node. The number
* of tuples to aggregate over depends on whether grouped or plain
* aggregation is selected. In grouped aggregation, we produce a result
* row for each group; in plain aggregation there's a single result row
* for the whole query. In either case, the value of each aggregate is
* stored in the expression context to be used when ExecProject evaluates
* the result tuple.
* ExecAgg接收从outer子计划返回的元组合适的属性上为每一个聚合函数(出现在投影列或节点表达式)执行聚合.
* 需要聚合的元组数量依赖于是否已分组或者选择普通聚合.
* 在已分组的聚合操作宏,为每一个组产生结果行;普通聚合,整个查询只有一个结果行.
* 不管哪种情况,每一个聚合结果值都会存储在表达式上下文中(ExecProject会解析结果元组)
*/
static TupleTableSlot *
ExecAgg(PlanState *pstate)
{
AggState *node = castNode(AggState, pstate);
TupleTableSlot *result = NULL;
CHECK_FOR_INTERRUPTS();
if (!node->agg_done)
{
/* Dispatch based on strategy */
//基于策略进行分发
switch (node->phase->aggstrategy)
{
case AGG_HASHED:
if (!node->table_filled)
agg_fill_hash_table(node);
/* FALLTHROUGH */
//填充后,执行MIXED
case AGG_MIXED:
result = agg_retrieve_hash_table(node);
break;
case AGG_PLAIN:
case AGG_SORTED:
result = agg_retrieve_direct(node);
break;
}
if (!TupIsNull(result))
return result;
}
return NULL;
}
agg_fill_hash_table
读取输入并构建哈希表.
lookup_hash_entries函数根据输入元组构建分组列哈希表(搜索或新建条目),advance_aggregates调用转换函数计算中间结果并缓存.
/*
* ExecAgg for hashed case: read input and build hash table
* 读取输入并构建哈希表
*/
static void
agg_fill_hash_table(AggState *aggstate)
{
TupleTableSlot *outerslot;
ExprContext *tmpcontext = aggstate->tmpcontext;
/*
* Process each outer-plan tuple, and then fetch the next one, until we
* exhaust the outer plan.
* 处理每一个outer-plan返回的元组,然后继续提取下一个,直至完成所有元组的处理.
*/
for (;;)
{
//--------- 循环直至完成所有元组的处理
//提取输入的元组
outerslot = fetch_input_tuple(aggstate);
if (TupIsNull(outerslot))
break;//已完成处理,退出循环
/* set up for lookup_hash_entries and advance_aggregates */
//配置lookup_hash_entries和advance_aggregates函数
//把元组放在临时内存上下文中
tmpcontext->ecxt_outertuple = outerslot;
/* Find or build hashtable entries */
//检索或构建哈希表条目
lookup_hash_entries(aggstate);
/* Advance the aggregates (or combine functions) */
//推动聚合(或组合函数)
advance_aggregates(aggstate);
/*
* Reset per-input-tuple context after each tuple, but note that the
* hash lookups do this too
* 重置per-input-tuple内存上下文,但需要注意hash检索也会做这个事情
*/
ResetExprContext(aggstate->tmpcontext);
}
aggstate->table_filled = true;
/* Initialize to walk the first hash table */
//初始化用于遍历第一个哈希表
select_current_set(aggstate, 0, true);
ResetTupleHashIterator(aggstate->perhash[0].hashtable,
&aggstate->perhash[0].hashiter);
}
agg_retrieve_hash_table
agg_retrieve_hash_table函数在hash表中检索结果,执行投影等相关操作.
/*
* ExecAgg for hashed case: retrieving groups from hash table
* ExecAgg(Hash实现版本):在hash表中检索组
*/
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
ExprContext *econtext;
AggStatePerAgg peragg;
AggStatePerGroup pergroup;
TupleHashEntryData *entry;
TupleTableSlot *firstSlot;
TupleTableSlot *result;
AggStatePerHash perhash;
/*
* get state info from node.
* 从node节点中获取状态信息.
*
* econtext is the per-output-tuple expression context.
* econtext是per-output-tuple表达式上下文.
*/
econtext = aggstate->ss.ps.ps_ExprContext;
peragg = aggstate->peragg;
firstSlot = aggstate->ss.ss_ScanTupleSlot;
/*
* Note that perhash (and therefore anything accessed through it) can
* change inside the loop, as we change between grouping sets.
* 注意,在分组之间切换时,perhash在循环中可能会改变
*/
perhash = &aggstate->perhash[aggstate->current_set];
/*
* We loop retrieving groups until we find one satisfying
* aggstate->ss.ps.qual
* 循环检索groups,直至检索到一个符合aggstate->ss.ps.qual条件的组.
*/
while (!aggstate->agg_done)
{
//------------- 选好
//获取Slot
TupleTableSlot *hashslot = perhash->hashslot;
int i;
//检查中断
CHECK_FOR_INTERRUPTS();
/*
* Find the next entry in the hash table
* 检索hash表的下一个条目
*/
entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
if (entry == NULL)
{
//条目为NULL,切换到下一个set
int nextset = aggstate->current_set + 1;
if (nextset < aggstate->num_hashes)
{
/*
* Switch to next grouping set, reinitialize, and restart the
* loop.
* 切换至下一个grouping set,重新初始化并重启循环
*/
select_current_set(aggstate, nextset, true);
perhash = &aggstate->perhash[aggstate->current_set];
ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
continue;
}
else
{
/* No more hashtables, so done */
//已完成检索,设置标记,退出
aggstate->agg_done = true;
return NULL;
}
}
/*
* Clear the per-output-tuple context for each group
* 为每一个group清除per-output-tuple上下文
*
* We intentionally don't use ReScanExprContext here; if any aggs have
* registered shutdown callbacks, they mustn't be called yet, since we
* might not be done with that agg.
* 在这里不会用到ReScanExprContext,如果存在aggs注册了shutdown回调,
* 那应该还没有调用,因为我们可能还没有完成该agg的处理.
*/
ResetExprContext(econtext);
/*
* Transform representative tuple back into one with the right
* columns.
* 将典型元组转回具有正确列的元组.
*/
ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
slot_getallattrs(hashslot);
//清理元组
//重置firstSlot
ExecClearTuple(firstSlot);
memset(firstSlot->tts_isnull, true,
firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
for (i = 0; i < perhash->numhashGrpCols; i++)
{
//重置firstSlot
int varNumber = perhash->hashGrpColIdxInput[i] - 1;
firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
}
ExecStoreVirtualTuple(firstSlot);
pergroup = (AggStatePerGroup) entry->additional;
/*
* Use the representative input tuple for any references to
* non-aggregated input columns in the qual and tlist.
* 为qual和tlist中的非聚合输入列依赖使用典型输入元组
*/
econtext->ecxt_outertuple = firstSlot;
//准备投影slot
prepare_projection_slot(aggstate,
econtext->ecxt_outertuple,
aggstate->current_set);
//最终的聚合操作
finalize_aggregates(aggstate, peragg, pergroup);
//投影
result = project_aggregates(aggstate);
if (result)
return result;
}
/* No more groups */
//没有更多的groups了,返回NULL
return NULL;
}
“怎么使用PostgreSQL ExecAgg函数”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:http://blog.itpub.net/6906/viewspace-2644146/