温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

DAG任务分解和Shuffle RDD怎么使用

发布时间:2021-12-30 10:08:08 来源:亿速云 阅读:161 作者:iii 栏目:云计算

这篇文章主要介绍“DAG任务分解和Shuffle RDD怎么使用”,在日常操作中,相信很多人在DAG任务分解和Shuffle RDD怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”DAG任务分解和Shuffle RDD怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

1、DagScheduler分析

DagScheduler功能主要是负责RDD的各个stage的分解和任务提交。Stage分解是从触发任务调度过程的finalStage开始倒推寻找父stage,如果父stage没有提交任务则循环提交缺失的父stage。每个stage有一个父RDD的概念,根据分区数的多少创建多个任务(Task)。

Task的调度实际是通过TaskSchedulerImp来完成的,TaskSchedulerImp里根据环境部署的不同又会使用不同的Backend,比如Yarn集群、独立集群等其Backend是不一样的,这里先有个概念,先不深究Backend。

这里先看看DagScheduler的核心逻辑把。里面首先要研究的一个方法:

def submitMissingTasks(stage: Stage, jobId: Int)

该方法就是提交stage执行,为什么叫这个名称呢?说明这里的stage是需先需要提交执行的,没有其他依赖的stage还未执行了。

submitMissingTasks方法会根据RDD的依赖关系创建两种task,ResultTask和ShuffleMapTask。

一步步来,只看关键代码,因为整体代码太多了不利于理解关键逻辑。

1.1 生成序列化的taskBinary

taskBinaryBytes = stage match {
          case stage: ShuffleMapStage =>
            JavaUtils.bufferToArray(
              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
          case stage: ResultStage =>
            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
        }

taskBinaryBytes待会是要封装成对像分发到远端Executor上执行的,所以必须是可序列化的。

两者最主要区别就是:ShuffleMapStage的入参是依赖的shuffleDep;而ResultStage的入参是函数的定义func。

1.2 生成task

现在有了taskBinaryBytes,下一步就是生成Task了。

val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

两种Task类型:ShuffleMapTask和ResultTask。这里要主要的是对Task而言,有多少分区(partition)就会生成多少个Task,Task是到分区维度的,而不是到RDD维度的,这个概念一定要明确。

1.3 提交Task

最后一步就是提交任务执行。这里就要用到taskScheduler了,当然了,这里的taskScheduler目前就是指TaskSchedulerImp。

taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))

DagScheduler里还有一个方法这里可以提一下,就是:

submitWaitingChildStages(stage)

这个方法是提交等待当前stage执行的等待stage,这样DAG的整个调度过程就完整了。

2、Task执行

两种Task类型:ShuffleMapTask和ResultTask。

2.1 ResultTask

我们先看ResultTask的执行,它相对比较简单,核心方式是runTask,核心代码:

override def runTask(context: TaskContext): U = {   
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    func(context, rdd.iterator(partition, context))
  }

反序列化出来RDD和func,然后执行rdd的iterator方法获取数据集,并在此数据集上执行func函数,要注意实际上这是一次迭代过程而不是多次迭代过程。

2.2 ShuffleMapTask

ShuffleMapTask任务的执行相对复杂些。

核心方法还是runTask,核心代码:

override def runTask(context: TaskContext): MapStatus = {    
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    val rdd = rddAndDep._1
    val dep = rddAndDep._2
    dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
  }

首先反序列化出RDD和依赖项ShuffleDependency。然后用ShuffleWriterProcessor写数据到RDD。

这里的dep其实没太大意义,主要就是来判断是否要进行合并使用的,不影响理解整个shuffle流程,所以我们可以先不要管dep:

dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)

这里的rdd实际就是ShuffleMapTask所要生成的数据集。这句代码到底是什么意思呢? ShuffleWriterProcessor实际上是将数据集写到了BlockManager上去的,先看看ShuffleWriterProcessor的含义。

2.3 ShuffleWriterProcessor

ShuffleWriterProcessor的关键方法的定义先看一下。

def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _],
      partitionId: Int, context: TaskContext,partition: Partition): MapStatus = {
    var writer: ShuffleWriter[Any, Any] = null   
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](
        dep.shuffleHandle,
        partitionId,
        context,
        createMetricsReporter(context))
    writer.write(
        rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get    
  }

ShuffleManager实际上就是BlockManager,管理块空间的。

Write是Shuffle写入器,写到BlockManager去;rdd.iterator(partition, context)就是当前Shuffle类型的RDD定义的数据集,dep是rdd计算数据集时依赖的RDD(这里的dep没多大意思先不管)。

这段代码的作用就是将shuffle rdd数据集输出到BlockManager上,在读取RDD的数据时,如果该RDD是shuffle类型,则需要到BlockManager上去读取,这里就是这个作用。

2.4 Shuffle RDD的相关概念

Shuffle类的RDD是指这类RDD的compute方法是依赖于其他RDD的,这里的其他RDD可以是多个。执行shuffle的RDD的计算过程的时候,是将一到多个依赖RDD的迭代器的输出作为数据源迭代器,在此之上执行自己的操作。所以shuffle RDD的compute方法里一定会用到依赖RDD的iterator方法。

可以看看CoGroupedRDD的源码,就能很快的理解shuffle的含义。

到此,关于“DAG任务分解和Shuffle RDD怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI