这篇文章主要讲解了“怎么编写不同MapReudce程序”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么编写不同MapReudce程序”吧!
序列化就是把 内存中的对象的状态信息,转换成 字节序列以便于存储(持久化)和网络传输。而反序列化就是将收到 字节序列或者是硬盘的持久化数据,转换成内存中的对象。
其实在Java规范中,已经有了一套序列化的机制,某个面向对象的类实现Serializable接口就能实现序列化与反序列化,但是记得一定要加上序列化版本ID serialVersionUID .可是为什么Hadoop要自主研发序列化机制呢?它对比原生态的有什么特点和区别呢?
JDK在序列化的时候,算法会考虑这些事情:
所以我们只要implements Serializable接口,JDK会自动处理一切,Java的序列化机制相当复杂,能处理各种对象关系。 缺点:Java的序列化机制计算量开销大,且序列化的结果体积太大,有时能达到对象大小的数倍.引用机制也会导致大文件不能分割. 这些缺点对于Hadoop是非常致命的,因为在Hadoop集群之间需要通讯或者是RPC调用的话,需要序列化,而且要求序列化要快,且体积要小,占用带宽要小。所以Hadoop就自个玩了一套. |
Hadoop的序列化的特点是: 1 . 紧凑:由于带宽是集群中信息传递的最宝贵的资源所以我们必须想法设法缩小传递信息的大小,hadoop的序列化就 为了更好地坐到这一点而设计的。 2 . 对象可重用:JDK的反序列化会不断地创建对象,这肯定会造成一定的系统开销,但是在hadoop的反序列化中,能重复的利用一个对象的readField方法来重新产生不同的对象。 3 . 可扩展性:Hadoop的序列化有多中选择 a.可以利用实现hadoop框架中的Writable接口。(原生的) b.使用开源的序列化框架protocol Buffers,Avro等框架。 PS(网络来源):hadoop2.X之后是实现一个叫YARN,所有应用(如mapreduce,或者其他spark实时或者离线的计算框架都可以运行在YARN上),YARN还负责对资源的调度等等。YARN的序列化就是用Google开发的序列化框架protocol Buffers,目前支持支持三种语言C++,java,Python.所以RPC这一层我们就可以利用其他语言来做文章,满足其他语言开发者的需求。 接下来的话就是如何使用序列化机制,Writable介绍如下. |
Hadoop原生的序列化,hadoop原生的序列化类需要实现一个叫Writeable的接口,类似于Serializable接口。
还有hadoop也为我们提供了几个序列化类,他们都直接或者间接地实现了Writable接口。如:IntWritable,LongWritable,Text,org.apache.hadoop.io.WritableComparable<T>等等。
实现Writable接口必须实现两个方法:
public void write(DataOutput out) throws IOException ; public void readFields(DataInput in) throws IOException ;
实现WritableComparable接口必须实现三个方法,翻阅该接口的的源码,都已经给出demo了.篇幅原因,自己去看吧
案例1:数据如下图,统计电话号码相同的的上传下载流量和总流量.电话号码,上传流量,下载流量,总流量.(1,lastest-2,lastest-3)
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200 1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200 1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200 1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200 1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200 1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200 1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200 1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200 1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200 1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200 1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash3-http.qq.com 综合门户 15 12 1938 2910 200 1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200 1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200 1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200 1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200 1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200 1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
package com.codewatching.fluxcount.bean; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class FlowBean implements Writable { private String phoneNum; private long upFlow; private long downFlow; private long sumFlow; public FlowBean(){} public FlowBean(String phoneNum, long upFlow, long downFlow) { super(); this.phoneNum = phoneNum; this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow+downFlow; } public String getPhoneNum() { return phoneNum; } public void setPhoneNum(String phoneNum) { this.phoneNum = phoneNum; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(phoneNum); out.writeLong(downFlow); out.writeLong(upFlow); out.writeLong(sumFlow); } @Override public void readFields(DataInput in) throws IOException { phoneNum = in.readUTF(); downFlow = in.readLong(); upFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow+"\t"+downFlow+"\t"+sumFlow; } } 2. 编写Mapper,Reducer,Runner. package com.codewatching.fluxcount.hadoop; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import com.codewatching.fluxcount.bean.FlowBean; public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fileds = line.split("\t"); int length = fileds.length; String phoneNum = fileds[1]; long upFlow = Long.parseLong(fileds[length-3]); long downFlow = Long.parseLong(fileds[length-2]); FlowBean flowBean = new FlowBean(phoneNum, upFlow, downFlow); //以flowBean为value供reducer处理 context.write(new Text(phoneNum), flowBean); } } package com.codewatching.fluxcount.hadoop; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import com.codewatching.fluxcount.bean.FlowBean; public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ @Override protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException, InterruptedException { long _downFlow = 0; long _upFlow = 0; for (FlowBean flowBean : values) { _downFlow += flowBean.getDownFlow(); _upFlow += flowBean.getUpFlow(); } FlowBean bean = new FlowBean(key.toString(), _upFlow, _downFlow); context.write(key, bean); } } package com.codewatching.fluxcount.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.codewatching.fluxcount.bean.FlowBean; public class FlowSumRunner extends Configured implements Tool{ @Override public int run(String[] args) throws Exception { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); configuration.set("mapreduce.job.jar", "fluxcount.jar"); job.setJarByClass(FlowSumRunner.class); job.setMapperClass(FlowSumMapper.class); job.setReducerClass(FlowSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileSystem fileSystem = FileSystem.get(configuration); Path path = new Path(args[1]); if(fileSystem.exists(path)){ fileSystem.delete(path, true); } FileOutputFormat.setOutputPath(job, path); return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new FlowSumRunner(), args); } } |
hadoop的map/reduce中支持对key进行分区,从而让map出来的数据均匀分布在reduce上.Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出结果.Mapper的结果,可能送到Combiner(下面回讲到)做合并, Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer。哪个key到哪个Reducer的分配过程,是由Partitioner规定的.说的真麻烦。如果我们去查阅Partitioner类的源码,就知道它是个抽象类,里面有个抽象方法:
/** * Get the partition number for a given key (hence record) given the total * number of partitions i.e. number of reduce-tasks for the job. * * <p>Typically a hash function on a all or a subset of the key.</p> * * @param key the key to be partioned. * @param value the entry value. * @param numPartitions the total number of partitions. * @return the partition number for the <code>key</code>. */ public abstract int getPartition(KEY key, VALUE value, int numPartitions);
而在类的注释也是非常的全面,不得抱怨一句。洋文如果好一点的话,学起来会轻松多了.唉,老大难.
Partitioner controls the partitioning of the keys of the intermediate map-outputs.....省略..
案例2:在案例1的基础上,然后将号码进行分区,假设135是北京,139是江西...将各地区的统计出来,并且各地区单独存放文件.效果图如下:
package com.codewatching.fluxcount.hadoop; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; import com.codewatching.fluxcount.bean.FlowBean; public class AreaPartitioner extends Partitioner<Text, FlowBean>{ private static Map<String,Integer> areaMap; static{ areaMap = new HashMap<String, Integer>(); areaMap.put("135", 0); //模拟分区,存在内存中。 areaMap.put("137", 1); areaMap.put("138", 2); areaMap.put("139", 3); } @Override public int getPartition(Text key, FlowBean value, int numPartitions) { int area = 4; //默认都是为4 String prefix = key.toString().substring(0,3); //判断是否在某个分区中 Integer index = areaMap.get(prefix); if(index!=null){ area = index; //如果存在,取相应的数字0,1,2,3 } return area; } } 2.在Runner中添加两行代码. 3.在Hadoop中的运行结果. |
其实上Hadoop已经提供了一个默认的实现类叫着HashPartitioner.看看它如何key分区的.
将key均匀分布在ReduceTasks上,举例如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int,string太大的话这个int值可能会溢出变成负数,所以与上Integer.MAX_VALUE(即0111111111111111),然后再对reduce个数取余,这样就可以让key均匀分布在reduce上。
PS:这个简单算法得到的结果可能不均匀,因为key毕竟不会那么线性连续.
输入处理类:InputFormat的作用负责MR的输入部分
1、验证作业的输入是否规范。
2、把输入文件切分成InputSplit。
3、提供RecordReader的实现类,把InputSplit读到Mapper中进行处理.
最佳分片的大小应该与块大小相同:因为它是确保可以存储在单个节点上的最大输入块的大小。如果分片跨越2个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储着2个数据块,因此分片中的部分数据需要通过网络传输到Map任务节点,与使用本地数据运行整个Map任务相比,这种方法显然效率更低。
PS:还可以编写自定义的输入处理类,继承InputFormat,重写相应的方法,当然,首先要知道方法的作用.--建议读源代码.
输出处理类:OutputFormat,在Ruduce处理之后.
编程时,输出输入处理类在哪使用指定:
感谢各位的阅读,以上就是“怎么编写不同MapReudce程序”的内容了,经过本文的学习后,相信大家对怎么编写不同MapReudce程序这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。