Spark组件Spark SQL的实例分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
DataFrame是DataSet以命名列方式组织的分布式数据集,类似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame变成类型为Row的Dataset:
type DataFrame = Dataset[Row]。
以加载json和mysql为例:
val ds = sparkSession.read.json("/路径/people.json")val ds = sparkSession.read.format("jdbc").options(Map("url" -> "jdbc:mysql://ip:port/db","driver" -> "com.mysql.jdbc.Driver","dbtable" -> "tableName", "user" -> "root", "root" -> "123")).load()
1.定义一个case class,利用反射机制来推断1) 从HDFS中加载文件为普通RDDval lineRDD = sparkContext.textFile("hdfs://ip:port/person.txt").map(_.split(" "))2) 定义case class(相当于表的schema)case class Person(id:Int, name:String, age:Int)3) 将RDD和case class关联val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))4) 将RDD转换成DataFrameval ds= personRDD.toDF2.手动定义一个schema StructType,直接指定在RDD上val schemaString ="name age"val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))val rowRdd = peopleRdd.map(p=>Row(p(0),p(1)))val ds = sparkSession.createDataFrame(rowRdd,schema)
注意:直接使用col方法需要import org.apache.spark.sql.functions._
//查询年龄最大的前两名
首先要获取Spark SQL编程"入口":SparkSession(当然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive则为HiveContext)。这里以读取parquet为例:
在项目pom文件中引入相关驱动包,跟访问mysql等jdbc数据源类似。示例:
Class.forName("org.apache.hive.jdbc.HiveDriver")val conn = DriverManager.getConnection("jdbc:hive2://ip:port", "root", "123");try { val stat = conn.createStatement() val res = stat.executeQuery("select * from people limit 1") while (res.next()) { println(res.getString("name")) }} catch { case e: Exception => e.printStackTrace()} finally{ if(conn!=null) conn.close()}
Spark SQL 获取Hive数据
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:port</value>
最后,将hive-site.xml复制或者软链到$SPARK_HOME/conf/。如果hive的元数据存储在mysql中,那么需要将mysql的连接驱动jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,启动spark-sql即可操作hive中的库和表。而此时使用hive元数据获取SparkSession的方式为:
val spark = SparkSession.builder()
val udf_str_length = udf{(str:String) => str.length}spark.udf.register("str_length",udf_str_length)val ds =sparkSession.read.json("路径/people.json")ds.createOrReplaceTempView("people")sparkSession.sql("select str_length(address) from people")
import org.apache.spark.sql.{Row, SparkSession}import org.apache.spark.sql.expressions.MutableAggregationBufferimport org.apache.spark.sql.expressions.UserDefinedAggregateFunctionimport org.apache.spark.sql.types._object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)}// Register the function to access itspark.udf.register("myAverage", MyAverage)val df = spark.read.json("examples/src/main/resources/employees.json")df.createOrReplaceTempView("employees")df.show()val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")result.show()
Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}import org.apache.spark.sql.expressions.Aggregatorcase class Employee(name: String, salary: Long)case class Average(var sum: Long, var count: Long)object MyAverage extends Aggregator[Employee, Average, Double] { // A zero value for this aggregation. Should satisfy the property that any b + zero = b def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // Merge two intermediate values def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // Transform the output of the reduction def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // Specifies the Encoder for the intermediate value type def bufferEncoder: Encoder[Average] = Encoders.product // Specifies the Encoder for the final output value type def outputEncoder: Encoder[Double] = Encoders.scalaDouble}val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]ds.show()// Convert the function to a `TypedColumn` and give it a nameval averageSalary = MyAverage.toColumn.name("average_salary")val result = ds.select(averageSalary)result.show()
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/bigdatalearnshare/blog/4836610