This article demonstrates how to write a MapReduce job against HBase. The material is concise and clearly organized, and should help clear up common points of confusion, so let's work through the example together.
package com.hbase.test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
public class HbaseMrTest {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        // point the client at the HBase cluster's ZooKeeper quorum
        conf.set("hbase.zookeeper.quorum", "bigdata01,bigdata02,bigdata03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        Job job = Job.getInstance(conf, "word-count");
        // the class whose jar gets shipped to the cluster
        job.setJarByClass(HbaseMrTest.class);
        Scan scan = new Scan();
        // restrict the scan to the single column the mapper reads
        scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("words"));
        // configure the mapper: scan the "wordcount" table
        TableMapReduceUtil.initTableMapperJob("wordcount", scan, HMapper.class, Text.class, IntWritable.class, job);
        // configure the reducer: write into the "result" table
        TableMapReduceUtil.initTableReducerJob("result", HReducer.class, job);
        // submit the job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
// Text and IntWritable are the mapper's output key/value types
class HMapper extends TableMapper<Text, IntWritable> {

    private final Text out = new Text();
    private final IntWritable iw = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable key, Result value,
            Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // read the content:words cell straight from the Result
        byte[] bytes = value.getValue(Bytes.toBytes("content"), Bytes.toBytes("words"));
        if (bytes != null) {
            String words = Bytes.toString(bytes);
            // split the line of text into individual words
            String[] ws = words.split(" ");
            for (String wd : ws) {
                out.set(wd);
                // emit one pair per word, e.g. (you, 1)
                context.write(out, iw);
            }
        }
    }
}
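For example, if a row's content:words cell holds the line "hello you hello me", this mapper emits (hello, 1), (you, 1), (hello, 1), (me, 1); the framework then groups the values by word before handing them to the reducer.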
// Text and IntWritable are the mapper's output types; the reducer emits Mutations keyed by ImmutableBytesWritable
class HReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text text, Iterable<IntWritable> iter,
            Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // add up all the counts collected for this word
        for (IntWritable intw : iter) {
            sum += intw.get();
        }
        // the Put constructor takes the row key: here, the word itself
        Put put = new Put(Bytes.toBytes(text.toString()));
        // store the total under info:wordcnt, as a string so it reads cleanly in the shell
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("wordcnt"), Bytes.toBytes(String.valueOf(sum)));
        // write the word as the row key together with its aggregated count
        context.write(new ImmutableBytesWritable(Bytes.toBytes(text.toString())), put);
    }
}
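The job assumes the input table wordcount (column family content) and the output table result (column family info) already exist. As a minimal sketch, they could be created and seeded with one sample row through the HBase 1.x client API; the class name PrepareTables, the row key "row1", and the sample sentence below are made-up placeholders for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PrepareTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "bigdata01,bigdata02,bigdata03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // input table read by the mapper: family "content"
            HTableDescriptor in = new HTableDescriptor(TableName.valueOf("wordcount"));
            in.addFamily(new HColumnDescriptor("content"));
            admin.createTable(in);
            // output table written by the reducer: family "info"
            HTableDescriptor out = new HTableDescriptor(TableName.valueOf("result"));
            out.addFamily(new HColumnDescriptor("info"));
            admin.createTable(out);
            // seed one row; "row1" and the sentence are placeholders
            try (Table table = conn.getTable(TableName.valueOf("wordcount"))) {
                Put put = new Put(Bytes.toBytes("row1"));
                put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("words"),
                        Bytes.toBytes("hello you hello me"));
                table.put(put);
            }
        }
    }
}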
Finally, export the Java file as a runnable JAR, copy it to the Linux machine, and run it with: java -jar hbase.jar com.hbase.test.HbaseMrTest
Original data:
Run result:
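Since the result screenshots are not reproduced here, the output can also be checked by scanning the result table. A minimal sketch (the class name ScanResult is a placeholder; imports as in the setup sketch above, plus org.apache.hadoop.hbase.client.Result, ResultScanner, and Scan):

public class ScanResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "bigdata01,bigdata02,bigdata03");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("result"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result r : scanner) {
                String word = Bytes.toString(r.getRow());
                String count = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("wordcnt")));
                // each row is one word with its total count, e.g. "hello -> 2"
                System.out.println(word + " -> " + count);
            }
        }
    }
}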
That covers everything in "how to write MapReduce for HBase". Thanks for reading! Hopefully you now have a solid grasp of the approach, and the shared example proves helpful.