本篇内容主要讲解“如何构建MapReduce程序的基础模板”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何构建MapReduce程序的基础模板”吧!
什么是开发数据集?
一个流行的开发策略是为生产环境中的大数据集建立一个较小的、抽样的数据子集,称为开发数据集。这个开发数据集可能只有几百兆字节。当你以单机或者伪分布式模式编写程序来处理它们时,你会发现开发周期很短,在自己的机器上运行程序也很方便,而且还可以在独立的环境中进行调试。
为什么选择专利引用数据做测试?
1、因为它们与你将来会遇到的大多数数据类型相似
2、专利引用数据所构成的关系图与网页链接以及社会网络图可谓大同小异
3、专利发布以时间为序,有些特性类似于时间序列
4、每个专利关联到一个人 (发明人) 和一个位置 (发明人的国家),你可以将之视为个人信息或地理数据
5、你可以将这些数据视为具有明确模式的普通数据库关系,而格式上简单地以逗号分开
数据集采用标准
数据集采用标准的逗号分隔取值 (comma-separated values, CSV) 格式。
构建MapReduce程序的基础模板
大多数MapReduce程序的编写都可以简单地依赖于一个模板及其变种,当撰写一个新得MapReduce程序时,我们通常会采用一个现有的MapReduce程序,并将其修改成我们所希望的样子。
典型的Hadoop程序的模板
public class MyJob extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper<Text, Text, Text, Text> {
public void map (Text key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
output.collect(value, key);
}
}
public static class Reduce extends MapReduceBase
implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String csv = "";
while (values.hasNext()) {
if (csv.length() > 0) csv += ",";
csv += values.next().toString();
}
output.collect(key, new Text(csv));
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobConf job = new JobConf(conf, MyJob.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("MyJob");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.set("key.value.separator.in.input.line", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MyJob(), args);
System.exit(res);
}
}
1、我们习惯用单个类来完整地定义每个MapReduce作业,这里成为MyJob类
2、Hadoop要求Mapper和Reducer必须是它们自身的静态类,这些类非常小,模板将它们包含在MyJob类中作为内部类,这样做的好处是可以把所有的东西放在一个文件内,简化代码管理
3、但是需要记住这些内部类是独立的,通常不与MyJob类进行交互
4、在作业执行期间,采用不同JVM的各类节点复制并运行Mapper和Reducer,而其他的作业类仅在客户机上执行
解释下run()方法
1、框架的核心在run()方法中,也称为driver
2、它实例化、配置并传递一个JobConf对象命名的作业给JobClient.runJob()以启动MapReduce作业(反过来,JobClient类与JobTracker通信让该作业在集群上启动)
3、JobConf对象将保持作业运行所需的全部配置参数
4、Driver需要在作业中为每个作业定制基本参数,包括输入路径、输出路径、Mapper类和Reducer类
5、每个作业可以重置默认的作业属性,例如,InputFormat、OutputFormat等,也可以调用JobConf对象中的set()方法填充任意的配置参数
6、一旦传递JobConf对象到JobClient.runJob(),他就被视为决定这个作业如何运行的蓝本
关于driver的配置的一些说明
1、JobConf对象有许多参数,但我们并不希望全部的参数都通过编写driver来设置,可以把Hadoop安装时的配置文件作为一个很好的起点
2、用户可能希望在命令行启动一个作业时传递额外的参数来改变作业配置
3、Driver可以通过自定义一组命令并自行处理用户参数,来支持用户修改其中的一些配置
4、因为经常需要做这样的任务,Hadoop框架便提供了ToolRunner、Tool和Configured来简化其实现。
5、当它们在上面的MyJob框架中被同时使用时,这些类使得作业可以理解用户提供的被GenericOptionParser支持的选项
比如下面的命令:
bin/hadoop jar playgroup/MyJob.jar MyJob input/cite75-99.txt output
如果我们运行作业仅仅是想看到mapper的输出 (处于调试的目的), 可以用选项 -D mapred.reduce.tasks=0将reducer的数目设置为0
bin/hadoop jar playgroup/MyJob.jar MyJob -D mapred.reduce.tasks=0 input/cite75-99.txt output
通过使用ToolRunner、MyJob可以自动支持一下选项
GenericOptionsParser支持的选项
选项 | 描述 |
-conf <configuration file> | 指定一个配置文件 |
-D <property=value> | 给JobConf属性赋值 |
-fs <local | namenode:port> | 指定一个NameNode,可以是 "local" |
-jt <local | jobtracker:port> | 指定一个JobTracker |
-files <list of files> | 指定一个以逗号分隔的文件列表,用于MapReduce作业。这些文件自动地分布到所有节点,使之可从本地获取 |
-libjars <list of jars> | 指定一个以逗号分隔的jar文件,使之包含在所有任务JVM的classpath中 |
-archives <list of archives> | 指定一个以逗号分隔的存档文件列表,使之可以在所有任务节点上打开 |
模板代码Mappper与Reducer
模板中习惯将Mapper类称为MapClass,而将Reducer类称为Reduce
Mapper和Reducer都是MapReduceBase的扩展
MapReduceBase是个小类,包含configure()和close(),我们使用上两个方法来建立和清除map(reduce)任务,除非是更高级的作业,通常我们并不需要覆盖它们
Mapper类和Reducer类模板说明
public static class MapClass extends MapReduceBase
implements Mapper<K1, V1, K2, V2> {
public void map (K1 key, V1 value,
OutputCollector<K2, V2> output,
Reporter reporter) throws IOException { }
}
public static class Reduce extends MapReduceBase
implements Reducer<K1, V2, K3, V3> {
public void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output,
Reporter reporter) throws IOException { }
}
Mapper类的核心操作为map()方法,Reduce类为reduce()方法。每一个map()方法的调用分别被赋予一个类型为K1和V1的键/值对。这个键/值对由mapper生成,并通过OutputCollector对象的collect()方法来输出。你需要在map()方法中的合适位置调用:
output.collect((K2) k, (V2) v);
在Reducer中reduce()方法的每次调用均被赋予K2类型的键,以及V2类型的一组值。注意它必须与Mapper中使用的K2和V2类型相同。Reduce()方法可能会循环遍历V2类型的所有值。
while (values.hasNext()) {
V2 v = values.next();
}
Reduce()方法还使用OutputCollector来搜集其键/值的输出,它们的类型为K3/V3。在reudce()方法中可以调用
output.collect((K3) k, (V3) v);
除了在Mapper和Reducer之间保持K2与V3的类型一致,还需要确保在Mapper和Reducer中使用的键值类型与在driver中设置的输入格式、输出键的类,以及输出值的类保持一致
使用KeyValueTextInputFormat意味着K1和V1必须均为Text类型
Driver则必须调用setOutputKeyClass()和setOutputValueClass()分别指定K2和V2的类
最终:
1、所有的键与值的类型必须是Writable的子类型,来确保Hadoop的序列化接口可以把数据在分布式集群上发送
2、键的类型实现了WritableComparable,它是Writable的子接口,键的类型还需额外支持compareTo()方法,因为在MapReduce框架中键会被用来进行排序
到此,相信大家对“如何构建MapReduce程序的基础模板”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。