Flink作为流批统一的计算框架,在1.10中完成了大量batch相关的增强与改进。1.10可以说是第一个成熟的生产可用的Flink Batch SQL版本,它一扫之前Dataset的羸弱,从功能和性能上都有大幅改进,以下我从架构、外部系统集成、实践三个方面进行阐述。
首先来看下stack,在新的Blink planner中,batch也是架设在Transformation上的,这就意味着我们和Dataset完全没有关系了:
Batch模式就是在中间结果落盘,这个模式和典型的Batch处理是一致的,比如MapReduce/Spark/Tez。
Flink以前的网络模型也分为Batch和Pipeline两种,但是Batch模式只是支持上下游隔断执行,也就是说资源用量可以不用同时满足上下游共同的并发。但是另外一个关键点是Failover没有对接好,1.9和1.10在这方面进行了改进,支持了单点的Failover。
建议在Batch时打开:
jobmanager.execution.failover-strategy = region
为了避免重启过于频繁导致JobMaster太忙了,可以把重启间隔提高:
restart-strategy.fixed-delay.delay = 30 s
Batch模式的好处有:
Batch模式比较稳,适合传统Batch作业,大作业。
Pipeline模式是Flink的传统模式,它完全和Streaming作业用的是同一套代码,其实社区里Impala和Presto也是类似的模式,纯走网络,需要处理反压,不落盘,它主要的优缺点是:
有条件可以考虑开启Pipeline模式。
Flink on Yarn支持两种模式,Session模式和Per job模式,现在已经在调度层次高度统一了。
另外,如果想要更好的复用进程,可以考虑加大TaskManager的超时释放:
resourcemanager.taskmanager-timeout = 900000
先说说并发:
我们在Blink内部实现了基于统计信息来推断并发的功能,但是其实以上的策略在大部分场景就够用了。
目前一个TaskManager里面含有多个Slot,在Batch作业中,一个Slot里只能运行一个Task (关闭SlotShare)。
对内存来说,单个TM会把Manage内存切分成Slot粒度,如果1个TM中有n个Slot,也就是Task能拿到1/n的manage内存。
我们在1.10做了重大的一个改进就是:Task中chain起来的各个operators按照比例来瓜分内存,所以现在配置的算子内存都是一个比例值,实际拿到的还要根据Slot的内存来瓜分。
这样做的一个重要好处是:
当然,为了运行的效率,我们一般建议单个Slot的manage内存应该大于500MB。
另一个事情,在1.10后,我们去除了OnHeap的manage内存,所以只有off-heap的manage内存。
强烈推荐Hive Catalog + Hive,这也是目前批处理最成熟的架构。在1.10中,除了对以前功能的完善以外,其它做了几件事:
得益于流批统一的架构,目前的流Connectors也能在batch上使用,比如HBase的Lookup和Sink、JDBC的Lookup和Sink、Elasticsearch的Sink,都可以在Batch无缝对接使用起来。
在1.10中,SQL-CLI也做了大量的改动,比如把SQL-CLI做了stateful,里面也支持了DDL,还支持了大量的DDL命令,给SQL-CLI暴露了很多TableEnvironment的能力,这让用户可以方便得多。后续,我们也需要对接JDBC的客户端,让用户可以更好的对接外部工具。但是SQL-CLI仍然待继续改进,比如目前仍然只支持Session模式,不支持Per Job模式。
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings .newInstance() .useBlinkPlanner() .inBatchMode() .build());
老的BatchTableEnv因为绑定了Dataset,而且区分Java和Scala,是不干净的设计方式,所以Blink planner只支持新的TableEnv。
TableEnv注册的source, sink, connector, functions,都是temporary的,重启之后即失效了。如果需要持久化的object,考虑使用HiveCatalog。
tEnv.registerCatalog(“hive”, hiveCatalog); tEnv.useCatalog(“hive”);
可以通过tEnv.sqlQuery来执行DML,这样可以获得一个Table,我们也通过collect来获得小量的数据:
Table table = tEnv.sqlQuery(“SELECT COUNT(*) FROM MyTable”); List<Row> results = TableUtils.collectToList(table); System.out.println(results);
可以通过tEnv.sqlUpdate来执行DDL,但是目前并不支持创建hive的table,只能创建Flink类型的table:
tEnv.sqlUpdate( "CREATE TABLE myResult (" + " cnt BIGINT" ") WITH (" + " 'connector.type'='jdbc'," …… ")");
可以通过tEnv.sqlUpdate来执行insert语句,Insert到临时表或者Catalog表中,比如insert到上面创建的临时JDBC表中:
tEnv.sqlUpdate(“INSERT INTO myResult SELECT COUNT(*) FROM MyTable”); tEnv.execute(“MyJob”);
当结果表是Hive表时,可以使用Overwrite语法,也可以使用静态Partition的语法,这需要打开Hive的方言:
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
目前Flink batch SQL仍然在高速发展中,但是1.10已经是一个可用的版本了,它在功能上、性能上都有很大的提升,后续还有很多有意思的features,等待着大家一起去挖掘。
原文链接
本文为阿里云原创内容,未经允许不得转载。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。