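How do you convert an RDD into a DataFrame in Spark 2.2.0? Newcomers are often unsure where to start. This article explains the reflection-based approach and walks through a complete Java example.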
Spark SQL can convert existing RDDs into Datasets.
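Approach: use reflection to infer the schema of an RDD that contains objects of a specific type. This reflection-based approach keeps the code concise. It works well when you already know the schema while writing your Spark application.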
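As far as we understand Spark's Java API, bean-based inference discovers columns through standard JavaBeans introspection. It creates one column per public getter and maps each Java type to a Spark SQL type. The stdlib-only sketch below reproduces that discovery step with java.beans.Introspector. It runs on a hypothetical StudentBean that has the same shape as the article's Student2, and it does not call Spark. The JDK reports bean properties sorted by name, so the columns come out as age, id, name rather than in field-declaration order.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class BeanSchemaSketch {

    // Hypothetical bean with the same fields and accessors as the article's Student2.
    public static class StudentBean implements java.io.Serializable {
        private int id;
        private int age;
        private String name;

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Returns "name:type" for every readable bean property, skipping getClass(),
    // in the order the JDK Introspector reports them.
    public static List<String> readableProperties(Class<?> beanClass) {
        try {
            BeanInfo info = Introspector.getBeanInfo(beanClass);
            List<String> columns = new ArrayList<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if ("class".equals(pd.getName()) || pd.getReadMethod() == null) {
                    continue;
                }
                columns.add(pd.getName() + ":" + pd.getPropertyType().getSimpleName());
            }
            return columns;
        } catch (IntrospectionException e) {
            throw new IllegalArgumentException("Not introspectable: " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        // Prints the discovered properties, e.g. to compare with dataFrame.printSchema()
        System.out.println(readableProperties(StudentBean.class));
    }
}
```

Because of this ordering, reading a Row by column name (for example with Row.fieldIndex) is safer than relying on positions such as getInt(0).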
Java example of the first (reflection-based) approach:
Data preparation, studentData.txt:
1001,20,zhangsan
1002,17,lisi
1003,24,wangwu
1004,16,zhaogang
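Each line of studentData.txt is one id,age,name record. That is exactly what the map() function in the Spark program splits on. Here is a stdlib-only sketch of that parsing step. The Student holder and the parse helper are hypothetical names, not part of the article. The sketch adds a field-count check, so a malformed line fails with a clear message instead of an ArrayIndexOutOfBoundsException.

```java
import java.util.ArrayList;
import java.util.List;

public class StudentLineParser {

    // Plain holder mirroring the id/age/name fields of the article's Student2.
    public static final class Student {
        public final int id;
        public final int age;
        public final String name;

        Student(int id, int age, String name) {
            this.id = id;
            this.age = age;
            this.name = name;
        }

        @Override
        public String toString() {
            return "Student{id=" + id + ", age=" + age + ", name='" + name + "'}";
        }
    }

    // Parses one "id,age,name" line with the same split(",") logic as the map() function.
    public static Student parse(String line) {
        String[] split = line.trim().split(",");
        if (split.length != 3) {
            throw new IllegalArgumentException("Expected id,age,name but got: " + line);
        }
        return new Student(Integer.parseInt(split[0].trim()),
                Integer.parseInt(split[1].trim()),
                split[2].trim());
    }

    public static void main(String[] args) {
        String[] lines = {"1001,20,zhangsan", "1002,17,lisi", "1003,24,wangwu", "1004,16,zhaogang"};
        List<Student> students = new ArrayList<>();
        for (String line : lines) {
            students.add(parse(line));
        }
        students.forEach(System.out::println);
    }
}
```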
Implementation in local mode:
package com.unicom.ljs.spark220.study;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-01-20 08:58
 * @version: v1.0
 * @description: com.unicom.ljs.spark220.study
 */
public class RDD2DataFrameReflect {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD2DataFrameReflect");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\studentData.txt");
        // Parse each "id,age,name" line into a Student2 bean
        JavaRDD<Student2> studentRDD = lines.map(new Function<String, Student2>() {
            @Override
            public Student2 call(String line) throws Exception {
                String[] split = line.split(",");
                Student2 student = new Student2();
                student.setId(Integer.valueOf(split[0]));
                student.setAge(Integer.valueOf(split[1]));
                student.setName(split[2]);
                return student;
            }
        });

        // Convert the RDD to a DataFrame via reflection:
        // passing Student2.class lets Spark infer the schema from the bean's getters
        Dataset<Row> dataFrame = sqlContext.createDataFrame(studentRDD, Student2.class);
        // Register the DataFrame as a temporary view so SQL can be run against it
        // (registerTempTable is deprecated since Spark 2.0 in favor of createOrReplaceTempView)
        dataFrame.createOrReplaceTempView("studentTable");

        // Query the students younger than 18; in Spark 2.x a DataFrame is simply a Dataset<Row>
        Dataset<Row> dataset = sqlContext.sql("select * from studentTable where age < 18");
        JavaRDD<Row> rowJavaRDD = dataset.toJavaRDD();
        JavaRDD<Student2> ageRDD = rowJavaRDD.map(new Function<Row, Student2>() {
            @Override
            public Student2 call(Row row) throws Exception {
                // Bean-derived columns are ordered by property name (age, id, name),
                // so look fields up by name instead of by position
                Student2 student = new Student2();
                student.setId(row.getInt(row.fieldIndex("id")));
                student.setAge(row.getInt(row.fieldIndex("age")));
                student.setName(row.getString(row.fieldIndex("name")));
                return student;
            }
        });
        ageRDD.foreach(new VoidFunction<Student2>() {
            @Override
            public void call(Student2 student) throws Exception {
                System.out.println(student.toString());
            }
        });

        sc.close();
    }
}
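On the sample data, the query select * from studentTable where age < 18 should keep only lisi (17) and zhaogang (16). The check below applies the same predicate in plain Java, with no Spark involved, so the expected result is easy to confirm. AgeFilterCheck and idsYoungerThan are hypothetical names. With local[*], the order in which foreach prints the rows is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AgeFilterCheck {

    // Returns the ids of "id,age,name" records whose age is strictly below maxAgeExclusive,
    // i.e. the rows the SQL filter "age < 18" keeps when maxAgeExclusive is 18.
    public static List<Integer> idsYoungerThan(List<String> lines, int maxAgeExclusive) {
        List<Integer> ids = new ArrayList<>();
        for (String line : lines) {
            String[] split = line.split(",");
            int age = Integer.parseInt(split[1]);
            if (age < maxAgeExclusive) {
                ids.add(Integer.parseInt(split[0]));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("1001,20,zhangsan", "1002,17,lisi", "1003,24,wangwu", "1004,16,zhaogang");
        System.out.println(idsYoungerThan(data, 18)); // prints [1002, 1004]
    }
}
```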
The Student2 class:
package com.unicom.ljs.spark220.study;

import java.io.Serializable;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-01-20 08:57
 * @version: v1.0
 * @description: com.unicom.ljs.spark220.study
 */
public class Student2 implements Serializable {
    private int id;
    private int age;
    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Student2{" + "id=" + id + ", age=" + age + ", name='" + name + '\'' + '}';
    }
}
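Student2 exposes a public getter and setter for each field, following JavaBean conventions. Reflection-based inference relies on those getters both to discover the columns and to read each object's values. The sketch below is a rough illustration of turning one bean into row values generically. It walks the getters with java.beans.Introspector and invokes each one. StudentBean and toRowValues are hypothetical, and Spark's actual internal conversion is more involved than this.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

public class BeanToRowSketch {

    // Hypothetical bean with the same accessors as Student2.
    public static class StudentBean {
        private int id;
        private int age;
        private String name;

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Invokes every readable property's getter (except getClass) in introspection order,
    // collecting the values the way one row's fields would be filled.
    public static List<Object> toRowValues(Object bean) {
        try {
            List<Object> values = new ArrayList<>();
            for (PropertyDescriptor pd : Introspector.getBeanInfo(bean.getClass()).getPropertyDescriptors()) {
                if ("class".equals(pd.getName()) || pd.getReadMethod() == null) {
                    continue;
                }
                values.add(pd.getReadMethod().invoke(bean));
            }
            return values;
        } catch (IntrospectionException | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Cannot read bean " + bean, e);
        }
    }

    public static void main(String[] args) {
        StudentBean student = new StudentBean();
        student.setId(1002);
        student.setAge(17);
        student.setName("lisi");
        System.out.println(toRowValues(student)); // prints [17, 1002, lisi]
    }
}
```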
Key dependencies in pom.xml:
<spark.version>2.2.0</spark.version>
<scala.version>2.11.8</scala.version>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
Original article: https://my.oschina.net/u/2380815/blog/4453756