小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * MapReduce driver that sums four traffic counters per key and splits the
 * output across reducers with a custom {@link KpiPartition}.
 *
 * <p>Each input line is tab-separated. Column 1 is used as the key (the code
 * calls it "phone") and columns 6-9 are parsed as the four counters held by
 * {@link KpiWritable}.
 */
public class KpiApp {
    public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat";
    public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format";

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args unused
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        existsFile(conf);
        Job job = new Job(conf, KpiApp.class.getName());
        // Needed when the job is packaged as a jar and run on Linux.
        job.setJarByClass(KpiApp.class);

        // 1.1 input
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 map; declare the intermediate (map output) types explicitly
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(KpiWritable.class);

        // 1.3 custom partitioning
        job.setPartitionerClass(KpiPartition.class);
        job.setNumReduceTasks(2);

        // 1.4 sort and group (defaults)
        // 1.5 combine (none)

        // 2.1 shuffle (defaults)
        // 2.2 reduce
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);

        // 2.3 output
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Surface a failed job through the process exit status instead of
        // silently returning as if it had succeeded.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }

    /**
     * Deletes {@link #OUTPUT_PATH} (recursively) if it already exists, so the
     * job's output directory is absent when the job starts.
     *
     * @param conf configuration used to obtain the file system
     * @throws IOException if the file system cannot be reached or the delete fails
     * @throws URISyntaxException if {@link #OUTPUT_PATH} is not a valid URI
     */
    private static void existsFile(Configuration conf) throws IOException, URISyntaxException {
        FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH), conf);
        Path output = new Path(OUTPUT_PATH);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
    }

    /** Emits (column 1, counters from columns 6-9) for every well-formed input line. */
    static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
        private static final int PHONE_COLUMN = 1;
        private static final int FIRST_COUNTER_COLUMN = 6;
        /** Highest column index read is 9, so a usable line has at least 10 fields. */
        private static final int MIN_COLUMNS = 10;

        // Reused across calls; context.write serializes the contents immediately.
        private final Text outKey = new Text();
        private final KpiWritable outValue = new KpiWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if (fields.length < MIN_COLUMNS) {
                // Blank or truncated line: count it and skip rather than
                // failing the whole task with ArrayIndexOutOfBoundsException.
                context.getCounter("KpiApp", "MALFORMED_RECORDS").increment(1);
                return;
            }
            outKey.set(fields[PHONE_COLUMN]);
            outValue.set(
                    fields[FIRST_COUNTER_COLUMN],
                    fields[FIRST_COUNTER_COLUMN + 1],
                    fields[FIRST_COUNTER_COLUMN + 2],
                    fields[FIRST_COUNTER_COLUMN + 3]);
            context.write(outKey, outValue);
        }
    }

    /** Sums the four counters of all values that share a key and emits one total per key. */
    static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
        @Override
        protected void reduce(Text key2, Iterable<KpiWritable> values, Context context)
                throws IOException, InterruptedException {
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            for (KpiWritable writable : values) {
                upPackNum += writable.upPackNum;
                downPackNum += writable.downPackNum;
                upPayLoad += writable.upPayLoad;
                downPayLoad += writable.downPayLoad;
            }
            KpiWritable total = new KpiWritable();
            total.set(upPackNum, downPackNum, upPayLoad, downPayLoad);
            context.write(key2, total);
        }
    }
}

/**
 * Writable value holding four long counters. They are serialized in the order
 * upPackNum, downPackNum, upPayLoad, downPayLoad and rendered tab-separated
 * by {@link #toString()}.
 */
class KpiWritable implements Writable {
    long upPackNum;
    long downPackNum;
    long upPayLoad;
    long downPayLoad;

    /** Sets all four counters directly from numeric values. */
    public void set(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {
        this.upPackNum = upPackNum;
        this.downPackNum = downPackNum;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
    }

    /**
     * Sets all four counters from their decimal string forms.
     *
     * @throws NumberFormatException if any argument is not a parsable long
     */
    public void set(String string, String string2, String string3, String string4) {
        set(Long.parseLong(string),
                Long.parseLong(string2),
                Long.parseLong(string3),
                Long.parseLong(string4));
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upPackNum);
        out.writeLong(this.downPackNum);
        out.writeLong(this.upPayLoad);
        out.writeLong(this.downPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read in exactly the order write() emits.
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }

    @Override
    public String toString() {
        return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
    }
}

/**
 * Sends keys that are exactly 11 characters long to partition 0 and all other
 * keys to partition 1 (presumably separating 11-digit mobile numbers from
 * other identifiers; confirm against the input data).
 */
class KpiPartition extends Partitioner<Text, KpiWritable> {
    @Override
    public int getPartition(Text key, KpiWritable value, int numPartitions) {
        int partition = key.toString().length() == 11 ? 0 : 1;
        // The result must lie in [0, numPartitions). Without the modulo a job
        // run with a single reduce task would be handed the invalid index 1.
        return partition % numPartitions;
    }
}
Partitioner是HashPartitioner的基类,如果需要定制Partitioner也需要继承该类。
HashPartitioner是MapReduce的默认Partitioner。
以上是“Hadoop中如何分区”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。