温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

周期性清除Spark Streaming流状态的方法是什么

发布时间:2021-12-16 21:23:22 阅读:190 作者:柒染 栏目:大数据
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

本篇文章为大家展示了周期性清除Spark Streaming流状态的方法是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

在Spark Streaming程序中,我们经常需要使用有状态的流来统计一些累积性的指标,比如各个商品的PV。简单的代码描述如下,使用mapWithState()算子:

 val productPvStream = stream.mapPartitions(records => {    var result = new ListBuffer[(String, Int)]      for (record <- records) {        result += Tuple2(record.key(), 1)      }    result.iterator  }).reduceByKey(_ + _).mapWithState(    StateSpec.function((productId: String, pv: Option[Int], state: State[Int]) => {      val sum = pv.getOrElse(0) + state.getOption().getOrElse(0)      state.update(sum)      (productId, sum)  })).stateSnapshots()

现在的问题是,PV并不是一直累加的,而是每天归零,重新统计数据。要达到在凌晨0点清除状态的目的,有以下两种方法。

编写脚本重启Streaming程序

用crontab、Azkaban等在凌晨0点调度执行下面的Shell脚本:

stream_app_name='com.xyz.streaming.MallForwardStreaming'cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`if [ ${cnt} -eq 1 ]; then  pid=`ps aux | grep SparkSubmit | grep ${stream_app_name} | awk '{print $2}'`  kill -9 ${pid}  sleep 20  cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`  if [ ${cnt} -eq 0 ]; then    nohup sh /path/to/streaming/bin/mall_forward.sh > /path/to/streaming/logs/mall_forward.log 2>&1  fifi

这种方式最简单,也不需要对程序本身做任何改动。但随着同时运行的Streaming任务越来越多,就会显得越来越累赘了。

给StreamingContext设置超时

在程序启动之前,先计算出当前时间点距离第二天凌晨0点的毫秒数:

def msTillTomorrow = {  val now = new Date()  val tomorrow = new Date(now.getYear, now.getMonth, now.getDate + 1)  tomorrow.getTime - now.getTime}

然后将Streaming程序的主要逻辑写在while(true)循环中,并且不像平常一样调用StreamingContext.awaitTermination()方法,而改用awaitTerminationOrTimeout()方法,即:

while (true) {    val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))    ssc.checkpoint(CHECKPOINT_DIR)    // ...处理逻辑...    ssc.start()    ssc.awaitTerminationOrTimeout(msTillTomorrow)    ssc.stop(false, true)    Thread.sleep(BATCH_INTERVAL * 1000)  }

在经过msTillTomorrow毫秒之后,StreamingContext就会超时,再调用其stop()方法(注意两个参数,stopSparkContext表示是否停止关联的SparkContext,stopGracefully表示是否优雅停止),就可以停止并重启StreamingContext。

两种方法都是仍然采用Spark Streaming的机制进行状态计算的。如果其他条件允许的话,我们还可以抛弃mapWithState(),直接借助外部存储自己维护状态。比如将Redis的Key设计为product_pv:[product_id]:[date],然后在Spark Streaming的每个批次中使用incrby指令,就能方便地统计PV了,不必考虑定时的问题。

上述内容就是周期性清除Spark Streaming流状态的方法是什么,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:https://my.oschina.net/wangzhiwubigdata/blog/4589962

AI

开发者交流群×