这篇文章主要介绍“Flink集成iceberg在生产环境中的使用方法是什么”,在日常操作中,相信很多人在Flink集成iceberg在生产环境中的使用方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Flink集成iceberg在生产环境中的使用方法是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
在大数据处理领域,有一个非常常见但是很麻烦的问题,即hdfs小文件问题,我们也被这个问题困扰了很久。开始的时候我们是自己写的一个小文件压缩工具,定期的去合并,原理就是把待压缩数据写入一个新的临时的文件夹,压缩完,和原来的数据进行检验,数据一致之后,用压缩的数据覆盖原来的数据,但是由于无法保证事务,所以出现了很多的问题,比如压缩的同时又有数据写入了,检验就会失败,导致合并小文件失败,而且无法实时的合并,只能按照分区合并一天之前的。或者一个小时之前的,最新的数据仍然有小文件的问题,导致查询性能提高不了。
所以基于以上的一些问题,我调研了数据湖技术,由于我们的流式数据主要是flink为主,查询引擎是presto,而hudi强耦合了spark,对flink的支持还不太友好,所以综合考虑了一下,决定引入iceberg。在对iceberg进行功能测试和简单代码review之后,发现iceberg在flink这块还有一些需要优化和提升,不过我觉得应该能hold的住,不完善的地方和需要优化的地方我们自己来补全,所以最终引入了iceberg来解决小文件的问题。
除此之外,对于一些其他的问题,比如cdc数据的接入,以及根据查询条件删除数据等,后续也可以通过数据湖技术来解决。
我们的主要使用场景是使用flink将kafka的流式数据写入到Iceberg,为了代码的简洁以及可维护性,我们尽量将程序使用sql来编写,示例代码如下:
// create catalog CREATE CATALOG iceberg WITH ( 'type'='iceberg', 'catalog-type'='hive'," + 'warehouse'='hdfs://localhost/user/hive/warehouse', 'uri'='thrift://localhost:9083')// create table CREATE TABLE iceberg.tmp.iceberg_table ( id BIGINT COMMENT 'unique id', data STRING, d int) PARTITIONED BY (d)WITH ('connector'='iceberg','write.format.default'='orc')// insert into insert into iceberg.tmp.iceberg_table select * from kafka_table
提示:记得开启checkpoint
目前压缩小文件是采用的一个额外批任务来进行的,Iceberg提供了一个spark版本的action,我在做功能测试的时候发现了一些问题,此外我对spark也不是非常熟悉,担心出了问题不好排查,所以参照spark版本的自己实现了一个flink版本,并修复了一些bug,进行了一些功能的优化。
由于我们的iceberg的元数据都是存储在hive中的,所以压缩程序的逻辑是我把hive中所有的iceberg表全部都查出来,依次压缩。压缩没有过滤条件,不管是分区表还是非分区表,都进行全表的压缩。这样做是为了处理某些使用eventtime的flink任务,如果有延迟的数据的到来。就会把数据写入以前的分区,如果不是全表压缩只压缩当天分区的话,新写入的其他天的数据就不会被压缩。
代码示例参考:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles()//.maxParallelism(parallelism)//.filter(Expressions.equal("day", day))//.targetSizeInBytes(targetSizeInBytes).execute();
具体的压缩小文件相关的信息可以参考这篇文章[Flink集成iceberg数据湖之合并小文件]。
我们的快照过期策略,我是和压缩小文件的批处理任务写在一起的,压缩完小文件之后,进行表的快照过期处理,目前保留的时间是一个小时,这是因为对于有一些比较大的表,分区比较多,而且checkpoint比较短,如果保留的快照过长的话,还是会保留过多小文件,我们暂时没有查询历史快照的需求,所以我将快照的保留时间设置了一个小时。
long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); table.expireSnapshots() // .retainLast(20) .expireOlderThan(olderThanTimestamp) .commit();
写入了数据之后,有时候我想查看一下相应的快照下面有多少数据文件,直接查询hdfs你不知道哪个是有用的,哪个是没用的。所以需要有对应的管理工具。目前flink这块还不太成熟,我们可以使用spark3提供的工具来查看。
目前create table 这些操作我们是通过flink sql client来做的。其他相关的ddl的操作可以使用spark来做:
https://iceberg.apache.org/spark/#ddl-commands
一些相关的数据的操作,比如删除数据等可以通过spark来实现,presto目前只支持分区级别的删除功能。
在使用iceberg的过程中,有时候会有这样的情况,我提交了一个flink任务,由于各种原因,我把它给停了,这个时候iceberg还没提交相应的快照。还有由于一些异常导致程序失败,就会产生一些不在iceberg元数据里面的孤立的数据文件,这些文件对iceberg来说是不可达的,也是没用的。所以我们需要像jvm的垃圾回收一样来清理这些文件。
目前iceberg提供了一个spark版本的action来进行处理这些没用的文件,我们采取的策略和压缩小文件一样,获取hive中的所有的iceberg表。每隔一个小时执行一次定时任务来删除这些没用的文件。
SparkSession spark = ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();
在程序运行过程中出现了正常的数据文件被删除的问题,经过调研,由于我的快照保留设置是一小时,这个清理程序清理时间也是设置一个小时,通过日志发现是这个清理程序删除了正常的数据。查了查代码,觉得应该是他们设置了一样的时间,在清理孤立文件的时候,有其他程序正在读写表,由于这个清理程序是没有事务的,导致删除了正常的数据。最后把这个清理程序的清理时间改成默认的三天,没有再出现删除数据文件的问题。当然,为了保险起见,我们可以覆盖原来的删除文件的方法,改成将文件到一个备份文件夹,检查没有问题之后,手工删除。
目前我们使用的版本是prestosql 346,这个版本安装的时候需要jdk11,presto查询iceberg比较简单。官方提供了相应的conncter,我们配置一下就行,
//iceberg.propertiesconnector.name=iceberg hive.metastore.uri=thrift://localhost:9083
目前查询iceberg的批处理任务,使用的flink的客户端,首先我们启动一个基于yarn session 的flink集群,然后通过sql客户端提交任务到集群。
主要的配置就是我们需要根据数据的大小设置sql任务执行的并行度,可以通过以下参数设置。
set table.exec.resource.default-parallelism = 100;
此外我在sql客户端的配置文件里配置了hive和iceberg相应的catalog,这样每次客户端启动的时候就不需要建catalog了。
catalogs: # empty list - name: iceberg type: iceberg warehouse: hdfs://localhost/user/hive2/warehouse uri: thrift://localhost:9083 catalog-type: hive cache-enabled: false - name: hive type: hive hive-conf-dir: /Users/user/work/hive/conf default-database: default
目前对于定时调度中的批处理任务,flink的sql客户端还没hive那样做的很完善,比如执行hive -f来执行一个文件。而且不同的任务需要不同的资源,并行度等。所以我自己封装了一个flinK程序,通过调用这个程序来进行处理,读取一个指定文件里面的sql,来提交批任务。在命令行控制任务的资源和并行度等。
/home/flink/bin/flink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql
批任务的查询这块,做了一些优化,比如limit下推,filter下推,查询并行度优化等,可以大大提高查询的速度,这些优化都已经推回给社区。
目前我们的所有数据都是存储在hive表的,在验证完iceberg之后,我们决定将hive的数据迁移到iceberg,所以我写了一个工具,可以使用hive的数据,然后新建一个iceberg表,为其建立相应的元数据,但是测试的时候发现,如果采用这种方式,就需要把写入hive的程序停止,因为如果iceberg和hive使用同一个数据文件,而压缩程序会不断地压缩iceberg表的小文件,压缩完之后,不会马上删除旧数据,所以hive表就会查到双份的数据。鉴于iceberg测试的时候还有一些不稳定,所以我们采用双写的策略,原来写入hive的程序不动,新启动一套程序写入iceberg,这样能对iceberg表观察一段时间。还能和原来hive中的数据进行比对,来验证程序的正确性。
经过一段时间观察,每天将近20亿数据的hive表和iceberg表,一条数据也不差。所以在最终对比数据没有问题之后,把hive表停止写入,使用新的iceberg表,然后把hive中的旧数据导入到iceberg。
到此,关于“Flink集成iceberg在生产环境中的使用方法是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。