HBase: HFileOutputFormat

Published: 2020-06-08 05:59:20  Author: yyj0531

If the output of a Hadoop MapReduce job needs to be imported into HBase, it is best to write it out in HFile format first and then bulk-load it into HBase. Because HFile is HBase's internal storage format, this import is very efficient. Below is an example.
1. Create the HBase table t1

hbase(main):157:0* create 't1','f1'
0 row(s) in 1.3280 seconds

hbase(main):158:0> scan 't1'
ROW                   COLUMN+CELL
0 row(s) in 1.2770 seconds
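For completeness, the same table can also be created programmatically. This is a minimal sketch, not part of the original walkthrough; it assumes the HBase 0.90.x client API (HBaseAdmin, HTableDescriptor) that the rest of this article uses:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Hypothetical helper: creates table t1 with column family f1,
// equivalent to the shell command above.
public class CreateTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("t1")) {
            HTableDescriptor desc = new HTableDescriptor("t1");
            desc.addFamily(new HColumnDescriptor("f1"));
            admin.createTable(desc);
        }
    }
}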

2. Write the MR job
HBaseHFileMapper.java

package com.test.hfile;

import java.io.IOException;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {
    private ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Use the line's byte offset as the intermediate key and pass the
        // raw line through; the reducer parses the real row key from it.
        immutableBytesWritable.set(Bytes.toBytes(key.get()));
        context.write(immutableBytesWritable, value);
    }
}

HBaseHFileReducer.java

package com.test.hfile;

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class HBaseHFileReducer extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text text : values) {
            String value = text.toString();
            if (!"".equals(value)) {
                KeyValue kv = createKeyValue(value);
                if (kv != null)
                    context.write(key, kv);
            }
        }
    }

    // The input string format is row:family:qualifier:value -- a simple simulation.
    private KeyValue createKeyValue(String str) {
        String[] strs = str.split(":");
        if (strs.length < 4)
            return null;
        String row = strs[0];
        String family = strs[1];
        String qualifier = strs[2];
        String value = strs[3];
        return new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qualifier),
                System.currentTimeMillis(), Bytes.toBytes(value));
    }
}
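To make the row:family:qualifier:value mapping concrete, here is a tiny standalone sketch (hypothetical, not part of the job) that feeds one line of hbasedata.txt through the same parsing and prints the resulting KeyValue:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical demo class, only to illustrate the parsing above.
public class CreateKeyValueDemo {
    public static void main(String[] args) {
        String line = "r1:f1:c1:value1";   // one line of hbasedata.txt
        String[] strs = line.split(":");
        KeyValue kv = new KeyValue(Bytes.toBytes(strs[0]), Bytes.toBytes(strs[1]),
                Bytes.toBytes(strs[2]), System.currentTimeMillis(), Bytes.toBytes(strs[3]));
        // Prints something like: r1/f1:c1/<timestamp>/Put/vlen=6
        System.out.println(kv);
    }
}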

HbaseHFileDriver.java

package com.test.hfile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class HbaseHFileDriver {
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = new Job(conf, "testhbasehfile");
        job.setJarByClass(HbaseHFileDriver.class);

        job.setMapperClass(com.test.hfile.HBaseHFileMapper.class);
        job.setReducerClass(com.test.hfile.HBaseHFileReducer.class);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Lazily hard-coded here; in a real application these paths should
        // come from the command line (see the sketch after this code).
        FileInputFormat.addInputPath(job, new Path("/home/yinjie/input"));
        FileOutputFormat.setOutputPath(job, new Path("/home/yinjie/output"));

        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("hbase.zookeeper.quorum", "localhost");
        HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
        HBaseConfiguration cfg = new HBaseConfiguration(HBASE_CONFIG);
        String tableName = "t1";
        HTable htable = new HTable(cfg, tableName);

        // configureIncrementalLoad sets the output format to HFileOutputFormat,
        // uses one reducer per region, and configures a TotalOrderPartitioner
        // so the generated HFiles line up with the table's region boundaries.
        HFileOutputFormat.configureIncrementalLoad(job, htable);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
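As the comment in the driver notes, the paths are hard-coded only for brevity. A minimal sketch of reading them from the command line instead, assuming the jar is invoked as hadoop jar hbasetest.jar com.test.hfile.HbaseHFileDriver <input> <output>:

// Hypothetical replacement for the two hard-coded paths in the driver above.
if (otherArgs.length < 2) {
    System.err.println("Usage: HbaseHFileDriver <input> <output>");
    System.exit(2);
}
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));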

The directory /home/yinjie/input contains a file hbasedata.txt with the following content:

[root@localhost input]# cat hbasedata.txt
r1:f1:c1:value1
r2:f1:c2:value2
r3:f1:c3:value3

Package the job; my exported jar path is /home/yinjie/job/hbasetest.jar.
Submit the job to Hadoop:

[root@localhost job]# hadoop jar /home/yinjie/job/hbasetest.jar com.test.hfile.HbaseHFileDriver -libjars /home/yinjie/hbase-0.90.3/hbase-0.90.3.jar 

After the job finishes, check the output directory:

[root@localhost input]# hadoop fs -ls /home/yinjie/output
Found 2 items
drwxr-xr-x   - root supergroup          0 2011-08-28 21:02 /home/yinjie/output/_logs
drwxr-xr-x   - root supergroup          0 2011-08-28 21:03 /home/yinjie/output/f1

OK, a directory named after the column family f1 has been generated.
Next, use bulk load to import the data into HBase:

[root@localhost job]# hadoop jar /home/yinjie/hbase-0.90.3/hbase-0.90.3.jar completebulkload /home/yinjie/output t1 
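The same step can also be driven from Java through LoadIncrementalHFiles, which is what the completebulkload tool wraps. A minimal sketch, assuming the HBase 0.90.x API and the paths used above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// Hypothetical helper: bulk-loads the generated HFiles into table t1.
public class BulkLoad {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "t1");
        // Moves the HFiles under /home/yinjie/output into the regions of t1.
        new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/home/yinjie/output"), table);
    }
}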

After the import completes, scan the HBase table t1 to verify:

hbase(main):166:0> scan 't1'
ROW                              COLUMN+CELL
 r1                              column=f1:c1, timestamp=1314591150788, value=value1
 r2                              column=f1:c2, timestamp=1314591150814, value=value2
 r3                              column=f1:c3, timestamp=1314591150815, value=value3
3 row(s) in 0.0210 seconds

The data has been imported!
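Besides the shell, the result can be checked programmatically as well. A minimal sketch, assuming the 0.90.x client API and that hbase-site.xml (or the ZooKeeper settings from the driver) is on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

// Hypothetical helper: scans t1 and prints every row, mirroring the shell scan.
public class VerifyScan {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "t1");
        ResultScanner scanner = table.getScanner(new Scan());
        try {
            for (Result r : scanner) {
                System.out.println(r);   // e.g. keyvalues={r1/f1:c1/<ts>/Put/vlen=6}
            }
        } finally {
            scanner.close();
        }
    }
}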
 
