这篇文章主要介绍“Hadoop MapReduce是什么”,在日常操作中,相信很多人在Hadoop MapReduce是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Hadoop MapReduce是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
这两个阶段合起来正是MapReduce思想的体现。
还有一个比较形象的语言解释MapReduce:
我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就越快。
现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。
MapReduce是采用一种分而治之的思想设计出来的分布式计算框架
那什么是分而治之呢?
比如一复杂、计算量大、耗时长的的任务,暂且称为“大任务”;
此时使用单台服务器无法计算或较短时间内计算出结果时,可将此大任务切分成一个个小的任务,小任务分别在不同的服务器上并行的执行;
最终再汇总每个小任务的结果
MapReduce由两个阶段组 成:
Map阶段(切分成一个个小的任务)
Reduce阶段(汇总小任务的结果)
map阶段有一个关键的map()函数;
此函数的输入是键值对
输出是一系列键值对,输出写入本地磁盘。
reduce阶段有一个关键的函数reduce()函数
此函数的输入也是键值对(即map的输出(kv对))
输出也是一系列键值对,结果最终写入HDFS
Map&Reduce
mapReduce编程模型的总结:
MapReduce的开发一共有八个步骤其中map阶段分为2个步骤,shuffle阶段4个步骤,reduce阶段分为2个步骤
第一步:设置inputFormat类,将我们的数据切分成key,value对,输入到第二步
第二步:自定义map逻辑,处理我们第一步的输入数据,然后转换成新的key,value对进行输出
第三步:对输出的key,value对进行分区。(相同key的数据属于同一分区)
第四步:对不同分区的数据按照相同的key进行排序
第五步:对分组后的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)
第六步:对排序后的数据进行分组,分组的过程中,将相同key的value放到一个集合当中(每组数据调用一次reduce方法)
第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出
第八步:设置outputformat将输出的key,value对数据进行保存到文件中。
hadoop没有沿用java当中基本的数据类型,而是自己进行封装了一套数据类型,其自己封装的类型与java的类型对应如下
下表常用的数据类型对应的Hadoop数据序列化类型
Java类型 | Hadoop Writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
byte[] | BytesWritable |
需求:现有数据格式如下,每一行数据之间都是使用逗号进行分割,求取每个单词出现的次数
hello,hello world,world hadoop,hadoop hello,world hello,flume hadoop,hive hive,kafka flume,storm hive,oozie
<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0-mr1-cdh6.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0-cdh6.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0-cdh6.14.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.6.0-cdh6.14.2</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <version>RELEASE</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> <!-- <verbal>true</verbal>--> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizeJar>true</minimizeJar> </configuration> </execution> </executions> </plugin> </plugins> </build>
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * 自定义mapper类需要继承Mapper,有四个泛型, * keyin: k1 行偏移量 Long * valuein: v1 一行文本内容 String * keyout: k2 每一个单词 String * valueout : v2 1 int * 在hadoop当中没有沿用Java的一些基本类型,使用自己封装了一套基本类型 * long ==>LongWritable * String ==> Text * int ==> IntWritable * */ public class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable> { /** * 继承mapper之后,覆写map方法,每次读取一行数据,都会来调用一下map方法 * @param key:对应k1 * @param value:对应v1 * @param context 上下文对象。承上启下,承接上面步骤发过来的数据,通过context将数据发送到下面的步骤里面去 * @throws IOException * @throws InterruptedException * k1 v1 * 0;hello,world * * k2 v2 * hello 1 * world 1 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取我们的一行数据 String line = value.toString(); String[] split = line.split(","); Text text = new Text(); IntWritable intWritable = new IntWritable(1); for (String word : split) { //将每个单词出现都记做1次 //key2 Text类型 //v2 IntWritable类型 text.set(word); //将我们的key2 v2写出去到下游 context.write(text,intWritable); } } }
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> { //第三步:分区 相同key的数据发送到同一个reduce里面去,相同key合并,value形成一个集合 /** * 继承Reducer类之后,覆写reduce方法 * @param key * @param values * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int result = 0; for (IntWritable value : values) { //将我们的结果进行累加 result += value.get(); } //继续输出我们的数据 IntWritable intWritable = new IntWritable(result); //将我们的数据输出 context.write(key,intWritable); } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /* 这个类作为mr程序的入口类,这里面写main方法 */ public class WordCount extends Configured implements Tool{ /** * 实现Tool接口之后,需要实现一个run方法, * 这个run方法用于组装我们的程序的逻辑,其实就是组装八个步骤 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { //获取Job对象,组装我们的八个步骤,每一个步骤都是一个class类 Configuration conf = super.getConf(); Job job = Job.getInstance(conf, "mrdemo1"); //实际工作当中,程序运行完成之后一般都是打包到集群上面去运行,打成一个jar包 //如果要打包到集群上面去运行,必须添加以下设置 job.setJarByClass(WordCount.class); //第一步:读取文件,解析成key,value对,k1:行偏移量 v1:一行文本内容 job.setInputFormatClass(TextInputFormat.class); //指定我们去哪一个路径读取文件 TextInputFormat.addInputPath(job,new Path("文件位置")); //第二步:自定义map逻辑,接受k1 v1 转换成为新的k2 v2输出 job.setMapperClass(MyMapper.class); //设置map阶段输出的key,value的类型,其实就是k2 v2的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //第三步到六步:分区,排序,规约,分组都省略 //第七步:自定义reduce逻辑 job.setReducerClass(MyReducer.class); //设置key3 value3的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //第八步:输出k3 v3 进行保存 job.setOutputFormatClass(TextOutputFormat.class); //一定要注意,输出路径是需要不存在的,如果存在就报错 TextOutputFormat.setOutputPath(job,new Path("输出文件位置")); //提交job任务 boolean b = job.waitForCompletion(true); return b?0:1; /*** * 第一步:读取文件,解析成key,value对,k1 v1 * 第二步:自定义map逻辑,接受k1 v1 转换成为新的k2 v2输出 * 第三步:分区。相同key的数据发送到同一个reduce里面去,key合并,value形成一个集合 * 第四步:排序 对key2进行排序。字典顺序排序 * 第五步:规约 combiner过程 调优步骤 可选 * 第六步:分组 * 第七步:自定义reduce逻辑接受k2 v2 转换成为新的k3 v3输出 * 第八步:输出k3 v3 进行保存 * * */ } /* 作为程序的入口类 */ public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.set("hello","world"); //提交run方法之后,得到一个程序的退出状态码 int run = ToolRunner.run(configuration, new WordCount(), args); //根据我们 程序的退出状态码,退出整个进程 System.exit(run); } }
到此,关于“Hadoop MapReduce是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。