The code is as follows:
package com.dt.spark.streaming

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Duration}

/**
 * Analyze logs using Spark Streaming combined with Spark SQL.
 * Assume the (simplified) click-log format of an e-commerce site is:
 *   userid,itemId,clickTime
 * Requirement: rank the top 10 items by click count within a 10-minute window
 * and display each item's name. The itemId-to-name mapping is stored in MySQL.
 * Created by dinglq on 2016/5/4.
 */
object LogAnalyzerStreamingSQL {
  val WINDOW_LENGTH = new Duration(600 * 1000)   // 10-minute window
  val SLIDE_INTERVAL = new Duration(10 * 1000)   // slide every 10 seconds

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Load the itemInfo table from the MySQL database
    val itemInfoDF = sqlContext.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://spark-master:3306/spark",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "iteminfo",
      "user" -> "root",
      "password" -> "vincent"
    )).load()
    itemInfoDF.registerTempTable("itemInfo")

    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
    val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming")
    val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " +
          " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " +
          " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 "
        val topTenClickItemLast10Minutes = sqlContext.sql(sqlStr)
        // Print the top-ten table for this window to the console
        topTenClickItemLast10Minutes.show()
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

case class AccessLog(userId: String, itemId: String, clickTime: String)

object AccessLog {
  def parseLogLine(log: String): AccessLog = {
    val logInfo = log.split(",")
    if (logInfo.length == 3) {
      AccessLog(logInfo(0), logInfo(1), logInfo(2))
    } else {
      // Malformed lines become a placeholder record; itemId "0" has no match
      // in itemInfo, so the inner join drops it from the top-ten result.
      AccessLog("0", "0", "0")
    }
  }
}
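The original comment inside foreachRDD mentions persisting each window's top-ten table to HDFS as a Parquet file, although the code only prints it. A minimal sketch of that extra step, assuming Spark 1.4+ (for the DataFrameWriter API) and a hypothetical HDFS path:

    // Inside foreachRDD, after computing topTenClickItemLast10Minutes
    // (the output path and append mode are assumptions, not from the original post):
    import org.apache.spark.sql.SaveMode

    topTenClickItemLast10Minutes.write
      .mode(SaveMode.Append)                                    // keep results from earlier windows
      .parquet("hdfs://spark-master:9000/output/topTenClicks")  // hypothetical HDFS path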
The contents of the table in MySQL are as follows:
mysql> select * from spark.iteminfo;
+--------+----------+
| itemid | itemname |
+--------+----------+
| 001 | phone |
| 002 | computer |
| 003 | TV |
+--------+----------+
3 rows in set (0.00 sec)
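If you need to recreate this table yourself, a minimal sketch follows (the column types are assumptions; the post only shows the data):

    CREATE DATABASE IF NOT EXISTS spark;
    CREATE TABLE spark.iteminfo (
      itemid   VARCHAR(10),   -- assumed type; only the values are shown above
      itemname VARCHAR(50)    -- assumed type
    );
    INSERT INTO spark.iteminfo VALUES ('001', 'phone'), ('002', 'computer'), ('003', 'TV');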
Create the directory logs_incoming on the D: drive (matching the path D:/logs_incoming in the code).
Run the Spark Streaming program.
Create a new file with the following content:
0001,001,2016-05-04 22:10:20
0002,001,2016-05-04 22:10:21
0003,001,2016-05-04 22:10:22
0004,002,2016-05-04 22:10:23
0005,002,2016-05-04 22:10:24
0006,001,2016-05-04 22:10:25
0007,002,2016-05-04 22:10:26
0008,001,2016-05-04 22:10:27
0009,003,2016-05-04 22:10:28
0010,003,2016-05-04 22:10:29
0011,001,2016-05-04 22:10:30
0012,003,2016-05-04 22:10:31
0013,003,2016-05-04 22:10:32
Save the file into the logs_incoming directory (see the note on file placement after the output) and observe the Spark program's output. In the sample data, item 001 appears 6 times, 003 appears 4 times, and 002 appears 3 times, which matches the result:
+------+--------+---+
|itemid|itemname|cnt|
+------+--------+---+
| 001| phone| 6|
| 003| TV| 4|
| 002|computer| 3|
+------+--------+---+
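One caveat: textFileStream only picks up files that appear in the watched directory after the stream has started, and each file should appear atomically (fully written). A common pattern, sketched here with hypothetical paths, is to write the file elsewhere and then move it in:

    import java.nio.file.{Files, Paths, StandardCopyOption}

    // Write the log file to a staging directory first, then move it into the
    // watched directory so the stream sees a complete file appear at once.
    Files.move(
      Paths.get("D:/logs_staging/clicks.log"),   // hypothetical staging path
      Paths.get("D:/logs_incoming/clicks.log"),  // directory watched by textFileStream
      StandardCopyOption.ATOMIC_MOVE)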