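Dynamic Resource Allocation in Spark Streaming
Dynamic Control of the Consumption Rate in Spark Streaming
By default Spark allocates resources in a coarse-grained way: resources are assigned up front, before any computation starts. A Spark Streaming workload, however, has peaks and troughs that need very different amounts of resources, so provisioning for the peak leaves a large share of those resources idle the rest of the time.
Because a Spark Streaming application runs continuously, its resource consumption and resource management also have to be taken into account.
Adjusting resources dynamically in Spark Streaming comes with challenges of its own.
Requesting Resources Dynamically in Spark Streaming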
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
// Check whether the configuration asks for dynamic resource allocation
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
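To make the enable check concrete, here is a minimal sketch that models the configuration as a plain Map instead of a SparkConf. It assumes, as the warning message above suggests, that dynamic allocation is treated as enabled only when spark.dynamicAllocation.enabled is true and no fixed executor count (spark.executor.instances) has been requested. The object DynamicAllocationCheck and its methods are hypothetical names for illustration, not Spark APIs, and the exact rule may differ between Spark versions.

```scala
// Hypothetical stand-in for the relevant part of SparkConf: a plain key/value map.
object DynamicAllocationCheck {
  // Assumption: dynamic allocation counts as enabled only when the flag is true
  // and no fixed number of executors has been requested.
  def isDynamicAllocationEnabled(conf: Map[String, String]): Boolean = {
    val flag = conf.get("spark.dynamicAllocation.enabled").exists(_.toBoolean)
    val fixedExecutors = conf.get("spark.executor.instances").map(_.toInt).getOrElse(0)
    flag && fixedExecutors == 0
  }

  // True when the flag is set but a fixed executor count overrides it,
  // which is the situation the warning in the snippet above reports.
  def isOverridden(conf: Map[String, String]): Boolean =
    !isDynamicAllocationEnabled(conf) &&
      conf.get("spark.dynamicAllocation.enabled").exists(_.toBoolean)

  def main(args: Array[String]): Unit = {
    val dynamicOnly = Map("spark.dynamicAllocation.enabled" -> "true")
    val both = dynamicOnly + ("spark.executor.instances" -> "4")
    println(isDynamicAllocationEnabled(dynamicOnly)) // true
    println(isDynamicAllocationEnabled(both))        // false
    println(isOverridden(both))                      // true
  }
}
```

Under this assumption, setting both options means the fixed executor count wins and the manager is never created.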
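ExecutorAllocationManager: a timer repeatedly scans the state of the executors. The stages that are currently running are spread across different executors, and depending on the load the manager either adds executors or removes them.
The schedule method of ExecutorAllocationManager is triggered periodically to carry out this dynamic adjustment of resources.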
/**
 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 *
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 *
 * This is factored out into its own method for testing.
 */
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now)

  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}
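The expiry sweep at the end of schedule can be illustrated in isolation. The sketch below is not Spark code: ExpirySweep and sweep are hypothetical names, and an immutable Map stands in for the removeTimes map. It only shows the rule used above, namely that an executor is removed once now >= expireTime and stays tracked otherwise.

```scala
object ExpirySweep {
  // Splits the tracked executors into those whose removal deadline has passed
  // (to be removed) and those that are still waiting (to stay tracked).
  def sweep(removeTimes: Map[String, Long], now: Long): (Set[String], Map[String, Long]) = {
    val (expired, pending) = removeTimes.partition { case (_, expireTime) => now >= expireTime }
    (expired.keySet, pending)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical executor ids and deadlines in milliseconds.
    val removeTimes = Map("exec-1" -> 1000L, "exec-2" -> 5000L)
    val (toRemove, stillTracked) = sweep(removeTimes, now = 2000L)
    println(s"remove: $toRemove, keep: ${stillTracked.keySet}")
  }
}
```

With now set to 2000, only exec-1 has passed its deadline, so it is the only executor selected for removal.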
Inside ExecutorAllocationManager, a timer backed by a thread pool keeps invoking schedule.
/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  // intervalMillis is the interval at which the timer fires
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
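The timer pattern used by start can be tried out without Spark. The following sketch relies only on java.util.concurrent; FixedRateTimer and startTimer are hypothetical names, and the 50 ms interval is an arbitrary choice for the demo. It also shows why the task catches Throwable: with scheduleAtFixedRate, an exception that escapes run() suppresses all subsequent executions.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.util.control.ControlThrowable

object FixedRateTimer {
  // Runs `body` every `intervalMillis` milliseconds on a single-threaded scheduler
  // and returns a function that stops the timer.
  def startTimer(intervalMillis: Long)(body: () => Unit): () => Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit =
        try body()
        catch {
          case ct: ControlThrowable => throw ct
          // Log and swallow ordinary failures so that later runs still happen.
          case t: Throwable => System.err.println(s"Uncaught exception in timer: $t")
        }
    }
    scheduler.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
    () => { scheduler.shutdownNow(); () }
  }

  def main(args: Array[String]): Unit = {
    val ticks = new CountDownLatch(3)
    val stop = startTimer(intervalMillis = 50) { () =>
      ticks.countDown()
      // Fail on the second tick to show that the timer keeps running afterwards.
      if (ticks.getCount == 1) throw new RuntimeException("simulated failure")
    }
    val completed = ticks.await(5, TimeUnit.SECONDS)
    stop()
    println(s"three ticks observed: $completed")
  }
}
```

The third tick still arrives after the simulated failure because the exception never leaves run().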
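Notes:
1. WeChat official account of DT大数据梦工厂 (DT Big Data Dream Factory): DT_Spark
2. IMF big data hands-on sessions, live at 8 p.m. on YY channel 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains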