This article shows how Hadoop can use CombineFileInputFormat to merge many small files into larger input splits and so reduce the number of map tasks. The example below consists of a custom key class, a record reader, a custom input format, and a driver, followed by a comparison of the run results with and without the custom format.
// The key handed to the map: the name of the source file plus the byte offset of the line in that file.
package hgs.combinefileinputformat.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class CombineFileKey implements WritableComparable<CombineFileKey> {

    private String fileName;
    private long offset;

    public String getFileName() {
        return fileName;
    }
    public void setFileName(String fileName) {
        this.fileName = fileName;
    }
    public long getOffset() {
        return offset;
    }
    public void setOffset(long offset) {
        this.offset = offset;
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.fileName = Text.readString(input);
        this.offset = input.readLong();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        Text.writeString(output, fileName);
        output.writeLong(offset);
    }

    @Override
    public int compareTo(CombineFileKey obj) {
        // Order by file name first, then by offset within the same file.
        int f = this.fileName.compareTo(obj.fileName);
        if (f == 0)
            return (int) Math.signum((double) (this.offset - obj.offset));
        return f;
    }

    @Override
    public int hashCode() {
        // Adapted from http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/
        final int prime = 31;
        int result = 1;
        result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
        result = prime * result + (int) (offset ^ (offset >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof CombineFileKey)
            return this.compareTo((CombineFileKey) o) == 0;
        return false;
    }
}
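The key pairs each record with the file it came from and its position in that file, so records sort by file name first and then by offset. A minimal, hypothetical check of that ordering (not part of the original article) might look like this:

package hgs.combinefileinputformat.test;

// Hypothetical quick check of the ordering defined by compareTo():
// keys are ordered by file name first, then by offset within the same file.
public class CombineFileKeyOrderingDemo {
    public static void main(String[] args) {
        CombineFileKey a = new CombineFileKey();
        a.setFileName("part-0001");
        a.setOffset(0L);

        CombineFileKey b = new CombineFileKey();
        b.setFileName("part-0001");
        b.setOffset(128L);

        // Same file, smaller offset sorts first, so this prints a negative number.
        System.out.println(a.compareTo(b));
    }
}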
package hgs.combinefileinputformat.test;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

public class CombineFileReader extends RecordReader<CombineFileKey, Text> {

    private long startOffset;   // offset of the chunk
    private long end;           // end of the chunk
    private long position;      // current position
    private FileSystem fs;
    private Path path;
    private CombineFileKey key;
    private Text value;
    private FSDataInputStream input;
    private LineReader reader;

    public CombineFileReader(CombineFileSplit split, TaskAttemptContext context,
            Integer index) throws IOException {
        // Initialize path, fs, startOffset and end for the index-th file of the combined split.
        this.path = split.getPath(index);
        this.fs = this.path.getFileSystem(context.getConfiguration());
        this.startOffset = split.getOffset(index);
        // Use the length of this file chunk (getLength(index)), not getLength(),
        // which is the total length of the whole combined split.
        this.end = split.getLength(index) + this.startOffset;
        // Whether the start position may fall inside a line.
        boolean skipFirstLine = false;
        // Open the file.
        this.input = fs.open(this.path);
        // A non-zero offset means we may be starting in the middle of a line.
        if (this.startOffset != 0) {
            skipFirstLine = true;
            --(this.startOffset);
            // Seek to the position where reading starts.
            this.input.seek(this.startOffset);
        }
        // Initialize the line reader.
        this.reader = new LineReader(input);
        if (skipFirstLine) {
            // Skip the (possibly partial) first line and re-establish startOffset at the
            // beginning of the next line; the skipped bytes belong to the previous chunk's reader.
            this.startOffset += this.reader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, this.end - this.startOffset));
        }
        this.position = this.startOffset;
    }

    @Override
    public void close() throws IOException {
        // Release the underlying stream.
        if (reader != null) {
            reader.close();
        }
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // All setup happens in the constructor, which CombineFileRecordReader invokes per file.
    }

    // Return the current key.
    @Override
    public CombineFileKey getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    // Return the current value.
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    // Progress through this chunk, as a float in [0, 1].
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (this.startOffset == this.end) {
            return 0.0f;
        } else {
            return Math.min(1.0f,
                    (this.position - this.startOffset) / (float) (this.end - this.startOffset));
        }
    }

    // Advance to the next key/value pair; returns false when the chunk is exhausted.
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Lazily initialize the key and value.
        if (this.key == null) {
            this.key = new CombineFileKey();
            this.key.setFileName(this.path.getName());
        }
        this.key.setOffset(this.position);
        if (this.value == null) {
            this.value = new Text();
        }
        // Read one line; newSize == 0 means this chunk has been fully consumed.
        int newSize = 0;
        if (this.position < this.end) {
            newSize = reader.readLine(this.value);
            position += newSize;
        }
        // No more data: clear the key and value.
        if (newSize == 0) {
            this.key = null;
            this.value = null;
            return false;
        } else {
            return true;
        }
    }
}
package hgs.combinefileinputformat.test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CustCombineInputFormat extends CombineFileInputFormat<CombineFileKey, Text> {

    public CustCombineInputFormat() {
        super();
        // Maximum size of a combined split: 64 MB.
        this.setMaxSplitSize(67108864);
    }

    @Override
    public RecordReader<CombineFileKey, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader creates one CombineFileReader per file in the combined split.
        return new CombineFileRecordReader<CombineFileKey, Text>(
                (CombineFileSplit) split, context, CombineFileReader.class);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Individual small files are never split further.
        return false;
    }
}
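The 64 MB ceiling is hard-coded in the constructor above. As a hypothetical alternative (not from the original article, and assuming the Hadoop 2.x property name), the driver could instead set the split-size property that CombineFileInputFormat falls back to when setMaxSplitSize() has not been called:

package hgs.combinefileinputformat.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical driver fragment: configure the maximum combined split size
// through the job configuration instead of hard-coding it in the input format.
public class SplitSizeConfigSketch {
    public static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 67108864L); // 64 MB
        return Job.getInstance(conf, "LetterCount");
    }
}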
// Driver class.
package hgs.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import hgs.combinefileinputformat.test.CustCombineInputFormat;

public class LetterCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //conf.set("mapreduce.map.log.level", "INFO");
        //conf.set("mapreduce.reduce.log.level", "INFO");
        Job job = Job.getInstance(conf, "LetterCount");
        job.setJarByClass(hgs.test.LetterCountDriver.class);
        // Mapper and reducer.
        job.setMapperClass(LetterCountMapper.class);
        job.setReducerClass(LetterReducer.class);
        // Output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Pass "1" as the first argument to use the custom combine format;
        // otherwise the default input format is used.
        if (args[0].equals("1"))
            job.setInputFormatClass(CustCombineInputFormat.class);
        // Input and output directories (not files).
        FileInputFormat.setInputPaths(job, new Path("/words"));
        FileOutputFormat.setOutputPath(job, new Path("/result"));
        if (!job.waitForCompletion(true))
            return;
    }
}
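The article does not include LetterCountMapper and LetterReducer. Below is a minimal, hypothetical sketch of what a letter-count mapper and reducer could look like, not the author's original code. The map input key is declared as Object so the same class works both with the default TextInputFormat (LongWritable keys) and with CustCombineInputFormat (CombineFileKey keys).

// LetterCountMapper.java (hypothetical sketch)
package hgs.test;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LetterCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text letter = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit (character, 1) for every non-whitespace character in the line.
        for (char c : value.toString().toCharArray()) {
            if (!Character.isWhitespace(c)) {
                letter.set(String.valueOf(c));
                context.write(letter, ONE);
            }
        }
    }
}

// LetterReducer.java (hypothetical sketch)
package hgs.test;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LetterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the per-mapper counts for each character.
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}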
[Screenshot: the files in the HDFS input directory]
[Screenshot: run result without the custom CustCombineInputFormat]
[Screenshot: run result with the custom CustCombineInputFormat]
That is everything on how Hadoop can merge small files and reduce the number of map tasks with CombineFileInputFormat. Thanks for reading, and I hope it helps.
Original article: http://blog.itpub.net/31506529/viewspace-2217548/