温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

修改SequenceFileInputFormat hdfs blocksize

发布时间:2020-07-04 09:50:55 来源:网络 阅读:579 作者:xiaobin0303 栏目:开发技术

用spark读取sequencefile时,非常消耗时间,默认情况下SequenceFileInputFormat切分文件是沿用FIleInputFormat,对于大文件会切成Hdfs block size大小,如果想切的更小,增加spark任务的并法度,可以自己修改:

class MySequenceFileInputFormat[K, V] extends FileInputFormat[K, V] {
  private val LOG: Log = LogFactory.getLog(classOf[MySequenceFileInputFormat[K, V]])

  val sequenceFileBlockSize = 30000000 //手动设置blocksize为30M
  val  SPLIT_SLOP:Double = 1.1;   // 10% slop
  val  NUM_INPUT_FILES:String ="mapreduce.input.fileinputformat.numinputfiles";

  @throws[IOException]
  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[K, V] = new SequenceFileRecordReader

  override protected def getFormatMinSplitSize: Long = 2000L

  @throws[IOException]
  override protected def listStatus(job: JobContext): List[FileStatus] = {
    val files: List[FileStatus] = super.listStatus(job)
    val len: Int = files.size
    var j: Int = 0

    for (i<-0 to len-1){
      val f = files.get(i)
      if(f.isDirectory){
        val pth:Path = f.getPath
        val fs: FileSystem = pth.getFileSystem(job.getConfiguration)
        files.set(i,fs.getFileStatus(new Path(pth, "data")))
      }
      if((files.get(i)).getLen() != 0L) {
        files.set(j, files.get(i))
        j+=1
      }
    }

    files.subList(0, j)
  }

  @throws[IOException]
  override def getSplits(job: JobContext): List[InputSplit] = {
    val sw :Stopwatch= new Stopwatch().start();
    val minSize:Long = Math.max(getFormatMinSplitSize(), FileInputFormat.getMinSplitSize(job));
    val maxSize :Long= FileInputFormat.getMaxSplitSize(job);

    // generate splits
    val splits: ArrayList[InputSplit] = new ArrayList[InputSplit]
    val files: List[FileStatus] = listStatus(job)
    for ( i<- 0 to files.size()-1) {
      val file = files.get(i)
       val path:Path = file.getPath();
      val length:Long = file.getLen();
      if (length != 0) {
        var blkLocations: Array[BlockLocation] = null
        if (file.isInstanceOf[LocatedFileStatus] ) {
          blkLocations = ( file.asInstanceOf[LocatedFileStatus]).getBlockLocations()
        } else {
          val fs:FileSystem = path.getFileSystem(job.getConfiguration())
          blkLocations = fs.getFileBlockLocations(file, 0, length)
        }
        if (isSplitable(job, path)) {
//          val blockSize:Long = file.getBlockSize()
          val blockSize:Long = sequenceFileBlockSize
          val splitSize:Long = computeSplitSize(blockSize, minSize, maxSize)

          var bytesRemaining:Long = length;
          while (( bytesRemaining.toDouble)/splitSize > SPLIT_SLOP) {
            val blkIndex:Int = getBlockIndex(blkLocations, length-bytesRemaining)
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations(blkIndex).getHosts(),
              blkLocations(blkIndex).getCachedHosts()))
            bytesRemaining -= splitSize
          }

          if (bytesRemaining != 0) {
            val blkIndex:Int = getBlockIndex(blkLocations, length-bytesRemaining)
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
              blkLocations(blkIndex).getHosts(),
              blkLocations(blkIndex).getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations(0).getHosts(),
          blkLocations(0).getCachedHosts()));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new Array[String](0)))
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size())
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.elapsedMillis())
    }
    return splits
  }


}


sequenceFileBlockSize  改成自己想要的大小


使用:

val dd = sc.newAPIHadoopFile[BytesWritable,BytesWritable, MySequenceFileInputFormat[BytesWritable,BytesWritable]](sourceDir).flatMap(x=>{
  function(new String(x._2.getBytes))
})


向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI