温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Spark 3.0的新功能是什么呢

发布时间:2021-12-17 11:19:23 来源:亿速云 阅读:152 作者:柒染 栏目:大数据

今天就跟大家聊聊有关Spark 3.0的新功能是什么呢,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

最近,Apache Spark社区发布了Spark  3.0的预览版,该预览版包含许多重要的新功能,这些功能将帮助Spark创造强大的影响力,在此大数据和数据科学时代,该产品已拥有广泛的企业用户和开发人员。

在新版本中,Spark社区已将一些功能从Spark SQL移植到了编程的Scala API(

org.apache.spark.sql.functions),这鼓励开发人员直接将此功能用作其DataFrame转换的一部分,而不是直接输入  进入SQL模式或创建视图,并使用此函数以及SQL表达式或callUDF函数。

社区还辛苦地引入了一些新的数据转换功能和partition_transforms函数,这些功能在与Spark的新DataFrameWriterv2一起使用以将数据写到某些外部存储时非常有用。

Spark 3中的一些新功能已经是Databricks Spark以前版本的一部分。  因此,如果您在Databricks云中工作,您可能会发现其中一些熟悉的功能。

下面介绍了Spark SQL和Scala API中用于DataFrame操作访问的Spark新功能,以及从Spark SQL移植到Scala  API以进行编程访问的功能。

Spark SQL中的Spark 3.0中引入的功能以及用于DataFrame转换的功能

from_csv

像from_json一样,此函数解析包含CSV字符串的列,并将其转换为Struct类型。 如果CSV字符串不可解析,则将返回null。

例:

  • 该函数需要一个Struct模式和一些选项,这些模式和选项指示如何解析CSV字符串。 选项与CSV数据源相同。

ss="dp-sql">ss="alt">val studentInfo = ss="string">"1,Jerin,CSE"::ss="string">"2,Jerlin,ECE"::ss="string">"3,Arun,CSE"::Nil ss="">val ss="keyword">schema = new StructType()  ss="alt">            .ss="keyword">add(ss="string">"Id",IntegerType) ss="">            .ss="keyword">add(ss="string">"Name",StringType) ss="alt">            .ss="keyword">add(ss="string">"Dept",StringType) ss="">val options = Map(ss="string">"delimiter" ->ss="string">",") ss="alt">val studentDF = studentInfo.toDF(ss="string">"Student_Info") ss="">.withColumn(ss="string">"csv_struct",from_csv('Student_Info, ss="keyword">schema,options)) ss="alt">studentDF.show()

to_csv

要将"结构类型"列转换为CSV字符串。

例:

  • 与Struct type列一起,此函数还接受可选的options参数,该参数指示如何将Struct列转换为CSV字符串。

ss="dp-sql">ss="alt">studentDF ss="">.withColumn(ss="string">"csv_string",to_csv($ss="string">"csv_struct",Map.empty[String, String].asJava)) ss="alt">.show

推断CSV字符串的模式,并以DDL格式返回模式。

例:

  • 该函数需要一个CSV字符串列和一个可选参数,其中包含如何解析CSV字符串的选项。

ss="dp-sql">ss="alt">studentDF ss="">  .withColumn(ss="string">"schema",schema_of_csv(ss="string">"csv_string")) ss="alt">  .show

for_all

将给定谓词应用于数组中的所有元素,并且仅当数组中的所有元素求值为true时返回true,否则返回false。

例:

  • 检查给定Array列中的所有元素是否均是偶数。

ss="dp-sql">ss="alt">val  df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"int_array") ss="">df.withColumn(ss="string">"flag",forall($ss="string">"int_array",(x:ss="keyword">Column)=>(lit(x%2==0)))) ss="alt">.show

transform

将函数应用于数组中的所有元素后,返回一个新数组。

例:

  • 将" 1"添加到数组中的所有元素。

ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6)),(Seq(5,10,3))).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"num_array",transform($ss="string">"num_array",x=>x+1)).show

overlay

要替换列的内容,请使用从指定字节位置到可选的指定字节长度的实际替换内容。

例:

  • 将特定人员的问候语更改为传统的" Hello World"

这里我们用世界替换人名,因为名字的起始位置是7,并且我们要在替换内容之前删除完整的姓名,需要删除的字节位置的长度应大于或等于最大值  列中名称的长度。

因此,我们将替换词传递为"world",将内容替换为" 7"的特定起始位置,从指定起始位置移除的位置数为" 12"(如果未指定,则该位置是可选的  函数只会从指定的起始位置将源内容替换为替换内容)。

覆盖替换了StringType,TimeStampType,IntegerType等中的内容。但是Column的返回类型将始终为StringType,而与Column输入类型无关。

ss="dp-sql">ss="alt">val greetingMsg = ss="string">"Hello Arun"::ss="string">"Hello Mohit Chawla"::ss="string">"Hello Shaurya"::Nil ss="">val greetingDF = greetingMsg.toDF(ss="string">"greet_msg") ss="alt">greetingDF.withColumn(ss="string">"greet_msg",overlay($ss="string">"greet_msg",lit(ss="string">"World"),lit(ss="string">"7"),lit(ss="string">"12"))) ss="">.show

分裂

根据给定的正则表达式和指定的限制拆分字符串表达式,该限制指示将正则表达式应用于给定的字符串表达式的次数。

如果指定的限制小于或等于零,则正则表达式将在字符串上应用多次,并且结果数组将根据给定的正则表达式包含所有可能的字符串拆分。

如果指定的限制大于零,则将使用不超过该限制的正则表达式

例:

  • 根据正则表达式将给定的字符串表达式拆分为两个。 即 字符串定界符。

ss="dp-sql">ss="alt">val num = ss="string">"one~two~three"::ss="string">"four~five"::Nil ss="">val numDF = num.toDF(ss="string">"numbers") ss="alt">numDF ss="">.withColumn(ss="string">"numbers",split($ss="string">"numbers",ss="string">"~",2)) ss="alt">.show

将同一个字符串表达式分成多个部分,直到出现分隔符

ss="dp-sql">ss="alt">numDF ss="">.withColumn(ss="string">"numbers",split($ss="string">"numbers",ss="string">"~",0)) ss="alt">.show

map_entries

将映射键值转换为无序的条目数组。

例:

  • 获取数组中Map的所有键和值。

ss="dp-sql">ss="alt">val df = Seq(Map(1->ss="string">"x",2->ss="string">"y")).toDF(ss="string">"key_values") ss="">df.withColumn(ss="string">"key_value_array",map_entries($ss="string">"key_values")) ss="alt">.show

map_zip_with

使用功能根据键将两个Map合并为一个。

例:

  • 要计算跨部门员工的总销售额,并通过传递一个函数,该函数将基于键汇总来自两个不同"地图"列的总销售额,从而在单个地图中获取特定员工的总销售额。

ss="dp-sql">ss="alt">val df = Seq((Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000), ss="">             Map(ss="string">"EID_1"->1000,ss="string">"EID_2"->2500)))   .toDF(ss="string">"emp_sales_dept1",ss="string">"emp_sales_dept2") ss="alt"> ss="">df. ss="alt">withColumn(ss="string">"total_emp_sales",map_zip_with($ss="string">"emp_sales_dept1",$ss="string">"emp_sales_dept2",(k,v1,v2)=>(v1+v2))) ss="">.show

map_filter

返回仅包含满足给定谓词功能的Map值的新键值对。

例:

  • 仅筛选出销售值高于20000的员工

ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000)) ss="">          .toDF(ss="string">"emp_sales") ss="alt"> ss="">df ss="alt">.withColumn(ss="string">"filtered_sales",map_filter($ss="string">"emp_sales",(k,v)=>(v>20000))) ss="">.show

transform_values

根据给定的函数操作Map列中所有元素的值。

例:

  • 通过给每个雇员加薪5000来计算雇员薪水

ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000)) ss="">         .toDF(ss="string">"emp_salary") ss="alt"> ss="">df ss="alt">.withColumn(ss="string">"emp_salary",transform_values($ss="string">"emp_salary",(k,v)=>(v+5000))) ss="">.show

transform_keys

根据给定的函数操作Map列中所有元素的键。

例:

  • 要将公司名称" XYZ"添加到员工编号。

ss="dp-sql">ss="alt">val df = Seq(Map(ss="string">"EID_1" -> 10000, ss="string">"EID_2" -> 25000)) ss="">        .toDF(ss="string">"employees") ss="alt">df ss="">.withColumn(ss="string">"employees", transform_keys($ss="string">"employees", (k, v) => concat(k,lit(ss="string">"_XYZ")))) ss="alt">.show

xhash74

要计算给定列内容的哈希码,请使用64位xxhash算法并将结果返回为long。

从Spark SQL移植到Spark 3.0中的Scala API进行DataFrame转换的功能

Scala API可使用大多数Spark SQL函数,该函数可将相同的函数用作DataFrame操作的一部分。 但是仍然有一些功能不能作为编程功能使用。  要使用这些功能,必须进入Spark SQL模式并将这些功能用作SQL表达式的一部分,或使用Spark" callUDF"功能使用相同的功能。  随着功能的普及和使用不断发展,这些功能中的某些功能过去曾被移植到新版本的程序化Spark API中。 以下是从以前版本的Spark SQL函数移植到Scala  API(

org.spark.apache.sql.functions)的函数

date_sub

从日期,时间戳记和字符串数据类型中减去天数。 如果数据类型为字符串,则其格式应可转换为日期" yyyy-MM-dd"或" yyyy-MM-dd  HH:mm:ss.ssss"

例:

  • 从eventDateTime中减去" 1天"。

如果要减去的天数为负,则此功能会将给定的天数添加到实际日期中。

ss="dp-sql">ss="alt">var df = Seq( ss="">        (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt">        (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss="">        (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt">        (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss="">         ) ss="alt">     .toDF(ss="string">"typeId",ss="string">"eventDateTime") ss=""> ss="alt"> df.withColumn(ss="string">"Adjusted_Date",date_sub($ss="string">"eventDateTime",1)).show()

date_add

与date_sub相同,但是将天数添加到实际天数中。

例:

  • 将" 1天"添加到eventDateTime

ss="dp-sql">ss="alt">var df = Seq( ss="">         (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt">         (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss="">         (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt">         (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss="">         ) ss="alt">    .toDF(ss="string">"Id",ss="string">"eventDateTime") ss="">df ss="alt">.withColumn(ss="string">"Adjusted Date",date_add($ss="string">"eventDateTime",1)) ss="">.show()

months_add

像date_add和date_sub一样,此功能有助于添加月份。

减去月份,将要减去的月份数设为负数,因为没有单独的减去函数用于减去月份

例:

  • 从eventDateTime添加和减去一个月。

ss="dp-sql">ss="alt">var df = Seq( ss="">    (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), ss="alt">    (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), ss="">    (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), ss="alt">    (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) ss="">     ).toDF(ss="string">"typeId",ss="string">"eventDateTime") ss="alt">//ss="keyword">To ss="keyword">add one months ss=""> df ss="alt">.withColumn(ss="string">"Adjusted Date",add_months($ss="string">"eventDateTime",1)) ss="">.show() ss="alt">//ss="keyword">To subtract one months ss="">df ss="alt">.withColumn(ss="string">"Adjusted Date",add_months($ss="string">"eventDateTime",-1)) ss="">.show()

zip_with

通过应用函数合并左右数组。

此函数期望两个数组的长度都相同,如果其中一个数组比另一个数组短,则将添加null以匹配更长的数组长度。

例:

  • 将两个数组的内容相加并合并为一个

ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6),Seq(5,10,3))) ss="">         .toDF(ss="string">"array_1",ss="string">"array_2") ss="alt">   ss=""> df ss="alt">.withColumn(ss="string">"merged_array",zip_with($ss="string">"array_1",$ss="string">"array_2",(x,y)=>(x+y))) ss=""> .show

将谓词应用于所有元素,并检查数组中的至少一个或多个元素是否对谓词函数成立。

例:

  • 检查数组中至少一个元素是否为偶数。

ss="dp-sql">ss="alt">val df= Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"flag",exists($ss="string">"num_array", x =>lit(x%2===0))) ss="alt">.show

过滤

将给定谓词应用于数组中的所有元素,并过滤掉谓词为true的元素。

例:

  • 仅过滤掉数组中的偶数元素。

ss="dp-sql">ss="alt">val df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"num_array") ss="">df.withColumn(ss="string">"even_array",filter($ss="string">"num_array", x =>lit(x%2===0))) ss="alt">.show

聚合 aggregate

使用给定函数将给定数组和另一个值/状态简化为单个值,并应用可选的finish函数将缩减后的值转换为另一个状态/值。

例:

  • 将10加到数组的总和并将结果乘以2

ss="dp-sql">ss="alt">val df = Seq((Seq(2,4,6),3),(Seq(5,10,3),8)) ss="">  .toDF(ss="string">"num_array",ss="string">"constant") ss="alt">df.withColumn(ss="string">"reduced_array",aggregate($ss="string">"num_array", $ss="string">"constant",(x,y)=>x+y,x => x*2)) ss="">  .show

Spark 3.0中为Spark SQL模式引入的功能

以下是新的SQL函数,您只能在Spark SQL模式下才能使用它们。

acosh

查找给定表达式的双曲余弦的倒数。

asinh

找出给定表达式的双曲正弦的逆。

atanh

查找给定表达式的双曲正切的逆。

bit_and,bit_or和bit_xor

计算按位AND,OR和XOR值

bit_count

返回计数的位数。

bool_and和bool_or

验证表达式的所有值是否为真或验证表达式中的至少一个为真。

count_if

返回一列中的真值数量

例:

  • 找出给定列中的偶数值

ss="dp-sql">ss="alt">var df = Seq((1),(2),(4)).toDF(ss="string">"num") ss=""> ss="alt"> df.createOrReplaceTempView(ss="string">"table") ss="">spark.sql(ss="string">"select count_if(num %2==0) from table").show

date_part

提取日期/时间戳的一部分,例如小时,分钟等…

div

用于将表达式或带有另一个表达式/列的列分开

every 和 sum

如果给定的表达式对每个列的所有列值都求值为true,并且至少一个值对某些值求得true,则此函数返回true。

make_date,make_interval和make_timestamp

构造日期,时间戳和特定间隔。

例:

ss="dp-sql">ss="alt">ss="keyword">SELECT make_timestamp(2020, 01, 7, 30, 45.887)

max_by和min_by

比较两列并返回与右列的最大值/最小值关联的左列的值

例:

ss="dp-sql">ss="alt">var df = Seq((1,1),(2,1),(4,3)).toDF(ss="string">"x",ss="string">"y") ss=""> ss="alt"> df.createOrReplaceTempView(ss="string">"table") ss="">spark.sql(ss="string">"select max_by(x,y) from table").show

类型

返回列值的数据类型

返回Spark版本及其git版本

justify_days,justify_hours和justify_interval

新引入的对齐功能用于调整时间间隔。

例:

  • 表示30天为一个月,

ss="dp-sql">ss="alt">ss="keyword">SELECT justify_days(interval ss="string">'30 day')

分区转换功能

从Spark 3.0及更高版本开始,存在一些新功能,这些功能有助于对数据进行分区,我将在另一篇文章中介绍。

总体而言,我们已经分析了所有数据转换和分析功能,这些功能是3.0版本中产生的火花。 希望本指南有助于您了解这些新功能。  这些功能肯定会加速火花开发工作,并有助于建立坚固有效的火花管道。

看完上述内容,你们对Spark 3.0的新功能是什么呢有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI