这篇文章主要介绍“Spark UDF变长参数的方法是什么”,在日常操作中,相信很多人在Spark UDF变长参数的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spark UDF变长参数的方法是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
引子
变长参数对于我们来说并不陌生,在Java里我们这么写
public void varArgs(String... args)
在Scala里我们这么写
def varArgs(cols: String*): String
而在Spark里,很多时候我们有自己的业务逻辑,现成的functions满足不了我们的需求,而当我们需要处理同一行的多个列,将其经过我们自己的逻辑合并为一个列时,变长参数及其变种实现可以给我们提供帮助。
但是在Spark UDF里我们是 无法使用变长参数传值 的,但之所以本文以变长参数开头,是因为需求起于它,而通过对它进行变换,我们可以使用变长参数或Seq类型来接收参数。
下面通过Spark-Shell来做演示,以下三种方法都可以做到多列传参,分别是
变长参数(接受array类型)
Seq类型参数(接受array类型)
Row类型参数(接受struct类型)
变长参数类型的UDF
定义UDF方法
def myConcatVarargs(sep: String, cols: String*): String = cols.filter(_ != null).mkString(sep)
注册UDF函数
由于变长参数只能通过方法定义,所以这里使用部分应用函数来转换
val myConcatVarargsUDF = udf(myConcatVarargs _)
可以看到该UDF的定义如下
UserDefinedFunction(<function2>,StringType,List(StringType, ArrayType(StringType,true)))
也即变长参数转换为了ArrayType,而且函数是只包括两个参数,所以变长参数列表由此也可看出无法使用的。
变长参数列表传值
我们构造一个DataFrame如下
val df = sc.parallelize(Array(("aa", "bb", "cc"),("dd","ee","ff"))).toDF("A", "B", "C")
然后直接传入多个String类型的列到myConcatVarargsUDF
df.select(myConcatVarargsUDF(lit("-"), col("A"), col("B"), col("C"))).show
结果出现如下报错
java.lang.ClassCastException: anonfun$1 cannot be cast to scala.Function4
由此可以看出,使用变长参数列表的方式Spark是不支持的,它会被识别为四个参数的函数,而UDF确是被定义为两个参数而不是四个参数的函数!
变换:使用array()转换做第二个参数
我们使用Spark提供的array() function来转换参数为Array类型
df.select(myConcatVarargsUDF(lit("-"), array(col("A"), col("B"), col("C")))).show
结果如下
+-------------------+ |UDF(-,array(A,B,C))| +-------------------+ | aa-bb-cc| | dd-ee-ff| +-------------------+
由此可以看出,使用变长参数构造的UDF方法,可以通过构造Array的方式传参,来达到多列合并的目的。
使用Seq类型参数的UDF
上面提到,变长参数***被转为ArrayType,那不禁要想我们为嘛不使用Array或List类型呢?
实际上在UDF里,类型并不是我们可以随意定义的,比如使用List和Array就是不行的,我们自己定义的类型也是不行的,因为这涉及到数据的序列化和反序列化。
以Array/List为示例的错误
下面以Array类型为示例
定义函数
val myConcatArray = (cols: Array[String], sep: String) => cols.filter(_ != null).mkString(sep)
注册UDF
val myConcatArrayUDF = udf(myConcatArray)
可以看到给出的UDF签名是
UserDefinedFunction(<function2>,StringType,List())
应用UDF
df.select(myConcatArrayUDF(array(col("A"), col("B"), col("C")), lit("-"))).show
会发现报错
scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String
同样List作为参数类型也会报错,因为反序列化的时候无法构建对象,所以List和Array是无法直接作为UDF的参数类型的
以Seq做参数类型
定义调用如下
val myConcatSeq = (cols: Seq[Any], sep: String) => cols.filter(_ != null).mkString(sep) val myConcatSeqUDF = udf(myConcatSeq) df.select(myConcatSeqUDF(array(col("A"), col("B"), col("C")), lit("-"))).show
结果如下
+-------------------+ |UDF(array(A,B,C),-)| +-------------------+ | aa-bb-cc| | dd-ee-ff| +-------------------+
使用Row类型参数的UDF
我们可以使用Spark functions里struct方法构造结构体类型传参,然后用Row类型接UDF的参数,以达到多列传值的目的。
def myConcatRow: ((Row, String) => String) = (row, sep) => row.toSeq.filter(_ != null).mkString(sep) val myConcatRowUDF = udf(myConcatRow) df.select(myConcatRowUDF(struct(col("A"), col("B"), col("C")), lit("-"))).show
可以看到UDF的签名如下
UserDefinedFunction(<function2>,StringType,List())
结果如下
+--------------------+ |UDF(struct(A,B,C),-)| +--------------------+ | aa-bb-cc| | dd-ee-ff| +--------------------+
使用Row类型还可以使用模式提取,用起来会更方便
row match { case Row(aa:String, bb:Int) => }
***
对于上面三种方法,变长参数和Seq类型参数都需要array的函数包装为ArrayType,而使用Row类型的话,则需要struct函数构建结构体类型,其实都是为了数据的序列化和反序列化。三种方法中,Row的方式更灵活可靠,而且支持不同类型并且可以明确使用模式提取,用起来相当方便。
而由此我们也可以看出,UDF不支持List和Array类型的参数,同时 自定义参数类型 如果没有混合Spark的特质实现序列化和反序列化,那么在UDF里也是 无法用作参数类型 的。当然,Seq类型是可以 的,可以接多列的数组传值。
此外,我们也可以使用柯里化来达到多列传参的目的,只是不同参数个数需要定义不同的UDF了。
到此,关于“Spark UDF变长参数的方法是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。