最近项目中使用SparkSQL来做数据的统计分析,闲来就记录下来。
直接上代码:
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
object SparkSQL {
  // Input record layouts:
  //   A - customer master data: customer id, national id number, gender
  //   B - transaction data:     customer id, amount spent, transaction status
  case class A(custom_id: String, id_code: String, sex: String)
  case class B(custom_id: String, money: String, status: Int)

  def main(args: Array[String]): Unit = {
    // For this (small) data set local[*] measured faster than local or YARN.
    val sc = new SparkContext("local[*]", "SparkSQL")
    val sqlContext = new SQLContext(sc)
    // Spark 1.x: brings the implicit RDD[case class] -> SchemaRDD conversion
    // into scope; it is what makes registerTempTable available below.
    import sqlContext.createSchemaRDD

    // Fields are separated by two \u0001 characters.
    // BUG FIX: the original mapped to tbclient/tbtrans, which are not defined
    // anywhere - the case classes are named A and B.
    val A_RDD = sc.textFile("hdfs://172.16.30.2:25000/usr/tmpdata/A.dat")
      .map(_.split("\u0001\u0001"))
      .map(t => A(t(0), t(4), t(13)))
    val B_RDD = sc.textFile("hdfs://172.16.30.3:25000/usr/tmpdata/B.dat")
      .map(_.split("\u0001\u0001"))
      .map(t => B(t(16), t(33), t(71).toInt))

    // Register the (implicitly converted) SchemaRDDs as SQL tables.
    A_RDD.registerTempTable("A_RDD")
    B_RDD.registerTempTable("B_RDD")

    // Safe String -> Int; 9999 is the original sentinel for unparsable input.
    def toInt(s: String): Int =
      try s.toInt catch { case _: NumberFormatException => 9999 }

    // Length of the id; exposed as fun2 so queries can keep only the
    // well-formed 18-digit ids via "fun2(id_code) = 18".
    def myfun2(id_code: String): Int = id_code.length

    // Chinese zodiac animal from the birth year (digits 6-9 of an 18-digit id).
    // 1900 was a Rat year, so (year - 1900) mod 12 indexes the array.
    def myfun5(id_code: String): String = {
      if (id_code.length != 18) ""
      else {
        val zodiac = Array("鼠", "牛", "虎", "兔", "龙", "蛇", "马", "羊", "猴", "鸡", "狗", "猪")
        val year = toInt(id_code.substring(6, 10))
        // BUG FIX: a year below 1900 made the original index negative and
        // throw; normalize the modulus into [0, 12).
        val idx = ((year - 1900) % zodiac.length + zodiac.length) % zodiac.length
        zodiac(idx)
      }
    }

    // Decade bucket ("1910 ~ 1920", ...) from the birth year; empty string for
    // malformed ids or years outside [1910, 2014).
    def myfun3(id_code: String): String = {
      if (id_code.length != 18) ""
      else {
        val year = toInt(id_code.substring(6, 10))
        if (year >= 1910 && year < 2010) {
          // Same labels as the original hand-written chain, computed instead.
          val lo = (year / 10) * 10
          lo + " ~ " + (lo + 10)
        }
        else if (year >= 2010 && year < 2014) "2010以后"
        else ""
      }
    }

    // Spending bracket for a transaction amount.
    // BUG FIX: the original compared the amount as a *string*, which orders
    // lexicographically (e.g. "6000" > "50000", so 6000 landed in the
    // 60000 ~ 70000 bracket), and its last bracket started at 5000000 instead
    // of 50000000, overlapping the previous one. Compare numerically over the
    // contiguous boundary list instead. Unparsable amounts map to "".
    def myfun4(money: String): String = {
      val amount = try money.toDouble catch { case _: NumberFormatException => -1.0 }
      val bounds = Seq(10000L, 50000L, 60000L, 70000L, 80000L, 100000L,
        150000L, 200000L, 1000000L, 10000000L, 50000000L, 100000000L)
      bounds.sliding(2).collectFirst {
        case Seq(lo, hi) if amount >= lo && amount < hi => lo + " ~ " + hi
      }.getOrElse("")
    }

    // Western constellation from the birth month+day (digits 10-13, read as
    // the integer MMDD). Returns "无效" for an impossible MMDD.
    // Note: Scala's substring(from, until) excludes `until`, unlike Oracle.
    def myfun1(id_code: String): String = {
      if (id_code.length != 18) ""
      else {
        val md = toInt(id_code.substring(10, 14))
        if (md >= 120 && md <= 219) "水瓶座"
        else if (md >= 220 && md <= 320) "双鱼座"
        else if (md >= 321 && md <= 420) "白羊座"
        else if (md >= 421 && md <= 521) "金牛座"
        else if (md >= 522 && md <= 621) "双子座"
        else if (md >= 622 && md <= 722) "巨蟹座"
        else if (md >= 723 && md <= 823) "狮子座"
        // FIX: the source text had this label censored to "***座";
        // md 824-923 is Virgo.
        else if (md >= 824 && md <= 923) "处女座"
        else if (md >= 924 && md <= 1023) "天秤座"
        else if (md >= 1024 && md <= 1122) "天蝎座"
        else if (md >= 1123 && md <= 1222) "射手座"
        // FIX: use short-circuit || instead of bitwise | between Booleans.
        else if ((md >= 1223 && md <= 1231) || (md >= 101 && md <= 119)) "摩蝎座"
        else "无效"
      }
    }

    // Register the UDFs. BUG FIX: fun2 is referenced by every query below but
    // was never registered in the original, which fails at runtime.
    sqlContext.registerFunction("fun1", (x: String) => myfun1(x))
    sqlContext.registerFunction("fun2", (y: String) => myfun2(y))
    sqlContext.registerFunction("fun3", (z: String) => myfun3(z))
    sqlContext.registerFunction("fun4", (m: String) => myfun4(m))
    sqlContext.registerFunction("fun5", (n: String) => myfun5(n))

    // Constellation distribution. fun2(id_code)=18 must appear in WHERE so the
    // malformed-id filter applies to the count as well as the grouping key.
    val result1 = sqlContext.sql("select fun1(id_code),count(*) from A_RDD t where fun2(id_code)=18 group by fun1(id_code)")
    // Zodiac-animal distribution.
    val result2 = sqlContext.sql("select fun5(a.id_code),count(*) from A_RDD a where fun2(id_code)=18 group by fun5(a.id_code)")
    // Customer count and total amount per spending bracket.
    // BUG FIX: Spark 1.x's SqlParser rejects an IN (SELECT ...) subquery in
    // WHERE (the "``NOT'' expected but `select' found" parse error pasted
    // below), so the semi-join is expressed as an inner join on custom_id.
    // NOTE(review): assumes custom_id is unique in A_RDD (customer master);
    // otherwise the join would duplicate B rows and inflate SUM - confirm.
    // NOTE(review): a.money is a string column; SUM relies on Spark SQL's
    // implicit cast - confirm the column always holds numeric text.
    val result3 = sqlContext.sql(
      "select fun4(a.money),count(distinct a.custom_id),SUM(a.money) " +
        "from B_RDD a join A_RDD b on a.custom_id = b.custom_id " +
        "where a.status=8 and fun2(b.id_code)=18 " +
        "group by fun4(a.money)")
    // Print one result to the console; result1 is left unconsumed (SQL is
    // lazy, so it costs nothing until collected).
    result3.collect().foreach(println)
    // Results can also be written to the local FS / HDFS.
    result2.saveAsTextFile("file:///tmp/age")
    // Release the SparkContext before the JVM exits.
    sc.stop()
  }
}
在测试result3的时候,发现报错:
Exception in thread "main" java.lang.RuntimeException: [1.101] failure: ``NOT'' expected but `select' found
select fun5(a.id_code),count(*) from A_RDD a where fun2(a.id_code)=18 and a.custom_id IN (select distinct b.custom_id from B_RDD b where b.status=8) group by fun5
(a.id_code)
^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:74)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:267)
at SparkSQL$.main(SparkSQL.scala:198)
at SparkSQL.main(SparkSQL.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
查证后确认:Spark SQL 1.x 的 SqlParser 并不支持 WHERE 条件中的 IN (SELECT ...) 子查询,上面的 "``NOT'' expected but `select' found" 就是这个原因导致的解析错误;可以把子查询改写成对 custom_id 的 JOIN 来实现同样的过滤。
如有问题,还望路过的高手不吝赐教。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。