温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

spark sql在scala中使用的方式有哪些

发布时间:2021-12-09 09:22:33 来源:亿速云 阅读:124 作者:iii 栏目:开发技术

这篇文章主要介绍“spark sql在scala中使用的方式有哪些”,在日常操作中,相信很多人在spark sql在scala中使用的方式有哪些问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”spark sql在scala中使用的方式有哪些”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

package hgs.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLImplicits
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.Row
//第一种方法创建dataframe
object SqlTest1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sqltest1").setMaster("local")
    val context = new SparkContext(conf)
    val sqlContext = new SQLContext(context)
    
    val rdd = context.textFile("d:\\person",1)
    val rdd2 = rdd.map(x=>{val t = x.split(" ");person(t(0).toInt,t(1),t(2).toInt)})
    //第一种方法创建dataframe,在这里需要导入隐式转换
    import sqlContext.implicits._      
    val persondf = rdd2.toDF()  
    //这个方法在2.1.0里面被废除
    //persondf.registerTempTable("person")
    //使用该函数代替
    persondf.createOrReplaceTempView("person")
    val result = sqlContext.sql("select * from person order by age desc")
    //打印查询的结果
    result.show()
    //或者将结果保存到文件
    result.write.json("d://personselect")
   
    context.stop()
  }
}
case class person(id:Int,name:String,age:Int)
//第二种方法创建dataframe
//3.1.2.	通过StructType直接指定Schema
object SqlTest2{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sqltest2").setMaster("local")
    val context = new SparkContext(conf)
    val sqlContext = new SQLContext(context)
    
    val rdd = context.textFile("d:\\person",1)
    
    //第一种方法创建dataframe,在这里需要导入隐式转换
    //创建schema,即一个映射关系   
    val personShcema = StructType(
    List(
        //下面为一个列的描述,分别为 列名,数据类型,是否为空
        StructField("id",IntegerType,true),
        StructField("name",StringType,true),
        StructField("age",IntegerType,true)
     )    
    )
    
    val rdd2 = rdd.map(x=>{val t = x.split(" ");Row(t(0).toInt,t(1),t(2).toInt)})
    //通过这种方式创建dataframe
    val persondf = sqlContext.createDataFrame(rdd2, personShcema)
    //将dataframe映射为一个临时的表
    persondf.createOrReplaceTempView("person")
    //查询数据展示
    sqlContext.sql("select * from person order by age desc").show()
    context.stop()
/*  查询出的数据
 *  +---+----+---+
    | id|name|age|
    +---+----+---+
    |  1| hgs| 26|
    |  3|  zz| 25|
    |  2|  wd| 24|
    |  4|  cm| 24|
    +---+----+---+
    */
    
  }
}
一些笔记:
checkpoint:
	将rdd中间过程持久化到hdfs上面,如果某个rdd失败,则从hdfs回复,这样代价较小
	sc.setCheckpointDir("hdfs dir or other fs dir "),建议将RDD cache之后再
	checkpoin这样将减少一次运算直接从内存中将RDD进行checkpoin
	但是这样之前依赖的RDD也会被丢弃
RDD Objects构建DAG--->DAGScheduler(TaskSet(每个Task在每个excutor上&&切分stage,并提价stage))
    ------>TaskScheduler(Task&&提交task,)------>Worker	(执行task)
stage:根据依赖关系区分stage,当遇到一个宽依赖(节点之间交换数据)的时候划分一个stage
	其中窄依赖:父RDD的分区数据只传向一个子RDD分区,而宽依赖则是父RDD的分区数据会传向多个子RDD的或者多个分区
	
spark SQL:处理结构化的数据
	DataFrames:与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,
		除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持
		嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的是一套高层
		的关系操作,比函数式的RDD API要更加友好,门槛更低。由于与R和Pandas的DataFrame类似,
		Spark DataFrame很好地继承了传统单机数据分析的开发体验
	创建DataFrame: 将数据映射为class,RDD.toDF 
	通过sql查询,将df注册为一个表1. df.registerTempTable("test") sqlContext.sql("select * from test").show 
								 2.通过StructType定义:StrutType(List())
hive 3.0.0 与spark
	1.将hive-site.xml hdfs-site.xml  core-site.xml复制到spark的conf文件夹下 ,将mysql驱动放到spark的jars文件夹下面
	2.在hive中的语句在spark-sql中完全适用:
		create table person(id int,name string,age int) row format delimited fields terminated by ' ';
		load data inpath 'hdfs://bigdata00:9000/person' into table person;
		select * from person;
		数据如下:
			1	hgs	26
			2	wd	24
			3	zz	25
			4	cm	24
	3.在spark-sql console交互中会打印很多的INFO级别的信息,很烦人,解决办法是
		在conf文件夹下:
		   mv log4j.properties.template  log4j.properties
			将log4j.rootCategory=INFO, console 修改为log4j.rootCategory=WARN, console

到此,关于“spark sql在scala中使用的方式有哪些”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI