本篇文章为大家展示了数据湖DeltaLake中的DDL操作是怎么实现的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
前面讲了delta lake简介,特性及基本操作。本文主要是讲DeltaLake的DDL操作,实际上是依赖于spark datasourcev2 和catalog API(3.0+)的,所以Deltalake整合spark的时候最好是3.0开始吧,正好最近spark 3.0也发布了。
对创建sparksession有点要求,需要加上两个配置:
valspark = SparkSession .builder() .appName(this.getClass.getCanonicalName) .master("local[2]") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate()
1. 创建表
deltalake创建一张表有两种方式:
1). DataFrameWriter,这个大家不陌生,spark默认写文件的方式。
df.write.format("delta").saveAsTable("events") // create table in the metastore
df.write.format("delta").save("/delta/events") // create table by path
2). DeltaLake也支持使用spark sql新的DDL操作来创建表,CREATE TABLE.
-- Create table in the metastoreCREATE TABLE events ( date DATE, eventId STRING, eventType STRING, data STRING)USING DELTA
当创建使用Deltalake在metastore中创建一张表的时候,会在metastore中记录数据的位置信息。这样好处很明显,其他人使用的时候就比较方便查找,不用关注数据的真是存储位置。然而,metastore里不会存储数据内容是否有效。
2.数据分区
生产中使用构建数仓的时候会对数据进行分区操作,加速查询,优化DML操作。使用Delta lake创建分区表的时候,只需要指定一个分区列即可。下面是一个按照常见的按照时间进行分区的例子:
1).DDL操作
-- Create table in the metastoreCREATE TABLE events ( date DATE, eventId STRING, eventType STRING, data STRING)USING DELTAPARTITIONED BY (date)LOCATION '/delta/events'
2).Scala API
df.write.format("delta").partitionBy("date").saveAsTable("events") // create table in the metastoredf.write.format("delta").partitionBy("date").save("/delta/events") // create table by path
3.指定存储位置
其实,我们可以控制Delta lake的表数据文件的存储位置,在写DDL的时候可以指定path。
这个其实很像hive 的外部表的功能,指定位置的delta lake的表,可以视为是不会被metastore管理的,这种表在删除的时候,不会被真实删除数据。
假设创建Delta lake表的时候,指定的路径里数据文件已经存在,创建的时候delta lake 会做以下事情:
1).如果你仅仅在创建的时候指定了表名称和路径,如下:
CREATE TABLE eventsUSING DELTALOCATION '/delta/events'
hive metastore的表会自动从存在的数据中推断出schema,partition,和属性。该功能可用于将数据导入metastore中。
2).假设你指定了一些配置(schema,partition,或者表的属性),delta lake只会从已有数据中识别出你指定的配置信息,而不是全部配置。假设你指定的配置在已有数据中不存在,那么会抛出不一致异常。
3.读数据
数据可以直接支持sql查询,老spark用户也可以直接使用dataframe api去查询数据。
sql查询
SELECT * FROM events -- query table in the metastore
SELECT * FROM delta.`/delta/events` -- query table by path
dataframe查询
spark.table("events") // query table in the metastore
spark.read.format("delta").load("/delta/events") // create table by path
dataframe会自动读取最新的数据快照,用户不需要进行refresh table。当可以使用谓词下推的时候 delta lake会自动使用分区器和统计信息进行优化查询,进而减少数据加载。
4.写数据
a).Append
spark自身的append模式就可以完成往已有表里追加数据:
df.write.format("delta").mode("append").save("/delta/events")df.write.format("delta").mode("append").saveAsTable("events")
当然,delta 也支持insert into:
INSERT INTO events SELECT * FROM newEvents
b).overwrite
delta lake也支持直接覆盖整张表,直接使用overwrite模式即可。
dataframe api如下:
df.write.format("delta").mode("overwrite").save("/delta/events")df.write.format("delta").mode("overwrite").saveAsTable("events")
SQL API格式如下:
INSERT OVERWRITE events SELECT * FROM newEvents
使用Dataframe的时候,也可以支持只是覆盖指定分区的数据。下面的例子就是只覆盖一月份的数据:
df.write .format("delta") .mode("overwrite") .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'") .save("/delta/events")
这段代码会自动将数据以dataframe的形式写出到指定的deltalake表分区里,然后执行原子替换。
注意:
默认delta lake只支持数据的覆盖,不支持schema的覆盖替换。
上述内容就是数据湖DeltaLake中的DDL操作是怎么实现的,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。