Reading SequenceFiles with Spark can be very slow. By default, SequenceFileInputFormat reuses FileInputFormat's split logic, so a large file is split at the HDFS block size. If you want smaller splits, to increase the parallelism of the Spark job, you can roll your own InputFormat:
import java.io.IOException
import java.util.{ArrayList, List}

import com.google.common.base.Stopwatch // Guava Stopwatch, as used by FileInputFormat in older Hadoop releases
import org.apache.commons.logging.{Log, LogFactory}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileRecordReader}

class MySequenceFileInputFormat[K, V] extends FileInputFormat[K, V] {
  private val LOG: Log = LogFactory.getLog(classOf[MySequenceFileInputFormat[_, _]])
  val sequenceFileBlockSize = 30000000L // manually set the split "block size" to 30 MB
  val SPLIT_SLOP: Double = 1.1 // 10% slop
  val NUM_INPUT_FILES: String = "mapreduce.input.fileinputformat.numinputfiles"

  @throws[IOException]
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[K, V] =
    new SequenceFileRecordReader[K, V]

  // SequenceFile.SYNC_INTERVAL: never produce splits smaller than this
  override protected def getFormatMinSplitSize: Long = 2000L

  // Like SequenceFileInputFormat.listStatus: a directory is treated as a MapFile
  // and replaced by its "data" file; zero-length files are dropped.
  @throws[IOException]
  override protected def listStatus(job: JobContext): List[FileStatus] = {
    val files: List[FileStatus] = super.listStatus(job)
    val len: Int = files.size
    var j: Int = 0
    for (i <- 0 until len) {
      if (files.get(i).isDirectory) {
        val pth: Path = files.get(i).getPath
        val fs: FileSystem = pth.getFileSystem(job.getConfiguration)
        files.set(i, fs.getFileStatus(new Path(pth, "data")))
      }
      if (files.get(i).getLen != 0L) {
        files.set(j, files.get(i))
        j += 1
      }
    }
    files.subList(0, j)
  }

  // Same logic as FileInputFormat.getSplits, except that the split size is
  // derived from sequenceFileBlockSize instead of the file's HDFS block size.
  @throws[IOException]
  override def getSplits(job: JobContext): List[InputSplit] = {
    val sw: Stopwatch = new Stopwatch().start()
    val minSize: Long = Math.max(getFormatMinSplitSize, FileInputFormat.getMinSplitSize(job))
    val maxSize: Long = FileInputFormat.getMaxSplitSize(job)
    // generate splits
    val splits: ArrayList[InputSplit] = new ArrayList[InputSplit]
    val files: List[FileStatus] = listStatus(job)
    for (i <- 0 until files.size) {
      val file = files.get(i)
      val path: Path = file.getPath
      val length: Long = file.getLen
      if (length != 0) {
        var blkLocations: Array[BlockLocation] = null
        if (file.isInstanceOf[LocatedFileStatus]) {
          blkLocations = file.asInstanceOf[LocatedFileStatus].getBlockLocations
        } else {
          val fs: FileSystem = path.getFileSystem(job.getConfiguration)
          blkLocations = fs.getFileBlockLocations(file, 0, length)
        }
        if (isSplitable(job, path)) {
          // val blockSize: Long = file.getBlockSize
          val blockSize: Long = sequenceFileBlockSize
          val splitSize: Long = computeSplitSize(blockSize, minSize, maxSize)
          var bytesRemaining: Long = length
          while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
            val blkIndex: Int = getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
              blkLocations(blkIndex).getHosts,
              blkLocations(blkIndex).getCachedHosts))
            bytesRemaining -= splitSize
          }
          if (bytesRemaining != 0) {
            val blkIndex: Int = getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              blkLocations(blkIndex).getHosts,
              blkLocations(blkIndex).getCachedHosts))
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations(0).getHosts,
            blkLocations(0).getCachedHosts))
        }
      } else {
        // create an empty hosts array for zero-length files
        splits.add(makeSplit(path, 0, length, new Array[String](0)))
      }
    }
    // save the number of input files for metrics/loadgen
    job.getConfiguration.setLong(NUM_INPUT_FILES, files.size())
    sw.stop()
    if (LOG.isDebugEnabled) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.elapsedMillis())
    }
    splits
  }
}
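To see what the SPLIT_SLOP check does, here is a small standalone sketch (not part of the class above; the 100 MB file length is invented for illustration). It replays the split loop for one file with a 30 MB split size: the tail is folded into a single final split once the remainder drops to at most 1.1x the split size.

import scala.collection.mutable.ArrayBuffer

object SplitSizeDemo {
  def main(args: Array[String]): Unit = {
    val splitSize = 30000000L       // same value as sequenceFileBlockSize
    val SPLIT_SLOP = 1.1            // 10% slop, as in the class above
    var bytesRemaining = 100000000L // pretend the file is 100 MB
    val splits = ArrayBuffer[Long]()
    while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
      splits += splitSize
      bytesRemaining -= splitSize
    }
    if (bytesRemaining != 0) splits += bytesRemaining
    // prints: 30000000, 30000000, 30000000, 10000000 (four splits, i.e. four tasks)
    println(splits.mkString(", "))
  }
}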
Change sequenceFileBlockSize to whatever size you want.
Usage:
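As a small sketch (the property name my.sequencefile.split.size is made up for illustration, not a standard Hadoop key), you could also avoid hardcoding the value by reading it from the job configuration inside getSplits and setting it from the Spark driver:

// inside getSplits, replacing `val blockSize: Long = sequenceFileBlockSize`
// ("my.sequencefile.split.size" is a hypothetical property name):
val blockSize: Long = job.getConfiguration.getLong("my.sequencefile.split.size", 30000000L)

// on the driver, before creating the RDD:
sc.hadoopConfiguration.setLong("my.sequencefile.split.size", 10000000L) // e.g. 10 MB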
val dd = sc.newAPIHadoopFile[BytesWritable, BytesWritable, MySequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir).flatMap(x => {
  // only the first getLength bytes of a BytesWritable hold valid data;
  // getBytes alone can return a padded internal buffer
  function(new String(x._2.getBytes, 0, x._2.getLength))
})
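A side note, not part of the approach above: if all you need is smaller splits, the stock SequenceFileInputFormat can often be made to do the same thing by lowering mapreduce.input.fileinputformat.split.maxsize, because FileInputFormat computes each split size as max(minSize, min(maxSize, blockSize)). A minimal sketch, reusing the same sc and sourceDir as above:

import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// cap every split at ~30 MB instead of the HDFS block size
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.maxsize", 30000000L)

val rdd = sc.newAPIHadoopFile[BytesWritable, BytesWritable,
  SequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir)
println(rdd.partitions.length) // should be noticeably larger for big files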