Spark SQL 是 Spark 用来处理结构化数据(结构化数据可以来自外部结构化数据源也可以通 过 RDD 获取)的一个模块,它提供了一个编程抽象叫做 DataFrame 并且作为分布式 SQL 查 询引擎的作用。
外部的结构化数据源包括 JSON、Parquet(默认)、RMDBS、Hive 等。当前 Spark SQL 使用 Catalyst 优化器来对 SQL 进行优化,从而得到更加高效的执行方案。并且可以将结果存储到外部系统。
- 容易整合
- 统一的数据访问方式
- 兼容hive
- 标准的数据连接
- spark sql 的前身是shark。但是spark sql抛弃了原有shark的代码,汲取了shark的一些优点,如:列存储(In-Memory Columnar Storage)、Hive 兼容性等,重新开发 SparkSQL。
- spark -1.1 2014 年 9 月 11 日,发布 Spark1.1.0。Spark 从 1.0 开始引入 SparkSQL(Shark 不再支持升级与维护)。Spark1.1.0 变化较大是 SparkSQL 和 MLlib
- spark -1.3 增加了dataframe新
- spark -1.4 增加了窗口分析函数
- spark - 1.5 钨丝计划。Hive 中有 UDF 与 UDAF,Spark 中对 UDF 支持较早
- spark 1.6 执行的 sql 中可以增加"--"注释,Spark-1.5/1.6 的新特性,引入 DataSet 的概念
- spark 2.x SparkSQL+DataFrame+DataSet(正式版本),Structured Streaming(DataSet),引入 SparkSession 统一了 RDD,DataFrame,DataSet 的编程入口
SparkSession 是 Spark-2.0 引如的新概念。SparkSession 为用户提供了统一的切入点,来让用户学习 Spark 的各项功能。
随着 DataSet 和 DataFrame 的 API 逐渐成为标准的 API,SparkSession 作为 DataSet 和 DataFrame API 的切入点,SparkSession 封装了 SparkConf、SparkContext 和 SQLContext。为了向后兼容,SQLContext 和 HiveContext 也被保存下来。
特点:
- 为用户提供一个统一的切入点使用 Spark 各项功能
- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序
- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互
- 与 Spark 交互之时不需要显示的创建 SparkConf、SparkContext 以及 SQlContext,这些对 象已经封闭在 SparkSession 中
- SparkSession 提供对 Hive 特征的内部支持:用 HiveQL 写 SQL 语句,访问 Hive UDFs,从 Hive 表中读取数据
SparkSession的创建:
在spark-shell中SparkSession 会被自动初始化一个对象叫做 spark,为了向后兼容,Spark-Shell 还提供了一个 SparkContext 的初始化对象,方便用户操作:
在代码开发的时候创建:
val conf = new SparkConf()
val spark: SparkSession = SparkSession.builder()
.appName("_01spark_sql")
.config(conf)
.getOrCreate()
这里主要说的是RDD的局限性:
- RDD是不支持spark-sql的
- RDD 仅表示数据集,RDD 没有元数据,也就是说没有字段语义定义
- RDD 需要用户自己优化程序,对程序员要求较高
- 从不同数据源读取数据相对困难,读取到不同格式的数据都必须用户自己定义转换方式 合并多个数据源中的数据也较困难
DataFrame 被称为 SchemaRDD。以行为单位构成的分布式数据集合,按照列赋予不同的名称。对 select、fileter、aggregation 和 sort 等操作符的抽象。其中 Schema 是就是元数据,是语义描述信息。DataFrame是分布式的Row对象的集合.
DataFrame = RDD+Schema = SchemaRDD
优势:
- DataFrame 是一种特殊类型的 Dataset,DataSet[Row] = DataFrame
- DataFrame 自带优化器 Catalyst,可以自动优化程序
- DataFrame 提供了一整套的 Data Source API
特点:
- 支持 单机 KB 级到集群 PB 级的数据处理
- 支持多种数据格式和存储系统
- 通过 Spark SQL Catalyst 优化器可以进行高效的代码生成和优化
- 能够无缝集成所有的大数据处理工具
- 提供 Python, Java, Scala, R 语言 API
由于 DataFrame 的数据类型统一是 Row,所以 DataFrame 也是有缺点的。Row 运行时类型检查,比如 salary 是字符串类型,下面语句也只有运行时才进行类型检查。 dataframe.filter("salary>1000").show()
Dataset扩展了 DataFrame API,提供了编译时类型检查,面向对象风格的 API。
Dataset 可以和 DataFrame、RDD 相互转换。DataFrame=Dataset[Row],可见 DataFrame 是一种特殊的 Dataset。
这里小编要重点强调一下二者的区别,但是在学习spark-sql的时候就对二者的关系不太清楚,而且在面试的时候也问到了这个问题,真的是一番血泪史啊。
通过查看多个前辈对二者的总结我大概的总结一下二者的区别:
- Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row
- DataSet可以在编译时检查类型,而DataFrame只有在正真运行的时候才会检查
- DataFrame每一行的类型都是Row,不解析我们就无法知晓其中有哪些字段,每个字段又是什么类型。我们只能通过getAs[类型]或者row(i)的方式来获取特定的字段内容(超级大弊端);而dataSet每一行的类型是不一定的,在自定义了case class之后就可以很自由的获取每一行的信息。
好了 废话说了一堆,不如直接上代码:
object SparkSqlTest {
def main(args: Array[String]): Unit = {
//屏蔽多余的日志
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
val conf: SparkConf = new SparkConf()
conf.setMaster("local[2]")
.setAppName("SparkSqlTest")
//设置spark的序列化器
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//将自定义的对象,加入序列化器中
.registerKryoClasses(Array(classOf[Person]))
//构建SparkSession对象
val spark: SparkSession = SparkSession.builder()
.config(conf).getOrCreate()
//创建sparkContext对象
val sc: SparkContext = spark.sparkContext
val list = List(
new Person("委xx", 18),
new Person("吴xx", 20),
new Person("戚xx", 30),
new Person("王xx", 40),
new Person("薛xx", 18)
)
//创建DataFrame
//构建元数据
val schema = StructType(List(
StructField("name", DataTypes.StringType),
StructField("age", DataTypes.IntegerType)
))
//构建RDD
val listRDD: RDD[Person] = sc.makeRDD(list)
val RowRDD: RDD[Row] = listRDD.map(field => {
Row(field.name, field.age)
})
val perDF: DataFrame = spark.createDataFrame(RowRDD,schema)
//创建DataSet
import spark.implicits._ //这句话一定要加
val perDS: Dataset[Person] = perDF.as[Person]
/**
* 这里主要介绍DF 和 DS的区别
*/
perDF.foreach(field=>{
val name=field.get(0) //根据元素的index,取出相应的元素的值
val age=field.getInt(1) //根据元素的index和元素的类型取出元素的值
field.getAs[Int]("age") //根据元素的类型和元素的名称取出元素的值
println(s"${age},${name}")
})
perDS.foreach(field=>{
//直接根据上面定义的元素的名称取值
val age=field.age
val name=field.name
println(s"${age},${name}")
})
}
}
case class Person(name: String, age: Int)
个人感觉,就是DataFrame虽然集成和很多优点,但是,如果想从DataFrame中取出具体的某个对象的某个属性,是不能确定的,步骤比较繁琐,而且类型不确定。但是使用DataSet则有效额的避免了所有的问题。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。