温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

数据湖DeltaLake中的DDL操作是怎么实现的

发布时间:2021-12-23 16:53:52 来源:亿速云 阅读:112 作者:柒染 栏目:大数据

本篇文章为大家展示了数据湖DeltaLake中的DDL操作是怎么实现的,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

前面讲了delta lake简介,特性及基本操作。本文主要是讲DeltaLake的DDL操作,实际上是依赖于spark datasourcev2 和catalog API(3.0+)的,所以Deltalake整合spark的时候最好是3.0开始吧,正好最近spark 3.0也发布了。

对创建sparksession有点要求,需要加上两个配置:

valspark = SparkSession  .builder()  .appName(this.getClass.getCanonicalName)  .master("local[2]")  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate()

1. 创建表

deltalake创建一张表有两种方式:

1). DataFrameWriter,这个大家不陌生,spark默认写文件的方式。

df.write.format("delta").saveAsTable("events")      // create table in the metastore
df.write.format("delta").save("/delta/events")  // create table by path

2). DeltaLake也支持使用spark sql新的DDL操作来创建表,CREATE TABLE.

-- Create table in the metastoreCREATE TABLE events (  date DATE,  eventId STRING,  eventType STRING,  data STRING)USING DELTA

当创建使用Deltalake在metastore中创建一张表的时候,会在metastore中记录数据的位置信息。这样好处很明显,其他人使用的时候就比较方便查找,不用关注数据的真是存储位置。然而,metastore里不会存储数据内容是否有效。

2.数据分区

生产中使用构建数仓的时候会对数据进行分区操作,加速查询,优化DML操作。使用Delta lake创建分区表的时候,只需要指定一个分区列即可。下面是一个按照常见的按照时间进行分区的例子:

1).DDL操作

-- Create table in the metastoreCREATE TABLE events ( date DATE, eventId STRING, eventType STRING, data STRING)USING DELTAPARTITIONED BY (date)LOCATION '/delta/events'

2).Scala API

df.write.format("delta").partitionBy("date").saveAsTable("events")      // create table in the metastoredf.write.format("delta").partitionBy("date").save("/delta/events")  // create table by path

3.指定存储位置

其实,我们可以控制Delta lake的表数据文件的存储位置,在写DDL的时候可以指定path。

这个其实很像hive 的外部表的功能,指定位置的delta lake的表,可以视为是不会被metastore管理的,这种表在删除的时候,不会被真实删除数据。

假设创建Delta lake表的时候,指定的路径里数据文件已经存在,创建的时候delta lake 会做以下事情:

1).如果你仅仅在创建的时候指定了表名称和路径,如下:

CREATE TABLE eventsUSING DELTALOCATION '/delta/events'

hive metastore的表会自动从存在的数据中推断出schema,partition,和属性。该功能可用于将数据导入metastore中。

2).假设你指定了一些配置(schema,partition,或者表的属性),delta lake只会从已有数据中识别出你指定的配置信息,而不是全部配置。假设你指定的配置在已有数据中不存在,那么会抛出不一致异常。

3.读数据

数据可以直接支持sql查询,老spark用户也可以直接使用dataframe api去查询数据。

sql查询

SELECT * FROM events   -- query table in the metastore
SELECT * FROM delta.`/delta/events`  -- query table by path

dataframe查询

spark.table("events")      // query table in the metastore
spark.read.format("delta").load("/delta/events")  // create table by path

dataframe会自动读取最新的数据快照,用户不需要进行refresh table。当可以使用谓词下推的时候 delta lake会自动使用分区器和统计信息进行优化查询,进而减少数据加载。

4.写数据

a).Append

spark自身的append模式就可以完成往已有表里追加数据:

df.write.format("delta").mode("append").save("/delta/events")df.write.format("delta").mode("append").saveAsTable("events")

当然,delta 也支持insert into:

INSERT INTO events SELECT * FROM newEvents

b).overwrite

delta lake也支持直接覆盖整张表,直接使用overwrite模式即可。

dataframe api如下:

df.write.format("delta").mode("overwrite").save("/delta/events")df.write.format("delta").mode("overwrite").saveAsTable("events")

SQL API格式如下:

INSERT OVERWRITE events SELECT * FROM newEvents

使用Dataframe的时候,也可以支持只是覆盖指定分区的数据。下面的例子就是只覆盖一月份的数据:

df.write  .format("delta")  .mode("overwrite")  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")  .save("/delta/events")

这段代码会自动将数据以dataframe的形式写出到指定的deltalake表分区里,然后执行原子替换。

注意:

默认delta lake只支持数据的覆盖,不支持schema的覆盖替换。

上述内容就是数据湖DeltaLake中的DDL操作是怎么实现的,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

ddl
AI