如何进行SparkSQL与Hive metastore Parquet转换的分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
这里从表schema的处理角度而言,就必须注意Hive和Parquet兼容性,主要有两个区别:
1.Hive是大小写敏感的,但Parquet相反
2.Hive会将所有列视为nullable,但是nullability在parquet里有独特的意义
1.有相同名字的字段必须要有相同的数据类型,忽略nullability。兼容处理的字段应该保持Parquet侧的数据类型,这样就可以处理到nullability类型了(空值问题)
2.兼容处理的schema应只包含在Hive元数据里的schema信息,主要体现在以下两个方面:
(1)只出现在Parquet schema的字段会被忽略
// 第一种方式应用的比较多1. sparkSession.catalog.refreshTable(s"${dbName.tableName}")2. sparkSession.catalog.refreshByPath(s"${path}")
最后说一下最近后台小伙伴在生产中遇到的一个问题,大家如果在业务处理中遇到类似的问题,提供一个思路。
在说问题之前首先了解一个参数spark.sql.parquet.writeLegacyFormat(默认false)的作用:
比如,对于decimal数据类型的兼容处理,不设置true时,经常会报类似如下的错误:
Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file hdfs://hadoop/data/test_decimal/dt=20200515000000/part-00000-9820eba2-8a40-446d-8c28-37027a1b1f2d-c000.snappy.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122) at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85) at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)... Caused by: java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary at parquet.column.Dictionary.decodeToBinary(Dictionary.java:44)...
此时我们需要将spark.sql.parquet.writeLegacyFormat设置为true来解决上述的异常问题。
但如果同时设置spark.sql.hive.convertMetastoreParquet为false时,要注意一些数据类型以及精度的处理,比如对于decimal类型的处理。通过一个例子复原一下当时的场景:
1.创建Hive外部表testdb.test_decimal,其中字段fee_rate为decimal(10,10)
CREATE EXTERNAL TABLE `testdb`.`test_decimal`(`no` STRING , `fee_rate` DECIMAL(10,10)) PARTITIONED BY (`dt` STRING ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://hadoop/data/test_decimal' TBLPROPERTIES ( 'transient_lastDdlTime' = '1589160440' ) ;
2.将testdb.item中的数据处理后保存到testdb.test_decimal中
// 这里为了展示方便,直接查询testdb.item中的数据
// 注意: 字段fee_rate的类型为decimal(10,6)
select no, fee_rate from testdb.item where dt=20190528;
// testdb.item中数据示例如下
+-------------------+----------------+
| no| fee_rate|
+-------------------+----------------+
| 1| 0.000000|
| 2| 0.000000|
| 3| 0.000000|
+-------------------+----------------+
// tmp是上述查询testdb.item获得的临时表// 以parquet格式保存到test_decimal的20200529分区中save overwrite tmp as parquet.`/data/test_decimal/dt=20200529`; msck repair TABLE testdb.item;
上述1-3都能成功执行,数据也能保存到testdb.test_decimal中,但是当查询testdb.test_decimal中的数据时,比如执行sql:
select * from testdb.test_decimal where dt = 20200529;
Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:415) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:443) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:434) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ...
究其原因是因为按照上述两个参数的配置,testdb.item中fee_rate字段类型为decimal(10,6),数据为0.000000,经过一系列处理0.000000最终会被处理为0,看下边最终导致空指针异常的部分,就会一目了然。
public static BigDecimal enforcePrecisionScale(BigDecimal bd, int maxPrecision, int maxScale) { if (bd == null) { return null; } else { bd = trim(bd); if (bd.scale() > maxScale) { bd = bd.setScale(maxScale, RoundingMode.HALF_UP); } // testdb.test_decimal中fee_rate的类型decimal(10,10),即precision为10,scale也为10 // 对应这里即maxPrecision和maxScale分别为10,则maxIntDigits为0 int maxIntDigits = maxPrecision - maxScale; // bd对应0。对于0而言,precision为1,scale为0 // 处理之后 intDigits为1 int intDigits = bd.precision() - bd.scale(); return intDigits > maxIntDigits ? null : bd; }}
解决办法也很简单,就是将testdb.test_decimal中的fee_rate数据类型和依赖的表testdb.item中的fee_rate保持完全一致,即也为decimal(10,6)。
这个现象在实际应用环境中经常遇到,通用的解决办法就是将要保存的表中的数据类型与依赖的表(物理表或者临时表)的字段类型保持完全一致。
看完上述内容,你们掌握如何进行SparkSQL与Hive metastore Parquet转换的分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。