温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Flink1.8如何批量Sink到HBase

发布时间:2021-12-09 10:18:18 来源:亿速云 阅读:782 作者:小新 栏目:大数据

这篇文章主要为大家展示了“Flink1.8如何批量Sink到HBase”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Flink1.8如何批量Sink到HBase”这篇文章吧。

  1. 实现背景:

  2.     消费Kafka数据写入HBase时,单条处理效率太低。需要批量插入hbase,这里自定义时间窗口countWindowAll 实现100条hbase插入一次Hbase

  3. 前面我就不写了 直接上核心代码

/*每10秒一个处理窗口*/DataStream<List<Put>> putList = filterData.countWindowAll(Constants.windowCount).apply(new AllWindowFunction<String, List<Put>, GlobalWindow>() {    @Override    public void apply(GlobalWindow window, Iterable<String> message, Collector<List<Put>> out) throws Exception {        List<Put> putList=new ArrayList<Put>();        for (String value : message)        {            String rowKey=value.replace("::","_");            Put put = new Put(Bytes.toBytes(rowKey.toString()));            String[] column=value.split("::");            for (int i = 0; i < column.length; i++) {                put.addColumn(Bytes.toBytes(Constants.columnFamily),Bytes.toBytes(Constants.columnArray[i]),Bytes.toBytes(column[i]));            }            putList.add(put);        }        out.collect(putList);    }    }).setParallelism(4);
putList.addSink(new HBaseSinkFunction()).setParallelism(1);

这里sink需要继承Flink的RichSinkFunction接口,实现其中的三个比较重要的函数:

1.open()任务开始只调用一次

2.invoke()每接收一条记录调用一次,多条记录调用多次

3.close()任务关闭只调用一次

写HBase自定义Sink为

HBaseSinkFunction extends RichSinkFunction<List<Put>>{@Overridepublic void open(Configuration parameters) throws Exception {    super.open(parameters);    HbaseUtils.connectHbase();    TableName table=TableName.valueOf(Constants.tableNameStr);    Admin admin = HbaseUtils.connection.getAdmin();    if(!admin.tableExists(table)){        HTableDescriptor tableDescriptor = new HTableDescriptor(Constants.tableNameStr);        tableDescriptor.addFamily(new HColumnDescriptor(Constants.columnFamily));        admin.createTable(tableDescriptor);    }}@Overridepublic void invoke(List<Put> putList, Context context) throws Exception {    Table table=HbaseUtils.connection.getTable(TableName.valueOf(Constants.tableNameStr));    table.put(putList);}@Overridepublic void close() throws Exception {    super.close();    HbaseUtils.closeHBaseConnect();}}

以上是“Flink1.8如何批量Sink到HBase”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI