怎么实现SparkStreaming转化操作,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
DStream的转化操作分为无状态 和有状态 两种
在无状态转化操作中,每个批次的处理不依赖于之前批次的数据。
有状态转化操作需要使用之前批次的数据或者中间结果来计算当前批次的数据,有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转换操作。
无状态转化操作的实质就说把简单的RDD转化操作应用到每个批次上,也就是转化DStream的每一个RDD
Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream 的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也 就是对 DStream 中的 RDD 应用转换。
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")
val sc: StreamingContext = new StreamingContext(conf, Seconds(3))
val lines = sc.socketTextStream("localhost", 9999)
// transform方法可以将底层RDD获取到后进行操作
// 1. DStream功能不完善
// 2. 需要代码周期性的执行
// Code : Driver端
val newDS: DStream[String] = lines.transform(
rdd => {
// Code : Driver端,(周期性执行)
rdd.map(
str => {
// Code : Executor端
str
}
)
}
)
// Code : Driver端
val newDS1: DStream[String] = lines.map(
data => {
// Code : Executor端
data
}
)
sc.start()
sc.awaitTermination()
}
两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val data9999 = ssc.socketTextStream("localhost", 9999)
val data8888 = ssc.socketTextStream("localhost", 8888)
val map9999: DStream[(String, Int)] = data9999.map((_,9))
val map8888: DStream[(String, Int)] = data8888.map((_,8))
// 所谓的DStream的Join操作,其实就是两个RDD的join
val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)
joinDS.print()
ssc.start()
ssc.awaitTermination()
}
有状态转化操作是跨时间区间跟踪数据的操作,也就是说,一些先前批次的数据也被用来在新的批次中用于计算结果。有状态转换的主要的两种类型:
滑动窗口:以一个时间阶段为滑动窗口进行操作
updateStateByKey():通过key值来跟踪数据的状态变化
有状态转化操作需要在StreamingContext中打开检查点机制来提高容错
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("updateStateByKey")
val sc: StreamingContext = new StreamingContext(conf, Seconds(4))
sc.checkpoint("cp")
val ds: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 9999)
val value: DStream[(String, Int)] = ds.map(((_: String), 1))
// updateStateByKey:根据key对数据的状态进行更新
// 传递的参数中含有两个值
// 第一个值表示相同的key的value数据的集合
// 第二个值表示缓存区key对应的计算值
val state: DStream[(String, Int)] = value.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {
val newCount: Int = option.getOrElse(0) + seq.sum
Option(newCount)
})
state.print()
sc.start()
sc.awaitTermination()
}
所有基于窗口的函数都需要两个参数,分别对应窗口时长和滑动步长,并且两者都必须是SparkStreaming的批次间隔的整数倍。
窗口时长控制的是每次用来计算的批次的个数
滑动步长用于控制对新的DStream进行计算的间隔
基于window进行窗口内元素计数操作
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_,1))
val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))
val wordToCount = windowDS.reduceByKey(_+_)
wordToCount.print()
ssc.start()
ssc.awaitTermination()
}
有逆操作规约是一种更高效的规约操作,通过只考虑新进入窗口的元素和离开窗口的元素,让spark增量计算归约的结果,其在代码上的体现就是reduceFunc 和 invReduceFunc
普通归约操作
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("cp")
val lines = ssc.socketTextStream("localhost", 9999)
lines.reduceByWindow(
(x: String, y: String) => {
x + "-" + y
},
Seconds(9), Seconds(3)
).print()
ssc.start()
ssc.awaitTermination()
}
有逆归约操作
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("cp")
val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_,1))
/**
* 基于窗口进行有逆归约:通过控制窗口流出和进入的元素来提高性能
*/
val windowDS: DStream[(String, Int)] =
wordToOne.reduceByKeyAndWindow(
(x:Int, y:Int) => { x + y},
(x:Int, y:Int) => {x - y},
Seconds(9), Seconds(3))
windowDS.print()
ssc.start()
ssc.awaitTermination()
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("cp")
val lines = ssc.socketTextStream("localhost", 9999)
/**
* 统计窗口中输入数据的个数
* 比如 3s内输入了10条数据,则打印10
*/
val countByWindow: DStream[Long] = lines.countByWindow(
Seconds(9), Seconds(3)
)
countByWindow.print()
/**
* 统计窗口中每个值的个数
* 比如 3s内输入了1个3 2个4 3个5,则打印(3,1)(2,4)(3,5)
*/
val countByValueAndWindow: DStream[(String, Long)] = lines.countByValueAndWindow(
Seconds(9), Seconds(3)
)
countByValueAndWindow.print()
ssc.start()
ssc.awaitTermination()
}
看完上述内容,你们掌握怎么实现SparkStreaming转化操作的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4930910/blog/4952664