温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何浅析Hive和Spark SQL读文件时的输入任务划分

发布时间:2021-12-09 17:35:49 来源:亿速云 阅读:321 作者:柒染 栏目:大数据

如何浅析Hive和Spark SQL读文件时的输入任务划分,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

我们就来讲解Hive和Spark SQL是如何切分输入路径的。

Hive

Hive是起步较早的SQL on Hadoop项目,最早也是诞生于Hadoop中,所以输入划分这部分的代码与Hadoop相关度非常高。现在Hive普遍使用的输入格式是CombineHiveInputFormat,它继承于HiveInputFormat,而HiveInputFormat实现了Hadoop的InputFormat接口,其中的getSplits方法用来获取具体的划分结果,划分出的一份输入数据被称为一个“Split”。在执行时,每个Split对应到一个map任务。在划分Split时,首先挑出不能合并到一起的目录——比如开启了事务功能的路径。这些不能合并的目录必须单独处理,剩下的路径交给私有方法getCombineSplits,这样Hive的一个map task最多可以处理多个目录下的文件。在实际操作中,我们一般只要通过set mapred.max.split.size=xx;即可控制文件合并的大小。当一个文件过大时,父类的getSplits也会帮我们完成相应的切分工作。

Spark SQL

Spark的表有两种:DataSource表和Hive表。另外Spark后续版本中DataSource V2也将逐渐流行,目前还在不断发展中,暂时就不在这里讨论。我们知道Spark SQL其实底层是Spark RDD,而RDD执行时,每个map task会处理RDD的一个Partition中的数据(注意这里的Partition是RDD的概念,要和表的Partition进行区分)。因此,Spark SQL作业的任务切分关键在于底层RDD的partition如何切分。

Data Source表

Spark SQL的DataSource表在最终执行的RDD类为FileScanRDD,由FileSourceScanExec创建出来。在创建这种RDD的时候,具体的Partition直接作为参数传给了构造函数,因此划分输入的方法也在DataSourceScanExec.scala文件中。具体分两步:首先把文件划分为PartitionFile,再将较小的PartitionFile进行合并。

第一步部分代码如下:

  if (fsRelation.fileFormat.isSplitable(
    fsRelation.sparkSession, fsRelation.options, file.getPath)) {
    (0L until file.getLen by maxSplitBytes).map { offset =>val remaining = file.getLen - offsetval size = if (remaining > maxSplitBytes) maxSplitBytes else remainingval hosts = getBlockHosts(blockLocations, offset, size)PartitionedFile(
      partition.values, file.getPath.toUri.toString,
      offset, size, partitionDeleteDeltas, hosts)
    }
  } else {val hosts = getBlockHosts(blockLocations, 0, file.getLen)Seq(PartitionedFile(partition.values, file.getPath.toUri.toString,0, file.getLen, partitionDeleteDeltas, hosts))
  }

我们可以看出,Spark SQL首先根据文件类型判断单个文件是否能够切割,如果可以则按maxSplitBytes进行切割。如果一个文件剩余部分无法填满maxSplitBytes,也单独作为一个Partition。

第二部分代码如下所示:

  splitFiles.foreach { file =>if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }// Add the given file to the current partition.currentSize += file.length + openCostInBytes
    currentFiles += file
  }

这样我们就可以依次遍历第一步切好的块,再按照maxSplitBytes进行合并。注意合并文件时还需加上打开文件的预估代价openCostInBytes。那么maxSplitBytesopenCostInBytes这两个关键参数怎么来的呢?

  val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum  val bytesPerCore = totalBytes / defaultParallelism  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

不难看出,主要是spark.sql.files.maxPartitionBytesspark.sql.files.openCostInBytes、调度器默认并发度以及所有输入文件实际大小所控制。

Hive表

Spark SQL中的Hive表底层的RDD类为HadoopRDD,由HadoopTableReader类实现。不过这次,具体的Partition划分还是依赖HadoopRDDgetPartitions方法,具体实现如下:

  override def getPartitions: Array[Partition] = {
    ...try {      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }      val array = new Array[Partition](inputSplits.size)      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      ...
    }
  }

不难看出,在处理Hive表的时候,Spark SQL把任务划分又交给了Hadoop的InputFormat那一套。不过需要注意的是,并不是所有Hive表都归为这一类,Spark SQL会默认对ORC和Parquet的表进行转化,用自己的Data Source实现OrcFileFormatParquetFileFormat来把这两种表作为Data Source表来处理。

看完上述内容,你们掌握如何浅析Hive和Spark SQL读文件时的输入任务划分的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI