Reading SequenceFiles with Spark can be very slow. By default, SequenceFileInputFormat inherits its splitting logic from FileInputFormat, so a large file is only split down to the HDFS block size. If you want smaller splits in order to increase the parallelism of the Spark job, you can write your own InputFormat:
import java.io.IOException
import java.util.{ArrayList, List}

import com.google.common.base.Stopwatch // Guava Stopwatch, as bundled with Hadoop 2.x
import org.apache.commons.logging.{Log, LogFactory}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileRecordReader}

class MySequenceFileInputFormat[K, V] extends FileInputFormat[K, V] {

  private val LOG: Log = LogFactory.getLog(classOf[MySequenceFileInputFormat[K, V]])

  val sequenceFileBlockSize: Long = 30000000L // manually set the target split size to ~30 MB
  val SPLIT_SLOP: Double = 1.1                // 10% slop
  val NUM_INPUT_FILES: String = "mapreduce.input.fileinputformat.numinputfiles"

  @throws[IOException]
  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[K, V] =
    new SequenceFileRecordReader[K, V]()

  // Raise the format's minimum split size (FileInputFormat's default is 1).
  override protected def getFormatMinSplitSize: Long = 2000L

  @throws[IOException]
  override protected def listStatus(job: JobContext): List[FileStatus] = {
    val files: List[FileStatus] = super.listStatus(job)
    val len: Int = files.size
    var j: Int = 0
    for (i <- 0 until len) {
      val f = files.get(i)
      // For directories (e.g. MapFile output), use the "data" file inside.
      if (f.isDirectory) {
        val pth: Path = f.getPath
        val fs: FileSystem = pth.getFileSystem(job.getConfiguration)
        files.set(i, fs.getFileStatus(new Path(pth, "data")))
      }
      // Keep only non-empty files.
      if (files.get(i).getLen != 0L) {
        files.set(j, files.get(i))
        j += 1
      }
    }
    files.subList(0, j)
  }

  @throws[IOException]
  override def getSplits(job: JobContext): List[InputSplit] = {
    val sw: Stopwatch = new Stopwatch().start()
    val minSize: Long = Math.max(getFormatMinSplitSize, FileInputFormat.getMinSplitSize(job))
    val maxSize: Long = FileInputFormat.getMaxSplitSize(job)

    // Generate splits.
    val splits: ArrayList[InputSplit] = new ArrayList[InputSplit]
    val files: List[FileStatus] = listStatus(job)
    for (i <- 0 until files.size()) {
      val file = files.get(i)
      val path: Path = file.getPath
      val length: Long = file.getLen
      if (length != 0) {
        var blkLocations: Array[BlockLocation] = null
        if (file.isInstanceOf[LocatedFileStatus]) {
          blkLocations = file.asInstanceOf[LocatedFileStatus].getBlockLocations
        } else {
          val fs: FileSystem = path.getFileSystem(job.getConfiguration)
          blkLocations = fs.getFileBlockLocations(file, 0, length)
        }
        if (isSplitable(job, path)) {
          // The only change from FileInputFormat.getSplits: use the fixed size
          // instead of the file's HDFS block size.
          // val blockSize: Long = file.getBlockSize
          val blockSize: Long = sequenceFileBlockSize
          val splitSize: Long = computeSplitSize(blockSize, minSize, maxSize)
          var bytesRemaining: Long = length
          while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
            val blkIndex: Int = getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
              blkLocations(blkIndex).getHosts, blkLocations(blkIndex).getCachedHosts))
            bytesRemaining -= splitSize
          }
          if (bytesRemaining != 0) {
            val blkIndex: Int = getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              blkLocations(blkIndex).getHosts, blkLocations(blkIndex).getCachedHosts))
          }
        } else {
          // Not splitable: one split for the whole file.
          splits.add(makeSplit(path, 0, length, blkLocations(0).getHosts,
            blkLocations(0).getCachedHosts))
        }
      } else {
        // Create an empty hosts array for zero-length files.
        splits.add(makeSplit(path, 0, length, new Array[String](0)))
      }
    }

    // Save the number of input files for metrics/loadgen.
    job.getConfiguration.setLong(NUM_INPUT_FILES, files.size())
    sw.stop()
    if (LOG.isDebugEnabled) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size() +
        ", TimeTaken: " + sw.elapsedMillis())
    }
    splits
  }
}
Change sequenceFileBlockSize to whatever size you need.
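If recompiling for every size change is inconvenient, the value could instead be read from the job configuration. Below is a minimal sketch under that assumption: "my.sequencefile.split.size" is a made-up property name (not a standard Hadoop key), the helper would live inside MySequenceFileInputFormat, and getSplits would call targetSplitSize(job) where sequenceFileBlockSize is used above. The driver-side wiring is shown after the usage example below.

// Sketch (assumption): read the target split size from the job configuration
// instead of hard-coding it. The property name is hypothetical.
private def targetSplitSize(job: JobContext): Long =
  job.getConfiguration.getLong("my.sequencefile.split.size", 30000000L) // default ~30 MB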
Usage:
val dd = sc
  .newAPIHadoopFile[BytesWritable, BytesWritable, MySequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir)
  .flatMap { x =>
    // Use getLength: BytesWritable.getBytes returns the backing array, which may be padded.
    function(new String(x._2.getBytes, 0, x._2.getLength))
  }
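For reference, a fuller call might look like the following sketch. It assumes a SparkContext named sc, a sourceDir path string, and a user-defined function(...) as above, and it uses the explicit newAPIHadoopFile overload so a custom Hadoop Configuration (for example, one carrying the hypothetical my.sequencefile.split.size property from the sketch above) can be passed through:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.BytesWritable

// Sketch: the explicit overload accepts a Hadoop Configuration, which is where
// a configurable split size (hypothetical key, see above) would be set.
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.setLong("my.sequencefile.split.size", 16L * 1024 * 1024) // e.g. 16 MB splits

val rdd = sc.newAPIHadoopFile(
  sourceDir,
  classOf[MySequenceFileInputFormat[BytesWritable, BytesWritable]],
  classOf[BytesWritable],
  classOf[BytesWritable],
  hadoopConf)

// Smaller splits => more partitions => more parallel Spark tasks.
println(s"input partitions: ${rdd.partitions.length}")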