This article shares how to handle map, array, and struct column types when Spark stores Parquet data to Hive. The editor found it quite practical, so it is shared here as a reference; let's walk through it together.
To better illustrate the cause of the problem, its symptoms, and the solutions, start with the following example:
-- Create a non-partitioned Hive table stored as Parquet
CREATE EXTERNAL TABLE `t1`(
  `id` STRING,
  `map_col` MAP<STRING, STRING>,
  `arr_col` ARRAY<STRING>,
  `struct_col` STRUCT<A:STRING, B:STRING>)
STORED AS PARQUET
LOCATION '/home/spark/test/tmp/t1';

-- Create a partitioned Hive table stored as Parquet
CREATE EXTERNAL TABLE `t2`(
  `id` STRING,
  `map_col` MAP<STRING, STRING>,
  `arr_col` ARRAY<STRING>,
  `struct_col` STRUCT<A:STRING, B:STRING>)
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/home/spark/test/tmp/t2';
insert into table t1 values(1,map(),array('1,1,1'),named_struct('A','1','B','1'));
insert into table t2 partition(dt='20200101') values(1,map(),array('1,1,1'),named_struct('A','1','B','1'));
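For context, these statements are issued from Spark with Hive support enabled. A minimal sketch of how that might look (the class and application names are illustrative assumptions, not from the original post):

import org.apache.spark.sql.SparkSession;

public class EmptyMapWriteRepro {                       // illustrative name
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("parquet-empty-map-repro")
        .enableHiveSupport()                            // required so the INSERTs target the Hive tables above
        .getOrCreate();

    // Insert into the non-partitioned table t1
    spark.sql("insert into table t1 values(1, map(), array('1,1,1'), named_struct('A','1','B','1'))");

    // Insert into the partitioned table t2
    spark.sql("insert into table t2 partition(dt='20200101') "
        + "values(1, map(), array('1,1,1'), named_struct('A','1','B','1'))");

    spark.stop();
  }
}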
The insert into t1 runs normally, but executing the same insert against t2 throws the following exception:
Caused by: parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:244)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeMap(DataWritableWriter.java:241)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeValue(DataWritableWriter.java:116)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeGroupFields(DataWritableWriter.java:89)
    at org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:60)
    ... 23 more
Looking at the DDL, the only difference between t1 and t2 is that t2 is partitioned while t1 is not. The error message alone does not explain why partitioning causes this problem, so we have to look at what the source code does differently (for convenience, the author gives the flow of the source-code analysis for this problem directly here):
The exception message is "empty fields are illegal". The key is to see where this empty-field check is thrown and what handling leads up to it, which means looking at what MessageColumnIO does in startField and endField:
public void startField(String field, int index) {
    try {
        if (MessageColumnIO.DEBUG) {
            this.log("startField(" + field + ", " + index + ")");
        }
        this.currentColumnIO = ((GroupColumnIO) this.currentColumnIO).getChild(index);
        // In MessageColumnIO, startField first sets emptyField to true
        this.emptyField = true;
        if (MessageColumnIO.DEBUG) {
            this.printState();
        }
    } catch (RuntimeException var4) {
        throw new ParquetEncodingException("error starting field " + field + " at " + index, var4);
    }
}

// endField decides whether to throw based on whether emptyField is still true
public void endField(String field, int index) {
    if (MessageColumnIO.DEBUG) {
        this.log("endField(" + field + ", " + index + ")");
    }
    this.currentColumnIO = this.currentColumnIO.getParent();
    // If the flag is still true at this point, throw the exception
    if (this.emptyField) {
        throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead");
    } else {
        this.fieldsWritten[this.currentLevel].markWritten(index);
        this.r[this.currentLevel] = this.currentLevel == 0 ? 0 : this.r[this.currentLevel - 1];
        if (MessageColumnIO.DEBUG) {
            this.printState();
        }
    }
}
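What resets the flag is a write of an actual value: the primitive add* methods of the same MessageColumnIORecordConsumer (addInteger, addBinary, addDouble, and so on) set emptyField back to false before handing the value to the column writer. A simplified sketch of their shape (paraphrased, not quoted verbatim from the Parquet source):

public void addInteger(int value) {
    // writing a concrete value marks the current field as non-empty
    this.emptyField = false;
    this.getColumnWriter().write(value, this.r[this.currentLevel], this.currentColumnIO.getDefinitionLevel());
    this.setRepetitionLevel();
}

So if startField is called but no add* call follows before the matching endField, the flag stays true and the exception above is thrown.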
Part of the source code (Hive's DataWritableWriter) that handles the map type:
private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) {
    // Get the internal map structure (MAP_KEY_VALUE)
    GroupType repeatedType = type.getType(0).asGroupType();

    recordConsumer.startGroup();
    recordConsumer.startField(repeatedType.getName(), 0);

    Map<?, ?> mapValues = inspector.getMap(value);

    Type keyType = repeatedType.getType(0);
    String keyName = keyType.getName();
    ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();

    Type valuetype = repeatedType.getType(1);
    String valueName = valuetype.getName();
    ObjectInspector valueInspector = inspector.getMapValueObjectInspector();

    for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
        recordConsumer.startGroup();
        if (keyValue != null) {
            // write key element
            Object keyElement = keyValue.getKey();
            // recordConsumer here is MessageColumnIO's MessageColumnIORecordConsumer;
            // see the startField and endField handling shown above
            recordConsumer.startField(keyName, 0);
            // see how writeValue handles primitive types such as int, boolean, varchar
            writeValue(keyElement, keyInspector, keyType);
            recordConsumer.endField(keyName, 0);

            // write value element
            Object valueElement = keyValue.getValue();
            if (valueElement != null) {
                // same as above
                recordConsumer.startField(valueName, 1);
                writeValue(valueElement, valueInspector, valuetype);
                recordConsumer.endField(valueName, 1);
            }
        }
        recordConsumer.endGroup();
    }

    recordConsumer.endField(repeatedType.getName(), 0);
    recordConsumer.endGroup();
}

private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) {
    // If value is null, return
    if (value == null) {
        return;
    }
    switch (inspector.getPrimitiveCategory()) {
        // If the PrimitiveCategory is VOID, return
        case VOID:
            return;
        case DOUBLE:
            recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value));
            break;
        // The handling of boolean, float, byte, int and the other primitive types follows the same pattern and is omitted here
        ....
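It is also worth noting how the caller one level up treats NULL: in writeGroupFields (the frame above writeValue in the stack trace), a field whose value is null is skipped entirely, so startField/endField are never called for it and the empty-field check never fires. A simplified sketch of that method (paraphrased from the Hive source, not quoted verbatim):

private void writeGroupFields(final Object value, final StructObjectInspector inspector, final GroupType type) {
    if (value != null) {
        List<? extends StructField> fields = inspector.getAllStructFieldRefs();
        List<Object> fieldValuesList = inspector.getStructFieldsDataAsList(value);

        for (int i = 0; i < type.getFieldCount(); i++) {
            Type fieldType = type.getType(i);
            String fieldName = fieldType.getName();
            Object fieldValue = fieldValuesList.get(i);

            // NULL fields are skipped completely: no startField, hence no empty-field check
            if (fieldValue != null) {
                ObjectInspector fieldInspector = fields.get(i).getFieldObjectInspector();
                recordConsumer.startField(fieldName, i);
                writeValue(fieldValue, fieldInspector, fieldType);
                recordConsumer.endField(fieldName, i);
            }
        }
    }
}

This null-skipping behavior is relevant to the first workaround described below.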
Putting the pieces together: for an empty map, writeMap still calls startField on the repeated key/value group (setting emptyField to true), the loop over mapValues.entrySet() never runs, no value is written that would reset the flag, and the closing endField finds emptyField still true and throws the exception. Map is only used as the example here; array and struct hit the same problem, and the source shows that the HiveFileFormat -> DataWritableWriter path handles all three in a similar way. This is also where the difference between t1 and t2 comes from: the insert into the non-partitioned t1 is handled by Spark's own ParquetFileFormat, while the insert into the partitioned t2 falls back to HiveFileFormat -> DataWritableWriter, which performs the check described above. A similar problem is also discussed in the Hive issue tracker: https://issues.apache.org/jira/browse/HIVE-11625.
The problem can be worked around in a few ways:

1. If the table schema cannot be changed, or the write path ends up using HiveFileFormat anyway, then the data itself has to be adjusted: make sure the map, array, and struct values being written are never empty, for example by writing NULL instead of an empty collection, as sketched below.
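A minimal sketch of that idea (the SQL below is an illustration, not from the original post): replace an empty map or array with NULL before it reaches the Parquet writer, relying on the null-skipping behavior of writeGroupFields shown earlier. It assumes the SparkSession `spark` from the earlier sketch and reuses t1 as the source table.

spark.sql("insert into table t2 partition(dt='20200101') "
    + "select id, "
    + "       if(size(map_col) > 0, map_col, NULL) as map_col, "   // never write an empty map
    + "       if(size(arr_col) > 0, arr_col, NULL) as arr_col, "   // never write an empty array
    + "       struct_col "
    + "from t1");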
2. Create the table as a Spark DataSource table (USING parquet) instead of a Hive table:

-- This approach essentially still uses ParquetFileFormat, and it creates a managed (internal) table;
-- using it directly in production is not recommended
CREATE TABLE `test`(
  `id` STRING,
  `map_col` MAP<STRING, STRING>,
  `arr_col` ARRAY<STRING>,
  `struct_col` STRUCT<A:STRING, B:STRING>)
USING parquet
OPTIONS(`serialization.format` '1');
3. Explicitly specify ParquetFileFormat when storing the data.
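A rough sketch of this option (the DataFrame, the target path, and the follow-up ALTER TABLE step are illustrative assumptions): write the files with Spark's DataFrame writer, which goes through ParquetFileFormat rather than Hive's DataWritableWriter, and then register the partition. It again assumes the SparkSession `spark` from the earlier sketch (additional imports: org.apache.spark.sql.Dataset, org.apache.spark.sql.Row, org.apache.spark.sql.SaveMode).

Dataset<Row> df = spark.table("t1");                     // or any DataFrame holding the data to be written
df.write()
  .format("parquet")                                     // use Spark's ParquetFileFormat
  .mode(SaveMode.Append)
  .save("/home/spark/test/tmp/t2/dt=20200101");          // illustrative target path for the dt partition

// The new partition still has to be registered with the Hive metastore afterwards
spark.sql("alter table t2 add if not exists partition(dt='20200101')");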
Thanks for reading! That wraps up this article on how Spark handles map, array, and struct column types when storing Parquet data to Hive. Hopefully the content above is helpful; if you found the article useful, feel free to share it so more people can see it.
Original article: https://my.oschina.net/bigdatalearnshare/blog/4836602