温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

spark2.4.3中sparkSQL用户自定义函数该怎么理解

发布时间:2021-12-17 13:59:37 来源:亿速云 阅读:205 作者:柒染 栏目:大数据

这期内容当中小编将会给大家带来有关spark2.4.3中sparkSQL用户自定义函数该怎么理解,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

1、简介

从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext

来实现对数据的加载、转换、处理等工作,并且实现了SQLcontext和HiveContext的所有功能。

我们在新版本中并不需要之前那么繁琐的创建很多对象,只需要创建一个SparkSession对象即可。

SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成SQLContext自身中的表。

然后使用SQL语句来操作数据,也提供了HiveQL以及其他依赖于Hive的功能支持。

创建SparkSession

SparkSession 是 Spark SQL 的入口。

使用 Dataset 或者 Datafram 编写 Spark SQL 应用的时候,第一个要创建的对象就是 SparkSession。

Builder 是 SparkSession 的构造器。 通过 Builder, 可以添加各种配置。

Builder 的方法如下:

MethodDescription
getOrCreate获取或者新建一个 sparkSession
enableHiveSupport增加支持 hive Support
appName设置 application 的名字
config设置各种配置

2、sparkSQL基本使用方法

使用的spark版本2.4.3

spark 1.x中的SQLContext在新版本中已经被废弃,改为SparkSession.builder

spark2.4.3中sparkSQL用户自定义函数该怎么理解

可以写成

val conf = new SparkConf().setAppName("helloworld").setMaster("local[*]")
val spark1=SparkSession.builder().config(conf).getOrCreate()

或(sparksession构造器私有化在builder中)

val spark = SparkSession.builder
      .appName("my spark application")
      .master("local[2]")
      .getOrCreate()

例:

import org.apache.spark.sql.SparkSession
object HelloWorld {
  def main(args: Array[String]): Unit = {
   /* val conf = new SparkConf().setAppName("helloworld").setMaster("local[*]")
    val spark1=SparkSession.builder().config(conf).getOrCreate()*/
    val spark = SparkSession.builder
      .appName("my spark application")
      .master("local[2]")
      .getOrCreate()
    //读取数据
    val df = spark.read.json("/usr/local/opt/spark-2.4.3/examples/src/main/resources/people.json")
    //展示所有数据
    df.show()
  //DSL
    df.select("name").show()
    //SQL
    df.createTempView("people")
    spark.sql("select * from people where age=30").show()
    //关闭
    spark.close()
  }
}

输出结果 1:

 //展示所有数据
    df.show()

spark2.4.3中sparkSQL用户自定义函数该怎么理解

输出结果 2:

//DSL
    df.select("name").show()

spark2.4.3中sparkSQL用户自定义函数该怎么理解

输出结果 3:

//SQL
    df.createTempView("people")
    spark.sql("select * from people where age=30").show()

spark2.4.3中sparkSQL用户自定义函数该怎么理解

3、通过udf自定义用户函数addName (实现将字段x前拼接上name:x)

scala> spark.read.json("./examples/src/main/resources/people.json")
res32: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> res32.createOrReplaceTempView("people")
scala> spark.sql("select * from people")
res38: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> spark.sql("select * from people").show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
scala> spark.udf.register("addName",(x:String)=> "name:"+x)
res40: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> spark.sql("select addName(name) as name from people").show
+------------+
|        name|
+------------+
|name:Michael|
|   name:Andy|
| name:Justin|
+------------+

4、通过udaf自定义用户函数

package com.ny.service
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
class CustomerAvg extends UserDefinedAggregateFunction {
  //输入的类型
  override def inputSchema: StructType = StructType(StructField("salary", LongType) :: Nil)
  //缓存数据的类型
  override def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  //返回值类型
  override def dataType: DataType = LongType
  //幂等性
  override def deterministic: Boolean = true
  //初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
//更新 分区内操作
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0)=buffer.getLong(0) +input.getLong(0)
    buffer(1)=buffer.getLong(1)+1L
  }
//合并 分区与分区之间操作
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)
    buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
  }
  //最终执行的方法
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0)/buffer.getLong(1)
  }
}
object CustomerAvg{
  def main(args: Array[String]): Unit = {
     val spark= SparkSession.builder()
       .appName("MyAvg")
       .master("local[2]")
       .getOrCreate()
    spark.udf.register("MyAvg",new CustomerAvg)
//读数据
    val frame = spark.read.json("/usr/local/opt/spark-2.4.3/examples/src/main/resources/peopleCP.json")
   frame.createTempView("peopleCP")
    spark.sql("select MyAvg(age) avg_age from peopleCP").show()
    spark.stop()
  }
}
nancylulululu:resources nancy$ vi peopleCP.json 
{"name":"Michael","age":11}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

返回结果

spark2.4.3中sparkSQL用户自定义函数该怎么理解

上述就是小编为大家分享的spark2.4.3中sparkSQL用户自定义函数该怎么理解了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI