This article walks through the common ways of using Spark SQL. The examples are quite practical, so they are shared here for reference; follow along and have a look.
1. Create a DataFrame by reading a JSON file
Java (Spark 1.6)
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("javaSpark01");
SparkContext sc = new SparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// Dataset<Row> df = sqlContext.read().format("json").load("G:/idea/scala/spark02/json");
Dataset<Row> df2 = sqlContext.read().json("G:/idea/scala/spark02/json");
df2.show();
//Print the schema in tree form
df2.printSchema();
//Register the DataFrame as a temporary table. The table lives only in memory as a logical table; it is not materialized to disk
df2.registerTempTable("baidukt_table");
Dataset<Row> sql = sqlContext.sql("select * from baidukt_table");
sql.show();
Dataset<Row> sql1 = sqlContext.sql("select age,count(1) from baidukt_table group by age");
sql1.show();
}
Scala (Spark 1.6)
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("Spark08 1.6")
val sc = new SparkContext(conf)
val sqlContext: SQLContext = new SQLContext(sc)
val df = sqlContext.read.format("json").load("G:/idea/scala/spark02/json")
// val df1 = sqlContext.read.json("G:/idea/scala/spark02/json")
//Show the first 50 rows
df.show(50)
//Print the schema in tree form
df.printSchema()
//Register a temporary table
df.registerTempTable("baidukt_com_table")
val result = sqlContext.sql("select age,count(1) from baidukt_com_table group by age")
result.show()
val result1 = sqlContext.sql("select * from baidukt_com_table")
result1.show()
sc.stop()
}
Java (Spark 2.0+)
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("Spark 2.0 ++");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
Dataset<Row> df = spark.read().json("G:/idea/scala/spark02/json");
// Dataset<Row> df1 = spark.read().format("json").load("G:/idea/scala/spark02/json");
df.show();
df.printSchema();
df.createOrReplaceGlobalTempView("baidu_com_spark2");
//A global temp view lives in the reserved global_temp database, so the query must qualify the view name
Dataset<Row> result = spark.sql("select * from global_temp.baidu_com_spark2");
result.show();
spark.stop();
}
Scala (Spark 2.0+)
def main(args: Array[String]): Unit = {
//The user's current working directory
// val location = System.setProperty("user.dir", "spark_2.0")
val conf = new SparkConf().setAppName("Spark08 2.0++").setMaster("local[3]")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//Load the data
val df: DataFrame = spark.read.json("G:/idea/scala/spark02/json")
// val df1: DataFrame = spark.read.format("json").load("G:/idea/scala/spark02/json")
//Show the data
df.show()
//Print the schema in tree form
df.printSchema()
//Query directly with Spark SQL
//First register the DataFrame as a temporary view
//createOrReplaceTempView: creates a temporary view whose lifetime is tied to the SparkSession that created this Dataset
//createGlobalTempView: creates a global temporary view whose lifetime is tied to the Spark application
df.createOrReplaceTempView("people")
val result: DataFrame = spark.sql("select * from people")
result.show()
spark.stop()
}
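A note on the two view types mentioned in the comments above: a global temporary view is registered under the reserved global_temp database and must be queried with that prefix. A minimal Scala sketch, reusing the spark session and df from the example above (the view name people_global is an illustrative assumption):
//A global temp view is shared across all SparkSessions of the same application
df.createGlobalTempView("people_global")
//Queries must qualify the name with the global_temp database
spark.sql("select * from global_temp.people_global").show()
//The view is visible from a newly created session as well
spark.newSession().sql("select * from global_temp.people_global").show()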
2. Create a DataFrame from an RDD of JSON strings
Java (Spark 1.6)
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("jsonRDD");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> data = sc.parallelize(Arrays.asList(
"{\"name\":\"zhangsan\",\"score\":\"100\"}",
"{\"name\":\"lisi\",\"score\":\"200\"}",
"{\"name\":\"wangwu\",\"score\":\"300\"}"
));
Dataset<Row> df = sqlContext.read().json(data);
df.show();
df.printSchema();
df.createOrReplaceTempView("baidu_com_spark2");
Dataset<Row> result = sqlContext.sql("select * from baidu_com_spark2");
result.show();
sc.stop();
}
Scala (Spark 1.6)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("spark10")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val data: RDD[String] = sc.parallelize(Array(
"{\"name\":\"zhangsan\",\"age\":18}",
"{\"name\":\"lisi\",\"age\":19}",
"{\"name\":\"wangwu\",\"age\":20}"
))
val df = sqlContext.read.json(data)
df.show()
df.printSchema()
df.createOrReplaceTempView("baidukt_com_spark1.6")
val result = sqlContext.sql("select * from baidukt_com_spark1.6")
result.show()
result.printSchema()
sc.stop()
}
3. Create a DataFrame from a non-JSON RDD
3.1 Convert a non-JSON RDD to a DataFrame via reflection (not recommended, so the full code is not reproduced here; a minimal sketch follows)
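A minimal Scala sketch of the reflection approach, assuming a hypothetical Person case class and made-up sample lines (the case class must be declared outside the method):
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int)

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder().master("local").appName("reflectDemo").getOrCreate()
  import spark.implicits._
  //Map each text line to a Person; toDF() infers the schema from the case class via reflection
  val peopleDF = spark.sparkContext
    .parallelize(Seq("zhangsan,18", "lisi,19", "wangwu,20"))
    .map(_.split(","))
    .map(p => Person(p(0), p(1).trim.toInt))
    .toDF()
  peopleDF.show()
  peopleDF.printSchema()
  spark.stop()
}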
3.2 Dynamically create a Schema to convert a non-JSON RDD to a DataFrame
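A minimal Scala sketch of the dynamic-schema approach; the sample lines, field names, and types below are illustrative assumptions:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder().master("local").appName("dynamicSchema").getOrCreate()
  //Plain text lines such as "1,zhangsan,18"
  val lineRDD = spark.sparkContext.parallelize(Seq("1,zhangsan,18", "2,lisi,19", "3,wangwu,20"))
  //Map each line to a Row whose field order matches the schema built below
  val rowRDD = lineRDD.map(_.split(",")).map(p => Row(p(0), p(1), p(2).trim.toInt))
  //Build the schema programmatically at runtime
  val schema = StructType(Seq(
    StructField("id", StringType, nullable = true),
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  ))
  val df = spark.createDataFrame(rowRDD, schema)
  df.show()
  df.printSchema()
  spark.stop()
}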
4. Create a DataFrame by reading a parquet file (involves extra I/O, not recommended)
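A minimal Scala sketch of round-tripping a DataFrame through parquet; the input and output paths are illustrative assumptions:
import org.apache.spark.sql.{SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder().master("local").appName("parquetDemo").getOrCreate()
  val df = spark.read.json("G:/idea/scala/spark02/json")
  //Write the DataFrame out in parquet format
  df.write.mode(SaveMode.Overwrite).parquet("G:/idea/scala/spark02/parquet")
  //Read it back; spark.read.format("parquet").load(...) is equivalent
  val parquetDF = spark.read.parquet("G:/idea/scala/spark02/parquet")
  parquetDF.show()
  parquetDF.printSchema()
  spark.stop()
}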
5. Create a DataFrame from a JDBC source (MySQL as the example)
Java (Spark 1.6)
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("mysql");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
/**
* First way: read a MySQL table and load it as a DataFrame
*/
Map<String, String> options = new HashMap<String,String>();
options.put("url", "jdbc:mysql://localhost:3306/spark");//连接地址和数据库名称
options.put("driver", "com.mysql.jdbc.Driver");//驱动
options.put("user", "root");//用户名
options.put("password", "admin");//密码
options.put("dbtable", "person");//表
Dataset<Row> person = sqlContext.read().format("jdbc").options(options).load();
person.show();
//Register a temporary table
person.registerTempTable("person");
/**
* Second way: read a MySQL table and load it as a DataFrame
*/
DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://localhost:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "admin");
reader.option("dbtable", "score");
Dataset<Row> score = reader.load();
score.show();
score.registerTempTable("score");
Dataset<Row> result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name");
result.show();
/**
* Save the DataFrame result to MySQL
*/
Properties properties = new Properties();
properties.setProperty("user", "root");
properties.setProperty("password", "admin");
result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties);
sc.stop();
}
Scala (Spark 1.6)
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("mysql")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
/**
* First way: read a MySQL table to create a DataFrame
*/
val options = new mutable.HashMap[String,String]();
options.put("url", "jdbc:mysql://localhost:3306/spark")
options.put("driver","com.mysql.jdbc.Driver")
options.put("user","root")
options.put("password", "admin")
options.put("dbtable","person")
val person = sqlContext.read.format("jdbc").options(options).load()
person.show()
person.registerTempTable("person")
/**
* Second way: read a MySQL table to create a DataFrame
*/
val reader = sqlContext.read.format("jdbc")
reader.option("url", "jdbc:mysql://localhost:3306/spark")
reader.option("driver","com.mysql.jdbc.Driver")
reader.option("user","root")
reader.option("password","admin")
reader.option("dbtable", "score")
val score = reader.load()
score.show()
score.registerTempTable("score")
val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name")
result.show()
/**
* Write the result to a MySQL table
*/
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "admin")
result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties)
sc.stop()
}
6. Load data from Hive into a DataFrame
HiveContext is a subclass of SQLContext; it is the recommended way to connect to Hive.
Java (Spark 1.6)
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setAppName("hive");
JavaSparkContext sc = new JavaSparkContext(conf);
//HiveContext is a subclass of SQLContext.
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("USE spark");
hiveContext.sql("DROP TABLE IF EXISTS student_infos");
//Create the student_infos table in Hive
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' ");
hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");
hiveContext.sql("DROP TABLE IF EXISTS student_scores");
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");
hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/student_scores INTO TABLE student_scores");
/**
* Query the tables to produce a DataFrame
*/
Dataset<Row> goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name=ss.name WHERE ss.score>=80");
hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
goodStudentsDF.registerTempTable("goodstudent");
Dataset<Row> result = hiveContext.sql("select * from goodstudent");
result.show();
result.printSchema();
/**
* Save the result to the Hive table good_student_infos
*/
goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();
for(Row goodStudentRow : goodStudentRows) {
System.out.println(goodStudentRow);
}
sc.stop();
}
Scala (Spark 1.6)
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("HiveSource")
val sc = new SparkContext(conf)
/**
* HiveContext is a subclass of SQLContext.
*/
val hiveContext = new HiveContext(sc)
hiveContext.sql("use spark")
hiveContext.sql("drop table if exists student_infos")
hiveContext.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'")
hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")
hiveContext.sql("drop table if exists student_scores")
hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by '\t'")
hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")
val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
hiveContext.sql("drop table if exists good_student_infos")
/**
* Write the result to a Hive table
*/
df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
sc.stop()
}
7. Custom UDFs
Scala (Spark 2.0+, uses SparkSession)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("spark13")
val spark = SparkSession.builder().config(conf).getOrCreate()
//Convert an RDD to a DataFrame
val rdd: RDD[String] = spark.sparkContext.parallelize(Array("zhangsan","wangwu","masi"))
val rowRDD: RDD[Row] = rdd.map(RowFactory.create(_))
val schema = DataTypes.createStructType(Array(StructField("name",StringType,true)))
val df: DataFrame = spark.sqlContext.createDataFrame(rowRDD,schema)
df.show(50)
df.printSchema()
df.createOrReplaceTempView("test")
//Register a custom UDF named StrLen. The two-argument (String, Int) version caused problems here; some sources suggest the parameter type needs to be java.lang.String.
// spark.sqlContext.udf.register("StrLen",(s:String,i:Int)=>{s.length+i})
// spark.sql("select name ,StrLen(name,10) as length from test").show(20)
//The Int-only version works, so register it before running the query below
spark.sqlContext.udf.register("StrLen",(i:Int)=>{i})
spark.sql("select name ,StrLen(10) as length from test").show(20)
}
Java (Spark 1.6)
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/**
* Which UDF interface to implement (UDF1, UDF2, ...) depends on the number of arguments the function takes
*/
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(String t1) throws Exception {
return t1.length();
}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();
sc.stop();
}
Thanks for reading! That concludes this overview of Spark SQL usage. Hopefully the content above is helpful and teaches you something new; if you found the article useful, please share it so more people can see it!