这篇文章主要介绍了hive中lateral view怎么用,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
Lateral view与UDTF函数一起使用,UDTF对每个输入行产生0或者多个输出行。Lateral view首先在基表的每个输入行应用UDTF,然后连接结果输出行与输入行组成拥有指定表别名的虚拟表。
explain SELECT id, sq,myCol from window_test_table LATERAL VIEW explode(split(sq,',')) myTab as myCol;
这个sql 经历了两条线:
ts(TableScan)-->lvf(Lateral View Forward)-->sel(Select)-->lvj(Lateral View Join)-->sel(Select) ts(TableScan)-->lvf(Lateral View Forward)-->sel(Select)-->udtf-->lvj(Lateral View Join)-->sel(Select)
不多说,常规读表操作
@Override public void process(Object row, int tag) throws HiveException { forward(row, inputObjInspectors[tag]); }
几乎什么都没做,数据怎么来的,还怎么送出去。
筛选出你需要的非explode的列:id,sq
筛选出explode的列:split(sq, ',')
@Override public void process(Object row, int tag) throws HiveException { StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag]; List<? extends StructField> fields = soi.getAllStructFieldRefs(); //从row里解出字段 for (int i = 0; i < fields.size(); i++) { objToSendToUDTF[i] = soi.getStructFieldData(row, fields.get(i)); } //真正处理数据的是 genericUDTF的某个实现类,比如,explode,那就是GenericUDTFExplode.java 的process genericUDTF.process(objToSendToUDTF); //这里判断一下有没有outer关键字。这里真的真的真的是,可能用了很久了,还不知道udtf还有个outer 关键字 if (conf.isOuterLV() && collector.getCounter() == 0) { //思考一下这一步是干嘛? collector.collect(outerObj); } collector.reset(); }
GenericUDTFExplode.java就相当容易理解了,毕竟我们自己写udtf时,也是这么做的:
/** * GenericUDTFExplode. * */ @Description(name = "explode", value = "_FUNC_(a) - separates the elements of array a into multiple rows," + " or the elements of a map into multiple rows and columns ") public class GenericUDTFExplode extends GenericUDTF { .... @Override //主要处理数据的方法 public void process(Object[] o) throws HiveException { switch (inputOI.getCategory()) { case LIST: //处理list ListObjectInspector listOI = (ListObjectInspector)inputOI; List<?> list = listOI.getList(o[0]); if (list == null) { return; //当数组里没有值时,不发送数据 } for (Object r : list) { forwardListObj[0] = r; forward(forwardListObj); } break; case MAP: //处理map MapObjectInspector mapOI = (MapObjectInspector)inputOI; Map<?,?> map = mapOI.getMap(o[0]); if (map == null) { return; } for (Entry<?,?> r : map.entrySet()) { forwardMapObj[0] = r.getKey(); forwardMapObj[1] = r.getValue(); forward(forwardMapObj); } break; default: throw new TaskExecutionException("explode() can only operate on an array or a map"); } } .... }
当UDTF不产生任何行时,比如explode()函数的输入列为空,LATERALVIEW就不会生成任何输出行。在这种情况下原有行永远不会出现在结果中。OUTRE可被用于阻止这种情况,输出行中来自UDTF的列将被设置为NULL。
例如:
实际上从代码里,也能够看到:
UDTF会借助UDTFCollector为其展开的结果计数,并forward:
@Override public void collect(Object input) throws HiveException { op.forwardUDTFOutput(input); counter++; }
如果没有展开结果,counter就为0。 这样,进入outer之后,会把之前建好的没有内容的outerObj给forward到下个 算子LateralViewJoinOperator
@Override public void process(Object row, int tag) throws HiveException { StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag]; //标识是左侧select过来的 if (tag == SELECT_TAG) { selectObjs.clear(); selectObjs.addAll(soi.getStructFieldsDataAsList(row)); } else if (tag == UDTF_TAG) { //代表是右侧udtf过来的 acc.clear(); acc.addAll(selectObjs); acc.addAll(soi.getStructFieldsDataAsList(row)); //合并数据 forward(acc, outputObjInspector); } else { throw new HiveException("Invalid tag"); } }
LateralViewJoinOperator处理逻辑也是很简单明了,这里的join也是简单的List.addAll
Lateral view explode 会产生shuffle吗?
当然不会,毋庸置疑! 其实一开始看执行计划就会发现,没有reduce任务呀~~
这里的Join代表的是两份数据联接到一起的意思,并不是真正的意义上的join。
感谢你能够认真阅读完这篇文章,希望小编分享的“hive中lateral view怎么用”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。