This post walks through partitioning in Hadoop MapReduce with a complete example: a job that reads a tab-separated traffic log, aggregates the upload/download packet and byte counts per phone number, and uses a custom Partitioner to route 11-digit phone numbers and all other keys to two different reducers.
package partition;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class KpiApp {
    public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat";
    public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        deleteOutputIfExists(conf);
        Job job = new Job(conf, KpiApp.class.getName());
        // Required when the job is packaged as a jar and submitted from Linux
        job.setJarByClass(KpiApp.class);
        // 1.1 Input: read the log file line by line
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);
        // 1.2 Map: phone number -> per-record traffic counters
        job.setMapperClass(MyMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);
        // 1.3 Custom partitioning: the number of reduce tasks must cover
        // every partition index KpiPartition can return (here 0 and 1)
        job.setPartitionerClass(KpiPartition.class);
        job.setNumReduceTasks(2);
        // 1.4 Sorting/grouping and 1.5 combining: framework defaults are used
        // 2.1 Shuffle: handled by the framework
        // 2.2 Reduce: sum the traffic counters per phone number
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(KpiWritable.class);
        // 2.3 Output
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.setOutputFormatClass(TextOutputFormat.class);
        job.waitForCompletion(true);
    }
    // Delete the output directory if it already exists, so the job can be rerun
    private static void deleteOutputIfExists(Configuration conf)
            throws IOException, URISyntaxException {
        FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH), conf);
        if (fs.exists(new Path(OUTPUT_PATH))) {
            fs.delete(new Path(OUTPUT_PATH), true);
        }
    }
    static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is tab-separated; field 1 is the phone number and
            // fields 6-9 are upPackNum, downPackNum, upPayLoad, downPayLoad
            String[] split = value.toString().split("\t");
            Text key2 = new Text(split[1]);
            KpiWritable v2 = new KpiWritable();
            v2.set(split[6], split[7], split[8], split[9]);
            context.write(key2, v2);
        }
    }
    static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable> {
        @Override
        protected void reduce(Text key2, Iterable<KpiWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the four traffic counters across all records for this phone number
            long upPackNum = 0L;
            long downPackNum = 0L;
            long upPayLoad = 0L;
            long downPayLoad = 0L;
            for (KpiWritable writable : values) {
                upPackNum += writable.upPackNum;
                downPackNum += writable.downPackNum;
                upPayLoad += writable.upPayLoad;
                downPayLoad += writable.downPayLoad;
            }
            KpiWritable value3 = new KpiWritable();
            value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum),
                    String.valueOf(upPayLoad), String.valueOf(downPayLoad));
            context.write(key2, value3);
        }
    }
}
class KpiWritable implements Writable {
    long upPackNum;
    long downPackNum;
    long upPayLoad;
    long downPayLoad;

    // Serialization: the field order here must match readFields() exactly
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upPackNum);
        out.writeLong(this.downPackNum);
        out.writeLong(this.upPayLoad);
        out.writeLong(this.downPayLoad);
    }

    // Convenience setter that parses the raw string fields from the log line
    public void set(String upPackNum, String downPackNum, String upPayLoad,
            String downPayLoad) {
        this.upPackNum = Long.parseLong(upPackNum);
        this.downPackNum = Long.parseLong(downPackNum);
        this.upPayLoad = Long.parseLong(upPayLoad);
        this.downPayLoad = Long.parseLong(downPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upPackNum = in.readLong();
        this.downPackNum = in.readLong();
        this.upPayLoad = in.readLong();
        this.downPayLoad = in.readLong();
    }

    @Override
    public String toString() {
        return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
    }
}
class KpiPartition extends Partitioner<Text, KpiWritable> {
    // Chinese mobile numbers are 11 digits: send them to reducer 0 and
    // every other key (e.g. malformed or fixed-line entries) to reducer 1
    @Override
    public int getPartition(Text key, KpiWritable value, int numPartitions) {
        return key.toString().length() == 11 ? 0 : 1;
    }
}
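With job.setNumReduceTasks(2), the job writes two files under /files/format: part-r-00000 holds the aggregated totals for 11-digit phone numbers (partition 0) and part-r-00001 holds everything else. If the partitioner returns an index greater than or equal to the number of reduce tasks, map tasks fail with an "Illegal partition" error, so the two values must be kept in sync. The exact layout of HTTP_20130313143750.dat is not shown in this article, so the following tab-separated line is purely illustrative of what the mapper expects (phone number in field 1, the four counters in fields 6-9):

1363157985066	13726230503	00-FD-07-A4-72-B8	120.196.100.82	example.com	game	24	27	2481	24681	200

The mapper would emit the key 13726230503 with the counters (24, 27, 2481, 24681), and the reducer would sum those counters across all lines for that number.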
Partitioner is the base class of HashPartitioner; a custom partitioner must extend it as well. HashPartitioner is the default Partitioner in MapReduce: it spreads keys across reducers by hash code, which still guarantees that all records with the same key reach the same reducer.
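For comparison, the default behavior amounts to the following minimal sketch (mirroring org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; the bitmask clears the sign bit of hashCode() so the modulo result is never negative):

import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of the default strategy: hash the key, mask the sign bit,
// and take the result modulo the number of reduce tasks.
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

Because the partition depends only on the key's hash, the default is a good fit whenever you have no semantic grouping requirement; a custom partitioner like KpiPartition above is only needed when specific keys must land in specific output files.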
That covers how partitioning works in Hadoop. Thanks for reading!
Original post: https://my.oschina.net/Xiao629/blog/205175