温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何进行DAGScheduler源码解读

发布时间:2022-01-14 16:30:31 来源:亿速云 阅读:164 作者:柒染 栏目:云计算

如何进行DAGScheduler源码解读,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:

如何进行DAGScheduler源码解读

进入其构造函数中:

如何进行DAGScheduler源码解读

如何进行DAGScheduler源码解读

可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。

LiveListenerBus:

如何进行DAGScheduler源码解读

BlockManagerMaster:


通过阅读代码,我们可以发现DAGScheduler实例化的时候,调用了initializeEventProcessActor()方法

private def initializeEventProcessActor() {  // blocking the thread until supervisor is started, which ensures eventProcessActor is  // not null before any job is submitted  // 阻塞当前线程,等待supervisor启动,这样可以确保Job提交时,eventProcessActor not null  implicit val timeout = Timeout(30 seconds)  val initEventActorReply =    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}

initializeEventProcessActor()

DAGSchedulerEventProcessActor:

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)  extends Actor with Logging {  override def preStart() {    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always    // valid when the messages arrive    // 设置taskScheduler对DAGScheduler的引用句柄。在此处设置保证了Job提交时候    // eventProcessActor已经准备就绪    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }  /**   * The main event loop of the DAG scheduler.   */  def receive = {    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,        listener, properties)    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }  override def postStop() {    // Cancel any active jobs in postStop hook    dagScheduler.cleanUpAfterSchedulerStop()
  }
}

可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。

关于如何进行DAGScheduler源码解读问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI