Many newcomers are unsure how Spark SQL, one of Spark's components, is used in practice. This article works through concrete examples for readers who need them.
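A DataFrame is a distributed dataset (a Dataset) organized into named columns, similar to a table in an RDBMS or a data frame in R or Python. The DataFrame API is available in Scala, Java, Python, and R. In the Scala API, DataFrame is simply a type alias for a Dataset of Row: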
type DataFrame = Dataset[Row]
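To make the alias concrete, the following is a minimal sketch, assuming a local SparkSession and a hypothetical PersonRecord case class that does not appear in the original article. It shows that a DataFrame can be assigned to a Dataset[Row] without conversion, and how untyped Row access differs from a typed Dataset.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Hypothetical record type used only for this illustration
case class PersonRecord(name: String, age: Int)

// Assumption: a local session is enough for the demonstration
val spark = SparkSession.builder()
  .appName("dataframe-alias-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Made-up sample data
val df: DataFrame = Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age")

// No conversion is needed: DataFrame and Dataset[Row] are the same type
val rows: Dataset[Row] = df

// Untyped access: fields of a Row are looked up by name at runtime
val firstName: String = rows.orderBy("name").first().getAs[String]("name")

// Typed access: fields are read as ordinary members of the case class
val people: Dataset[PersonRecord] = df.as[PersonRecord]
val ages: Array[Int] = people.map(_.age).collect().sorted
```

The typed form catches a misspelled field name at compile time, while the Row form only fails when the query runs.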
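// Create a DataFrame from a JSON file (spark is an existing SparkSession)
val jsonDs = spark.read.json("/path/people.json")

// Create a DataFrame from a MySQL table over JDBC
val jdbcDs = spark.read.format("jdbc").options(Map(
  "url" -> "jdbc:mysql://ip:port/db",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "tableName",
  "user" -> "root",
  "password" -> "123")).load()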
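An RDD can be converted to a DataFrame in two ways.

1. Define a case class and let Spark infer the schema by reflection

// 1) Load a file from HDFS as a plain RDD and split each line into fields
//    (spark is an existing SparkSession)
val lineRDD = spark.sparkContext.textFile("hdfs://ip:port/person.txt").map(_.split(" "))
// 2) Define a case class (it plays the role of the table schema)
case class Person(id: Int, name: String, age: Int)
// 3) Map the RDD onto the case class
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
// 4) Convert the RDD to a DataFrame (toDF needs the session's implicits)
import spark.implicits._
val reflectedDs = personRDD.toDF()

2. Define a StructType schema by hand and apply it to the RDD

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schemaString = "name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// The fields in lineRDD are id, name, age, so take name and age to match the schema
val rowRdd = lineRDD.map(p => Row(p(1), p(2)))
val explicitDs = spark.createDataFrame(rowRdd, schema)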
Note: calling the col method directly requires import org.apache.spark.sql.functions._
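As an illustration of that note, here is a minimal sketch of col used in a filter and a projection. The session setup, the sample rows, and the column alias age_next_year are assumptions made for the example, not part of the original article.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._ // brings col into scope

// Assumption: a local session is enough for the demonstration
val spark = SparkSession.builder()
  .appName("col-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Made-up sample data
val people = Seq(("Alice", 30), ("Bob", 17)).toDF("name", "age")

// col("age") builds a Column expression; comparison and arithmetic
// operators on it produce new Column expressions
val adults = people
  .filter(col("age") >= 18)
  .select(col("name"), (col("age") + 1).as("age_next_year"))

val result = adults.collect()
```

Without the functions import, col is not in scope and the snippet does not compile; the $"age" syntax from spark.implicits._ is an alternative way to build the same Column.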
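The first step is to obtain the entry point for Spark SQL programming: SparkSession. (In earlier versions the more familiar entry point was SQLContext, or HiveContext when working with Hive.) Reading a Parquet file is the example used for this step.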
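The step described above might look like the following sketch. The application name, the local master, the temporary directory, and the sample rows are all assumptions; the example writes a small Parquet file first so that it has something to read.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Obtain the entry point. Assumption: local mode, hypothetical app name
val spark = SparkSession.builder()
  .appName("parquet-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Write made-up sample data to a fresh path inside a temporary directory
val parquetPath = Files.createTempDirectory("people-parquet").resolve("people").toString
Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age").write.parquet(parquetPath)

// Read the Parquet file back; the schema is stored in the file itself
val people = spark.read.parquet(parquetPath)

// Expose the DataFrame to SQL under a view name and query it
people.createOrReplaceTempView("people")
val names = spark.sql("SELECT name FROM people WHERE age > 26")
  .collect()
  .map(_.getString(0))
```

On a cluster the master is normally supplied by spark-submit rather than hard-coded in the builder.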
// Query Hive over JDBC through the Hive driver
import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://ip:port", "root", "123")
try {
  val stat = conn.createStatement()
  val res = stat.executeQuery("select * from people limit 1")
  // Advance the cursor row by row and print the name column
  while (res.next()) {
    println(res.getString("name"))
  }
} catch {
  case e: Exception => e.printStackTrace()
} finally {
  if (conn != null) conn.close()
}
Reading Hive data with Spark SQL
val spark = SparkSession.builder()
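import org.apache.spark.sql.functions.udf

// Define a UDF that returns the length of a string
val udf_str_length = udf { (str: String) => str.length }
// Register it under a name so that it can be called from SQL
spark.udf.register("str_length", udf_str_length)

val ds = spark.read.json("path/people.json")
ds.createOrReplaceTempView("people")
spark.sql("select str_length(address) from people")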
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

// Note: UserDefinedAggregateFunction is deprecated since Spark 3.0 in favor of Aggregator
object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// The implicits of the session provide the Encoder that .as[Employee] needs
import spark.implicits._
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
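The comments on zero, reduce, merge, and finish describe a contract that can be checked without a cluster. The following plain-Scala sketch, which does not use Spark at all, imitates how partial results from separate partitions might be combined. The partition layout and salary values are made up, and the immutable AvgBuffer class and helper names are hypothetical stand-ins for the Average buffer above, not Spark's actual execution path.

```scala
// Hypothetical immutable stand-in for the aggregation buffer
final case class AvgBuffer(sum: Long, count: Long)

// Identity element: merging any buffer with zero leaves it unchanged
val zero = AvgBuffer(0L, 0L)

// Fold one input value into a buffer
def reduce(b: AvgBuffer, salary: Long): AvgBuffer =
  AvgBuffer(b.sum + salary, b.count + 1)

// Combine two partial buffers
def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer =
  AvgBuffer(b1.sum + b2.sum, b1.count + b2.count)

// Turn the final buffer into the result
def finish(b: AvgBuffer): Double = b.sum.toDouble / b.count

// Made-up salaries spread over three imaginary partitions
val partitions = Seq(Seq(3000L, 4500L), Seq(3500L), Seq(4000L))

// Each partition is reduced on its own, starting from zero
val partials = partitions.map(_.foldLeft(zero)(reduce))

// The partial buffers are then merged and finished
val average = finish(partials.foldLeft(zero)(merge))

// Reducing all values in a single pass must give the same answer
val sequential = finish(partitions.flatten.foldLeft(zero)(reduce))
```

Because merge is associative and zero is its identity, the result does not depend on how the rows are split across partitions, which is the property the zero comment asks for.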