This article explains how to use MapReduce: what the model is, how a job executes, and a few short examples covering WordCount, custom serialization, and SequenceFile.
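1. What is MR
MR (MapReduce) is a distributed computing model used mainly for computation over massive data sets. It is built on two functions, Mapping and Reducing. Mapping applies the same operation to every element of a collection; Reducing traverses the elements of a collection and returns one combined result. When writing a job you only need to override the map and reduce methods, which is quite simple. Both methods take key/value (k,v) pairs as parameters. Once the data volume grows beyond roughly 10 PB, processing slows down.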
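The difference between the two functions can be shown on an ordinary in-memory collection. The following is only a minimal sketch in plain Java with no Hadoop involved; the class and method names (MapReduceIdea, mapSquares, reduceSum) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MapReduceIdea {
    // "Mapping": apply the same operation (here, squaring) to every element.
    static List<Integer> mapSquares(List<Integer> input) {
        return input.stream().map(x -> x * x).collect(Collectors.toList());
    }

    // "Reducing": traverse the elements and fold them into one combined result.
    static int reduceSum(List<Integer> input) {
        return input.stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> squares = mapSquares(Arrays.asList(1, 2, 3, 4));
        System.out.println(squares);
        System.out.println(reduceSum(squares));
    }
}
```

MapReduce applies the same two ideas, except that the elements are key/value pairs and the work is spread over many machines.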
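2. How an MR job executes
When an MR program starts, the input files are parsed into <k1,v1> key/value pairs and passed to the map function. The map function runs once per pair, but that does not mean there is one Mapper process per pair: a single Mapper task calls map repeatedly, once for each pair in its input split. The map function turns its input into <k2,v2> pairs. The process that turns <k2,v2> into the reduce input <k2,{v2,...}> is called shuffle. Shuffle is not a function like map or reduce, and it does not need nodes of its own to run on; it is only a process. The reduce function then processes <k2,{v2,...}> into the final output <k3,v3>. Between the map output and the call to reduce, the number of v2 values does not change (as long as no combiner is used); shuffle only regroups them by key.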
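The flow from <k1,v1> to <k3,v3> can be imitated in a single JVM. The sketch below is not how Hadoop is implemented; it is a plain-Java simulation under simple assumptions (tab-separated words, a line offset counted in characters rather than bytes), and the class LocalPipeline and its methods are hypothetical names.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LocalPipeline {
    // map: called once per <k1,v1> (k1 = line offset, v1 = line text); emits <k2,v2> = <word,1>.
    static List<Map.Entry<String, Long>> map(long k1, String v1) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String word : v1.split("\t")) {
            out.add(new SimpleEntry<>(word, 1L));
        }
        return out;
    }

    // shuffle: collect every v2 under its k2; the TreeMap keeps the keys sorted.
    static Map<String, List<Long>> shuffle(List<Map.Entry<String, Long>> pairs) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Map.Entry<String, Long> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // reduce: called once per <k2,{v2,...}> group; returns v3 (the sum).
    static long reduce(String k2, List<Long> v2s) {
        long sum = 0;
        for (long v : v2s) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Long> run(List<String> lines) {
        List<Map.Entry<String, Long>> mapped = new ArrayList<>();
        long offset = 0;
        for (String line : lines) {
            mapped.addAll(map(offset, line));
            offset += line.length() + 1; // simplification: characters plus newline
        }
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, List<Long>> g : shuffle(mapped).entrySet()) {
            result.put(g.getKey(), reduce(g.getKey(), g.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello\tworld", "hello\thadoop")));
    }
}
```

Note that shuffle here only regroups the pairs: the four <word,1> pairs produced by map are still four values when they reach reduce.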
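Map phase
(1). Parse the input file into <k1,v1> pairs and call the map function once for each pair.
(2). Process each pair with the map function you wrote, and output new <k2,v2> pairs.
(3). Partition the output pairs; each partition corresponds to a different Reducer process.
(4). Within each partition, sort the pairs by key and group them, so that the values sharing a key end up in the same collection.
(5). Run a combiner, that is, a local reduction (optional).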
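Steps (3) to (5) can be sketched in plain Java as follows. The partition formula mirrors what Hadoop's default HashPartitioner computes, but everything else is an assumption made for illustration: the class MapSideSketch is hypothetical, it hashes a String (Hadoop's Text computes its hash differently), and the "combiner" is simply a local sum of the 1s emitted for each word.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MapSideSketch {
    // Step (3): choose a partition (and so a reducer) from the key's hash.
    // Masking with Integer.MAX_VALUE clears the sign bit so the result is never negative.
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Steps (4) and (5): each partition is a TreeMap, so keys stay sorted and grouped,
    // and merge(...) sums the counts locally the way a combiner would.
    static List<Map<String, Long>> partitionSortCombine(List<String> words, int numReduceTasks) {
        List<Map<String, Long>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new TreeMap<>());
        }
        for (String word : words) {
            partitions.get(partitionFor(word, numReduceTasks)).merge(word, 1L, Long::sum);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "a");
        List<Map<String, Long>> parts = partitionSortCombine(words, 2);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("partition " + i + ": " + parts.get(i));
        }
    }
}
```

Because the partition depends only on the key, every pair with the same key lands in the same partition, which is what lets one reducer see all the values for that key.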
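Reduce phase
(1). The k/v pairs output by the map tasks are transferred, partition by partition, to the corresponding reduce nodes.
(2). Merge and sort the k/v pairs coming from the different map tasks, then apply the logic of the reduce function to each <k2,{v2,...}> and output new key/value pairs.
(3). Save the output to files.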
3. A simple example
WordCount
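import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        // Reused across calls to avoid allocating new objects for every record.
        Text k2 = new Text();
        LongWritable v2 = new LongWritable();

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // k1 is the line offset, v1 the line text; words are tab-separated.
            String[] words = v1.toString().split("\t");
            for (String word : words) {
                k2.set(word);
                v2.set(1L);
                context.write(k2, v2); // emit <word, 1>
            }
        }
    }

    public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        LongWritable v3 = new LongWritable();

        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Sum all the 1s collected for this word.
            long sum = 0;
            for (LongWritable count : v2s) {
                sum += count.get();
            }
            v3.set(sum);
            context.write(k2, v3); // emit <word, total>
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, WordCount.class.getSimpleName());
        job.setJarByClass(WordCount.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/a.txt"));
        // The output directory must not exist yet, or the job fails.
        FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/out4"));

        // Report job failure through the exit code instead of ignoring it.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}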
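4. Serialization in MR
Serialization converts a structured object into a byte stream. Hadoop does not use Java's built-in serialization for MR; it implements its own mechanism, which has a number of advantages by comparison. In an MR program, the keys and values we take as parameters and emit as output are all objects that implement this serialization. To define a serializable type of your own, implement the Writable interface. A type used as a key must implement WritableComparable instead, because keys are sorted and grouped.
The small example below, which processes telecom traffic records, demonstrates serialization.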
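To make the Writable contract concrete without needing Hadoop on the classpath, here is a rough sketch. SimpleWritable is a local stand-in that copies the two method signatures of Hadoop's Writable, write(DataOutput) and readFields(DataInput); TrafficWritable and its counters field are hypothetical names chosen for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Local stand-in with the same two methods as org.apache.hadoop.io.Writable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

class TrafficWritable implements SimpleWritable {
    long[] counters = new long[4]; // hypothetical payload: four traffic counters

    @Override
    public void write(DataOutput out) throws IOException {
        for (long c : counters) {
            out.writeLong(c); // fields are written in a fixed order...
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        for (int i = 0; i < counters.length; i++) {
            counters[i] = in.readLong(); // ...and must be read back in the same order
        }
    }

    static byte[] toBytes(SimpleWritable w) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static TrafficWritable fromBytes(byte[] bytes) {
        TrafficWritable t = new TrafficWritable();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            t.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return t;
    }

    public static void main(String[] args) {
        TrafficWritable t = new TrafficWritable();
        t.counters = new long[] {24L, 27L, 2481L, 24681L};
        byte[] bytes = toBytes(t);
        System.out.println(bytes.length + " bytes");
        System.out.println(Arrays.toString(fromBytes(bytes).counters));
    }
}
```

The byte stream holds only the raw field values, with no class names or field metadata, which is one reason this style is more compact than Java's built-in serialization. In a real job the class would implement Hadoop's own Writable, and WritableComparable (adding compareTo) if it is used as a key.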
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LiuLiang {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, MyArrayWritable> {
        Text k2 = new Text();
        MyArrayWritable v2 = new MyArrayWritable();
        LongWritable v21 = new LongWritable();
        LongWritable v22 = new LongWritable();
        LongWritable v23 = new LongWritable();
        LongWritable v24 = new LongWritable();
        LongWritable[] values = new LongWritable[4];

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // Each record is one tab-separated line.
            String[] words = v1.toString().split("\t");
            // Column 1 is the key; columns 6 to 9 are the four numeric values to sum.
            k2.set(words[1]);
            v21.set(Long.parseLong(words[6]));
            v22.set(Long.parseLong(words[7]));
            v23.set(Long.parseLong(words[8]));
            v24.set(Long.parseLong(words[9]));
            values[0] = v21;
            values[1] = v22;
            values[2] = v23;
            values[3] = v24;
            v2.set(values);
            context.write(k2, v2);
        }
    }

    public static class MyReduce extends Reducer<Text, MyArrayWritable, Text, Text> {
        Text v3 = new Text();

        @Override
        protected void reduce(Text k2, Iterable<MyArrayWritable> v2s, Context context)
                throws IOException, InterruptedException {
            long sum1 = 0;
            long sum2 = 0;
            long sum3 = 0;
            long sum4 = 0;
            // Add up each of the four columns over all records with this key.
            for (MyArrayWritable myArrayWritable : v2s) {
                Writable[] values = myArrayWritable.get();
                sum1 += ((LongWritable) values[0]).get();
                sum2 += ((LongWritable) values[1]).get();
                sum3 += ((LongWritable) values[2]).get();
                sum4 += ((LongWritable) values[3]).get();
            }
            v3.set("\t" + sum1 + "\t" + sum2 + "\t" + sum3 + "\t" + sum4);
            context.write(k2, v3);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, LiuLiang.class.getSimpleName());
        job.setJarByClass(LiuLiang.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MyArrayWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/HTTP_20130313143750.dat"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/ceshi3"));

        job.waitForCompletion(true);
    }
}

// Custom value type: an array of LongWritable. The no-argument constructor is required
// so that Hadoop can create an instance when deserializing.
class MyArrayWritable extends ArrayWritable {
    public MyArrayWritable() {
        super(LongWritable.class);
    }

    public MyArrayWritable(String[] arg0) {
        super(arg0);
    }
}
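5. SequenceFile
When studying HDFS, one of the solutions mentioned for the small-files problem was SequenceFile. It stores serialized k/v pairs in a single file without sorting them, so many small files can be merged into one, and it supports compression. The drawback is that you have to scan through the file to look at any individual small file inside it.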
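import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileTest {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://115.28.138.100:9000"), conf, "hadoop");
        // Run write(...) once first to create /sqtest, then read it back.
        // write(conf, fileSystem);
        read(conf, fileSystem);
    }

    // Scan the SequenceFile from the start and print every <file name, content> pair.
    private static void read(Configuration conf, FileSystem fileSystem) throws IOException {
        SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, new Path("/sqtest"), conf);
        Text key = new Text();
        Text val = new Text();
        while (reader.next(key, val)) {
            System.out.println(key.toString() + "----" + val.toString());
        }
        IOUtils.closeStream(reader);
    }

    // Pack every .txt file in a local directory into one SequenceFile:
    // key = file name, value = file content.
    private static void write(Configuration conf, FileSystem fileSystem) throws IOException {
        SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path("/sqtest"),
                Text.class, Text.class);
        Collection<File> files = FileUtils.listFiles(new File("F:\\ceshi1"), new String[] { "txt" }, false);
        for (File file : files) {
            Text text = new Text();
            text.set(FileUtils.readFileToString(file));
            writer.append(new Text(file.getName()), text);
        }
        IOUtils.closeStream(writer);
    }
}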
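The trade-off described in this section, many small files packed into one container that can only be searched by scanning, can be imitated without Hadoop. The sketch below is not the SequenceFile format (the real format also has a header and sync markers, among other things); it only writes name/content records back to back into one byte array. SmallFilePacker, pack and find are hypothetical names, and writeUTF limits each string to 65535 encoded bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

class SmallFilePacker {
    // Append one <name, content> record per small file to a single byte stream.
    static byte[] pack(Map<String, String> files) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            for (Map.Entry<String, String> f : files.entrySet()) {
                out.writeUTF(f.getKey());   // key: file name
                out.writeUTF(f.getValue()); // value: file content
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // There is no index, so finding one file means reading records from the start.
    static String find(byte[] packed, String name) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(packed))) {
            while (in.available() > 0) {
                String key = in.readUTF();
                String value = in.readUTF();
                if (key.equals(name)) {
                    return value;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return null; // not found
    }

    public static void main(String[] args) {
        Map<String, String> files = new LinkedHashMap<>();
        files.put("a.txt", "alpha");
        files.put("b.txt", "beta");
        byte[] packed = pack(files);
        System.out.println(find(packed, "b.txt"));
    }
}
```

Looking up b.txt has to read past a.txt first, which is the sequential-scan drawback mentioned above.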