How to Partition in Hadoop

Published: 2021-12-09 15:44:33 Source: 亿速云 Views: 166 Author: 小新 Category: Cloud Computing

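This article shows how partitioning works in Hadoop. Many readers are probably not yet familiar with it, so the example below is offered as a reference: a MapReduce job that sums four traffic counters per phone number and uses a custom Partitioner to split the results across two reduce tasks.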

package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class KpiApp {
	public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat";
	public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format";
	public static void main(String[] args)throws Exception {
		Configuration conf = new Configuration();
		existsFile(conf);
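		// Job.getInstance replaces the constructor new Job(conf, name), which is deprecated in Hadoop 2.x
		Job job = Job.getInstance(conf, KpiApp.class.getName());
		// Required when the job is packaged as a JAR and run on Linux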
		job.setJarByClass(KpiApp.class);
		
		// 1.1 Input path and input format
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormatClass(TextInputFormat.class);
		
		// 1.2 Mapper and the key/value types it emits
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWritable.class);
		
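		// 1.3 Custom partitioning: KpiPartition returns 0 or 1, so two reduce tasks are needed
		job.setPartitionerClass(KpiPartition.class);
		job.setNumReduceTasks(2);
		
		// 1.4 Sorting and grouping (defaults)
		// 1.5 Aggregation (combiner), not configured here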
		
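		// 2.1 Shuffle: map output is copied to the reducers by partition (handled by the framework)
		
		// 2.2 Reducer and the job's final output key/value types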
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWritable.class);
		
		// 2.3 Output path and output format
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// Propagate job success or failure as the process exit code
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	// Deletes the output directory if it already exists; the job refuses to start otherwise
	private static void existsFile(Configuration conf) throws IOException,
			URISyntaxException {
		FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf);
		if(fs.exists(new Path(OUTPUT_PATH))){
			fs.delete(new Path(OUTPUT_PATH), true);
		}
	}
	static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
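			// Each input line is tab-separated; field 1 is the phone number
			String[] split = value.toString().split("\t");
			if (split.length < 10) {
				return; // skip malformed lines that lack the traffic fields
			}
			Text key2 = new Text(split[1]);
			
			// Fields 6-9: upPackNum, downPackNum, upPayLoad, downPayLoad
			KpiWritable v2 = new KpiWritable();
			v2.set(split[6], split[7], split[8], split[9]);
			context.write(key2, v2);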
		}
		
	}
	static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{

		@Override
		protected void reduce(Text key2, Iterable<KpiWritable> values, Context context)
				throws IOException, InterruptedException {
			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			for (KpiWritable writable : values) {
				upPackNum += writable.upPackNum;
				downPackNum += writable.downPackNum;
				upPayLoad += writable.upPayLoad;
				downPayLoad += writable.downPayLoad;
			}
			KpiWritable value3 = new KpiWritable();
			value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad));
			context.write(key2, value3);
		}
	}
}
class KpiWritable implements Writable{
	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(this.upPackNum);
		out.writeLong(this.downPackNum);
		out.writeLong(this.upPayLoad);
		out.writeLong(this.downPayLoad);
	}

	// Parses the four counters from their text form, as read from the input file
	public void set(String upPackNum, String downPackNum, String upPayLoad,
			String downPayLoad) {
		this.upPackNum = Long.parseLong(upPackNum);
		this.downPackNum = Long.parseLong(downPackNum);
		this.upPayLoad = Long.parseLong(upPayLoad);
		this.downPayLoad = Long.parseLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public String toString() {
		return  upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
	}
}
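class KpiPartition extends Partitioner<Text, KpiWritable>{

	// Keys that are 11 characters long (e.g. mobile phone numbers) go to partition 0,
	// everything else to partition 1. The job must therefore run with two reduce tasks.
	@Override
	public int getPartition(Text key, KpiWritable value, int numPartitions) {
		String string = key.toString();
		return string.length()==11?0:1;
	}
}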
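The way KpiWritable travels between map and reduce tasks can be illustrated without a cluster. The following is only a minimal sketch, assuming plain `java.io` streams stand in for Hadoop's serialization machinery; the class `KpiRecordSketch` and the sample counter values are hypothetical and not part of the original job. It shows that `readFields` must read the fields in exactly the order `write` emitted them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for KpiWritable with no Hadoop dependency.
class KpiRecordSketch {
    long upPackNum;
    long downPackNum;
    long upPayLoad;
    long downPayLoad;

    // Same field order as KpiWritable.write
    void write(DataOutput out) throws IOException {
        out.writeLong(upPackNum);
        out.writeLong(downPackNum);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
    }

    // Must mirror write(): same fields, same order
    void readFields(DataInput in) throws IOException {
        upPackNum = in.readLong();
        downPackNum = in.readLong();
        upPayLoad = in.readLong();
        downPayLoad = in.readLong();
    }

    static byte[] serialize(KpiRecordSketch record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            record.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static KpiRecordSketch deserialize(byte[] bytes) {
        KpiRecordSketch record = new KpiRecordSketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            record.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return record;
    }

    public static void main(String[] args) {
        KpiRecordSketch original = new KpiRecordSketch();
        original.upPackNum = 24;      // made-up sample values
        original.downPackNum = 27;
        original.upPayLoad = 2481;
        original.downPayLoad = 24681;

        byte[] bytes = serialize(original);
        KpiRecordSketch copy = deserialize(bytes);

        System.out.println(bytes.length); // four longs of 8 bytes each: 32
        System.out.println(copy.upPackNum + "\t" + copy.downPackNum + "\t"
                + copy.upPayLoad + "\t" + copy.downPayLoad);
    }
}
```

If the read order ever differs from the write order, the values silently end up in the wrong fields, which is why the two methods are best kept next to each other.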

Partitioner is the base class of HashPartitioner. A custom partitioner must extend it as well.
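The rule that KpiPartition applies inside `getPartition` can be tried out on its own. The sketch below is an illustration under stated assumptions, not the author's code: the class `PartitionRuleSketch`, the method `partitionFor`, and the sample keys are hypothetical, and the final modulo is an added safeguard that the original partitioner does not have.

```java
// Hypothetical, Hadoop-free illustration of the length-based partition rule.
class PartitionRuleSketch {

    // Returns the partition index for a key: 11-character keys go to 0, all others to 1.
    // The modulo folds the result into [0, numPartitions), an assumption added here
    // so that the rule still yields a valid index with a single partition.
    static int partitionFor(String key, int numPartitions) {
        int wanted = key.length() == 11 ? 0 : 1;
        return wanted % numPartitions;
    }

    public static void main(String[] args) {
        String[] sampleKeys = {"13726230503", "84138413"}; // made-up keys
        for (String key : sampleKeys) {
            System.out.println(key + " -> partition " + partitionFor(key, 2));
        }
    }
}
```

With two reduce tasks and the default TextOutputFormat naming, partition 0 is typically written to `part-r-00000` and partition 1 to `part-r-00001`, so the two groups of keys land in separate output files.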

HashPartitioner is the default Partitioner in MapReduce.
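For comparison, HashPartitioner derives the partition from the key's hash code: it clears the sign bit and takes the remainder modulo the number of reduce tasks. The following sketch reproduces that arithmetic in plain Java. The class and method names are hypothetical, and `String.hashCode()` is used only as a stand-in, since Hadoop's `Text` computes its hash differently and the actual partition numbers on a cluster will not match.

```java
// Hypothetical, Hadoop-free illustration of hash-based partitioning.
class HashPartitionSketch {

    // Masking with Integer.MAX_VALUE clears the sign bit, so a negative
    // hash code can never produce a negative partition index.
    static int partitionForHash(int hash, int numReduceTasks) {
        return (hash & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Stand-in for key.hashCode() on a Hadoop key type (assumption: String hash).
    static int partitionFor(String key, int numReduceTasks) {
        return partitionForHash(key.hashCode(), numReduceTasks);
    }

    public static void main(String[] args) {
        System.out.println(partitionForHash(10, 3));  // 10 % 3 = 1
        System.out.println(partitionForHash(-1, 4));  // 2147483647 % 4 = 3
        System.out.println(partitionFor("13726230503", 2)); // made-up key
    }
}
```

Because equal keys always hash to the same value, every record for a given key reaches the same reducer. A custom partitioner such as KpiPartition is only needed when the grouping has to follow a business rule instead of an even spread.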

That is all for "How to Partition in Hadoop". Thanks for reading; I hope the example was helpful.
