温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

MapReduce on Hbase

发布时间:2020-06-21 06:43:10 来源:网络 阅读:755 作者:jethai 栏目:关系型数据库


org.apache.hadoop.hbase.mapreduce


TableMapper  TableReducer


一个region对应一个map

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class HbaseMR {

    public class MyMapper extends TableMapper<Text, Text> {

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {
            // key代表rowkey
            Text k = new Text(Bytes.toString(key.get()));
            Text v = new Text(Bytes.toString(value.getValue(
                    "basicinfo".getBytes(), "age".getBytes())));

            context.write(v, k);

        }

    }

    public class MyReducer extends TableReducer<Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Put put = new Put(Bytes.toBytes(key.toString()));
            for (Text value : values) {
                put.add(Bytes.toBytes("f1"), Bytes.toBytes(value.toString()),
                        Bytes.toBytes(value.toString()));
            }
            context.write(null, put);
        }

    }

    public static void main(String[] args) {
        Configuration conf=    HBaseConfiguration.create();
        try {
            Job job=new Job(conf, "mapreduce on hbase");
            job.setJarByClass(HbaseMR.class);
            Scan scan=new Scan();
            scan.setCaching(1000);//
            TableMapReduceUtil.initTableMapperJob("students", scan, MyMapper.class, Text.class, Text.class, job);
            TableMapReduceUtil.initTableReducerJob("student-age",  MyReducer.class,  job);
            job.waitForCompletion(true);
        } catch (Exception e) {
            
            e.printStackTrace();
        }
    }

}


向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI