小编给大家分享一下Hadoop如何实现自定义计数器、分区、序列化。相信大部分人对此还不太了解，因此分享这篇文章给大家参考，希望大家阅读完这篇文章后大有收获。下面让我们一起来了解一下吧！
package com.test; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /* * 手机号码 流量[类型1、类型2、类型3] * 13500001234 12,56,78 * 18600001235 32,21,80 * 15800001235 16,33,56 * 13500001234 19,92,73 * 18600001235 53,55,29 * 18600001239 27,77,68 * * 计算得出 * 手机号 类型1汇总 类型2汇总 类型3汇总 */ public class WordCount extends Configured implements Tool { public static class Map extends Mapper<LongWritable, Text, Text, StreamWritable> { //避免每调用一次map就创建一次对象 private final Text phoneNum = new Text(); private final StreamWritable streamWritable = new StreamWritable(); private String firstLine = "#_#"; private String lastLine; public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //获得map输入的第一条记录 if("#_#".equals(firstLine)) { firstLine = key.toString() + "\t" + line; } //获得map输入的最后一条记录 lastLine = key.toString() + "\t" + line; //13500001234手机号码总共在多少行出现【自定义计数器】 Counter helloCounter = (Counter) context.getCounter("Words", "13500001234"); if(line.contains("13500001234")) { helloCounter.increment(1L); } String[] strs = line.split("\t"); //手机号码 phoneNum.set(strs[0]); //流量 String[] stream = 
strs[1].split(","); streamWritable.set(Long.parseLong(stream[0]), Long.parseLong(stream[1]), Long.parseLong(stream[2])); context.write(phoneNum, streamWritable); } protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,StreamWritable>.Context context) throws IOException ,InterruptedException { //获得map输入的第一条记录 System.out.println(firstLine); //获得map输出的最后一条记录 System.out.println(lastLine); }; } public static class Reduce extends Reducer<Text, StreamWritable, Text, StreamWritable> { //避免每调用一次reduce就创建一次对象 private StreamWritable streamWritable = new StreamWritable(); /* * map函数执行结束后,map输出的<k, v>一共有4个,分别是<hello, 1><you, 1>,<hello, 1>,<me, 1> * 分区,默认只有一个分区 job.setPartitionerClass * 排序 <hello, 1>,<hello, 1>,<me, 1><you, 1> * 分组 把相同key的value放到一个集合中 <hello, {1,1}><me, {1}><you, {1}>,每一组调用一次reduce函数 * 归约(可选) job.setCombinerClass */ public void reduce(Text key, Iterable<StreamWritable> values, Context context) throws IOException, InterruptedException { long stream1 = 0; long stream2 = 0; long stream3 = 0; Iterator<StreamWritable> it = values.iterator(); while(it.hasNext()) { streamWritable = it.next(); stream1 = stream1 + streamWritable.getStream1(); stream2 = stream2 + streamWritable.getStream2(); stream3 = stream3 + streamWritable.getStream3(); } streamWritable.set(stream1, stream2, stream3); context.write(key, streamWritable); } } public int run(String[] args) throws Exception { Configuration conf = this.getConf(); Job job = new Job(conf); job.setJarByClass(WordCount.class); job.setJobName(WordCount.class.getSimpleName()); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //如果没有配置,默认值是1 job.setNumReduceTasks(1); //指定map产生的数据按照什么规则分配到不同的reduce中,如果没有配置,默认是HashPartitioner.class job.setPartitionerClass(MyPartitioner.class); //FileInputFormat.getSplits决定map任务数量,XxxInputFormat.RecordReader处理每一个split,得到map输入的key、value //默认是TextInputFormat job.setInputFormatClass(TextInputFormat.class); 
job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(Map.class); job.setCombinerClass(Reduce.class); job.setReducerClass(Reduce.class); //当reduce输出类型与map输出类型一致时,map的输出类型可以不设置 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(StreamWritable.class); //reduce的输出类型一定要设置 job.setOutputKeyClass(Text.class); job.setOutputValueClass(StreamWritable.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws Exception { int exit = ToolRunner.run(new WordCount(), args); System.exit(exit); } } //自定义Partitioner class MyPartitioner extends Partitioner<Text, StreamWritable> { @Override //返回值表示,分配到第几个reduce任务中 public int getPartition(Text key, StreamWritable value, int numPartitions) { //13500001234手机号码分到第1个reduce,其余的分到第二个reduce if("13500001234".equals(key.toString())) { return 0; } else { return 1; } } } //自定义序列化类[处理手机流量] //Serializable:Java序列化的信息非常臃肿,比如存在层层类继承的时候,继承关系序列化出去,还需要序列化回来。 //hadoop的Writable轻量很多 class StreamWritable implements Writable { private long stream1; private long stream2; private long stream3; public long getStream1() { return stream1; } public void setStream1(long stream1) { this.stream1 = stream1; } public long getStream2() { return stream2; } public void setStream2(long stream2) { this.stream2 = stream2; } public long getStream3() { return stream3; } public void setStream3(long stream3) { this.stream3 = stream3; } public StreamWritable() { } public StreamWritable(long stream1, long stream2, long stream3) { this.set(stream1, stream2, stream3); } public void set(long stream1, long stream2, long stream3) { this.stream1 = stream1; this.stream2 = stream2; this.stream3 = stream3; } @Override public void write(DataOutput out) throws IOException { out.writeLong(stream1);//写出顺序和读入顺序一一对应 out.writeLong(stream2); out.writeLong(stream3); } @Override public void readFields(DataInput in) throws IOException { this.stream1 = in.readLong();//写出顺序和读入顺序一一对应 this.stream2 = in.readLong(); 
this.stream3 = in.readLong(); } //输出的时候会调用toString方法 @Override public String toString() { return this.stream1+"\t"+this.stream2+"\t"+this.stream3; } }
以上是“Hadoop如何实现自定义计数器、分区、序列化”这篇文章的所有内容，感谢各位的阅读！相信大家都有了一定的了解，希望分享的内容对大家有所帮助。如果还想学习更多知识，欢迎关注亿速云行业资讯频道！
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。