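This article shows how to write a MapReduce job that reads from and writes to HBase, using word count as the example. The job scans the content:words column of the wordcount table, splits each value into words, and writes each word's total to the info:wordcnt column of the result table, with the word itself as the row key.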
    package com.hbase.test;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class HbaseMrTest {

        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = HBaseConfiguration.create();
            // Point the client at the ZooKeeper quorum used by HBase
            conf.set("hbase.zookeeper.quorum", "bigdata01,bigdata02,bigdata03");
            conf.set("hbase.zookeeper.property.clientPort", "2181");

            Job job = Job.getInstance(conf, "word-count");
            // Main class of the job, used to locate the jar
            job.setJarByClass(HbaseMrTest.class);

            Scan scan = new Scan();
            // Restrict the scan to the column the mapper needs
            scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("words"));

            // Configure the mapper: input table, scan, mapper class, output key/value types
            TableMapReduceUtil.initTableMapperJob(
                    "wordcount", scan, HMapper.class, Text.class, IntWritable.class, job);
            // Configure the reducer: output table and reducer class
            TableMapReduceUtil.initTableReducerJob("result", HReducer.class, job);

            // Submit the job and wait for it to finish
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    // Text and IntWritable are the mapper's output key and value types
    class HMapper extends TableMapper<Text, IntWritable> {

        private final Text out = new Text();
        private final IntWritable iw = new IntWritable(1);

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // Read the value of content:words directly from the Result
            byte[] bytes = value.getValue(Bytes.toBytes("content"), Bytes.toBytes("words"));
            if (bytes != null) {
                String words = Bytes.toString(bytes);
                // Split the line into words. Splitting on runs of whitespace and
                // skipping empty tokens avoids emitting "" as a word, which would
                // later become an empty row key (Put rejects a zero-length row).
                for (String wd : words.trim().split("\\s+")) {
                    if (wd.isEmpty()) {
                        continue;
                    }
                    out.set(wd);
                    // Emit one pair per word, e.g. (you, 1)
                    context.write(out, iw);
                }
            }
        }
    }

    // Text and IntWritable are the mapper's output types, i.e. the reducer's input types
    class HReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text text, Iterable<IntWritable> iter, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // Add up the 1s emitted for this word
            for (IntWritable intw : iter) {
                sum += intw.get();
            }
            // The Put constructor argument is the row key: the word itself
            byte[] row = Bytes.toBytes(text.toString());
            Put put = new Put(row);
            // Column family info, qualifier wordcnt; the count is stored as a string
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("wordcnt"),
                    Bytes.toBytes(String.valueOf(sum)));
            // Write one row per word; the Put carries the summed count
            context.write(new ImmutableBytesWritable(row), put);
        }
    }

Finally, export the project as a Runnable JAR, copy it to the Linux machine, and run it:

    java -jar hbase.jar com.hbase.test.HbaseMrTest

With java -jar, the main class comes from the JAR manifest. The trailing class name is simply passed to main as an argument, which this program ignores.
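To check what the job should produce before running it on a cluster, the map and reduce logic can be reproduced in plain Java. The following is only a sketch under assumptions: the class name WordCountSketch, its method names, and the sample rows are hypothetical and not part of the original program, and no Hadoop or HBase classes are involved. It splits on runs of whitespace and skips empty tokens, so an empty string never ends up as a word.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the job's data flow (hypothetical helper, JDK only).
class WordCountSketch {

    // "Map" step: split one cell value into words, like HMapper does for content:words.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    // "Shuffle and reduce" step: group by word and sum the 1s, like HReducer does.
    // A TreeMap keeps the words sorted, similar to how row keys are ordered.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            for (String w : tokenize(line)) {
                totals.merge(w, 1, Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows standing in for the content:words values
        List<String> rows = Arrays.asList("hello you", "hello  me");
        count(rows).forEach((word, n) -> System.out.println(word + " " + n));
    }
}
```

Each entry of the returned map corresponds to one row the reducer would write: the key is the row key and the value is what ends up in info:wordcnt.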
Source data (the wordcount table, column content:words):
Result (the result table, column info:wordcnt):
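One detail matters when reading the result back: the reducer writes the count with Bytes.toBytes(String.valueOf(sum)), so the cell holds the decimal digits as text rather than a 4-byte integer. A client would therefore decode it with Bytes.toString followed by Integer.parseInt, not with Bytes.toInt. The sketch below imitates both encodings using only the JDK. The class and method names are hypothetical, and the JDK calls are stand-ins assumed to produce the same byte layouts as the corresponding Bytes methods (UTF-8 text and a big-endian int).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// JDK-only illustration of the two ways a count could be stored in a cell (hypothetical helper).
class CountEncodingSketch {

    // Text encoding, as the reducer does: the decimal digits as UTF-8 bytes.
    static byte[] encodeAsText(int sum) {
        return String.valueOf(sum).getBytes(StandardCharsets.UTF_8);
    }

    // Matching decoder for the text encoding.
    static int decodeText(byte[] cell) {
        return Integer.parseInt(new String(cell, StandardCharsets.UTF_8));
    }

    // Binary encoding, for comparison: a 4-byte big-endian int.
    static byte[] encodeAsInt(int sum) {
        return ByteBuffer.allocate(4).putInt(sum).array();
    }

    // Matching decoder for the binary encoding.
    static int decodeInt(byte[] cell) {
        return ByteBuffer.wrap(cell).getInt();
    }

    public static void main(String[] args) {
        int sum = 12;
        System.out.println("text bytes: " + encodeAsText(sum).length);
        System.out.println("int bytes:  " + encodeAsInt(sum).length);
        System.out.println("decoded:    " + decodeText(encodeAsText(sum)));
    }
}
```

The text form varies in length with the number of digits, while the binary form is always four bytes; whichever is chosen, the writer and the reader have to agree on it.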