这期内容当中小编将会给大家带来有关数据湖的最佳调优是怎样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
1.选择最佳恰当的分区列
对于delta 表建议指定分区列。企业中最常见的分区列就是date,地域这些。遵循以下两个经验法则来决定要按哪个列进行分区:
a.如果列的基数很高,请不要使用该列进行分区。例如,如果按userId列进行分区,分区数可能就是用户总数,这明显不是一个好的分区策略。
b.每个分区中的数据量:如果希望该分区中的数据至少为1 GB,则可以按满足这个需求的列进行分区。
2.合并文件
如果是不断将数据写入Delta表,随着时间的推移,会产生大量文件,尤其是如果小数据量的添加数据时。这个可能会大大降低表的查询速率,也可能影响文件系统的性能。理想情况下,应定期将大量的小文件重写为少量的较大的文件。
可以通过将表重新分区为较少数量的文件来实现压缩表功能。另外,可以将dataChange配置指定为false表示该操作不会变更数据,仅重新排列数据布局。这将确保由于此压缩操作对其他并行操作的影响最小。
例如,可以将一个表压缩为16个文件:
val path = "..."
val numFiles = 16
spark.read
.format("delta")
.load(path)
.repartition(numFiles)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.save(path)
如果对表进行了分区,并且您只想基于谓词对一个分区进行重新分区,则可以使用where仅读取分区,并使用replaceWhere写回该分区:
val path = "..."val partition = "year = '2019'"val numFilesPerPartition = 16spark.read .format("delta") .load(path) .where(partition) .repartition(numFilesPerPartition) .write .option("dataChange", "false") .format("delta") .mode("overwrite") .option("replaceWhere", partition) .save(path)
警告:
在更改数据的操作上配置dataChange = false可能会损坏表中的数据。
3.merge操作的性能调优
下面几个方法可以有效减少merge的处理时间:
a.减少匹配查找的数据量
默认情况下,merge操作会扫描整个delta lake表找到满足条件的数据。可以加些谓词,以减少数据量。比如,数据是以country和date进行分区的,而你只想更新特定国家的昨天的数据。就可以增加一些条件,比如:
events.date = current_date() AND events.country = 'USA'
这样就只会处理指定分区的数据,大大减少了数据扫描量。也可以避免不同分区之间操作的一些冲突。
b.合并文件
如果数据存储的时候有很多小文件,就会降低数据的读取速度。可以合并小文件成一些大文件,来提升读取的速度。后面会说到这个问题。
c.控制shuffle的分区数
为了计算和更新数据,merge操作会对数据进行多次shuffle。shuffle过程中task数量是由参数spark.sql.shuffle.partitions来设置,默认是200。该参数不仅能控制shuffle的并行度,也能决定输出的文件数。增加这个值虽然可以增加并行度,但也相应的增加了产生小文件数。
d.写出数据之间进行重分区
对与分区表,merge操作会产生很多小文件,会比shuffle分区数多很多。原因是每个shuffle任务会为多分区表产生更多的文件,这可能会是一个性能瓶颈。所以,很多场景中使用表的分区列对数据进行写入前重分区是很有效的。可以通过设置spark.delta.merge.repartitionBeforeWrite为true来生效。
上述就是小编为大家分享的数据湖的最佳调优是怎样的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。