温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

flink中相关的知识点有哪些

发布时间:2021-12-31 13:46:49 来源:亿速云 阅读:135 作者:iii 栏目:大数据

本篇内容主要讲解“flink中相关的知识点有哪些”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink中相关的知识点有哪些”吧!

1.OperatorChain

1.1 OperatorChain的优点

1.1.1 减少线程切换 1,1.2 减少序列化与反序列化 1.1.3 减少数据在缓冲区的交换 1.1.4 减少延迟并且提高吞吐能力

1.2 OperatorChain组成条件

1.2.1 没有禁用Chain 1.2.2 上下游算子并行度一致 1.2.3 下游算子的入度为1(也就是说下游节点没有其他节点的输入) 1.2.4 上下游算子在同一个slot group 1.2.5 下游节点的chain策略为always(可以与上下游链接,map、flatmap、filter等默认是always) 1.2.6 上有节点的chain策略为always或head(只能与下游链接,不能与上有链接,source默认是head) 1.2.7 上下游算子之间没有数据shuffle(数据分区方式是forward)

1.3 禁用OperatorChain几种方式

1.3.1 DataStream的算子操作后调用startNewChain算子 1.3.2 DataStream调用disableChaining来关闭Chain 1.3.3 StreamExecutionEnvironment.getExecutionEnvironment.disableOperatorChaining() 全局关闭 1.3.4 DataStream.slotSharingGroup("name") 设置新的slotgrop名称 1.3.5 改变并行度

2.slot

2.1 slot与parallelism的关系

默认task slot数与join中task的最高并行度一致

2.2 共享slot

2.2.1 flink集群需要的任务槽与作业中使用的最高并行度正好相同(前提,保持默认SlotSharingGroup)。也就是说我们不需要再去 计算一个程序总共会起多少个task了 2.2.2 适当设置slotSharingGroup可以减少每个slot运行的线程数,从而整体上减少机器的负载

3.累加器和计数器

3.1 计数器是最简单的累加器 3.2 内置累加器有IntCounter,LongCounter,DoubleCounter 3.3 Histogram 柱状图

4.控制延迟

默认情况下,流中的元素并不会一个一个的在网络中传输(这会导致不必要的网络流量消耗) ,而是缓存起来,缓存的大小可以在Flink的配置文件、ExecutinEnvironment、设置某个算子 进行配置(默认100ms)这样控制的 好处:提高吞吐 坏处:增加了延迟

如何把握平衡: (1)为了最大吞吐量,可以设置setBufferTimeout(-1),这会移出timeout机制,缓存中的数据 一满就会被发送,不建议用,假如一条信息4 5个小时才来这时候延迟会非常高,会等整个buffer满了再处理 (2)为了最小延迟,可以将超时设置为接近0的数(例如5或者10ms) (3)缓存的超时不要设置0,因为会带来一些性能的损耗

5.min minby max maxby

min和minby的区别是min返回一个最小值,而minby返回的是其字段中包含的最小元素

6.interval join

在给定周期内,按照指定key对两个KeyedStream进行join操作,把符合join条件的两个event拉倒一起,然后怎么处理由用户自己定义 场景:把一定时间内的相关的分组数据拉成一个宽表

7.connect 和union

connect之后是connectedStreams,会对两个流的数据应用不同的处理方法,并且双流之间可以 共享状态(比如计数)。这在第一个流的输入会影响第二个流时会非常有用。 union合并多个流,新的流包含所有流的数据 union是DataStream->DataStream connect只能连接两个流,而union可以连接多余两个流 connect两个流类型可以不一致,而union连接的流类型必须一致

8.算子之间传递数据的方式

(1)One-to-one streams 保持元素的分区和顺序 (2)重新分区的方式 ,重新分区策略取决于使用的算子 keyby、broadcast、rebalance

dataStream.shuffle() 按均匀分布随机划分元素,网络开销往往比较大 dataStream.rebalance() 循环对元素进行分区,为每各分区创建相等负载,解决数据倾斜时非常有用 dataStream.rescale() 跟rebalance类似,但不是全局的,通过轮询调度将元素从上游的task一个子集发送到下游task的一个子集 dataStream.broadcast() 将元素广播到每个分区上

9.flink三个时间的比较

9.1 EventTime

9.1.1 事件生成的时间,在进入Flink之气就存在,可以从event的字段中抽取 9.1.2 必须指定watermarks的生产方式 9.1.3 优势:确定性,乱序、延时、或者数据重放等情况,都能给出正确结果 9.1.4 弱点:处理无序事件时性能和延迟受到影响

9.2 IngerstTime

9.2.1 事件进入flink的时间,即source里获取的当前系统时间,后续统一使用该时间 9.2.2 不需要指定watermarks的生产范式(自动生成) 9.2.3 弱点:不能处理无序事件和延迟数据

9.3 ProcessingTime

9.3.1 执行操作的机器的当前系统时间(每个算子都不一样) 9.3.2 不需要流和机器之间的协调 9.3.3 优势:最佳的性能和最低的延迟 9.3.4 弱点:不确定性,容易受到各种因素影响(event产生的速度、到达flink的速度、算子之间传输速度),压根不管顺序和延迟

9.4 比较

性能:ProcessingTime>IngestTime>EventTime 延迟:ProcessingTime<IngestTime<EventTime 确定性:EventTime>IngestTime>ProcessIngTime

不设置time类型,默认是processingTime 通过 env.setStreamTimeCharacteristic()方法设置time类型

10.watermark

10.1 说明

10.1.1 通常情况下,watermark在source函数中生成,但也可以在source后任何阶段,如果指定多次 后面指定的会覆盖前面的值。source的每个sub task独立生成水位线。 10.1.2 watermark通过操作时会推进算子操作时的event time,同时会为下游生成一个新的watermark 10.1.3 多输入operator(union、keyby、partition)的当前event time是其输入流event time最小值

10.2 两种watermark

10.2.1 周期性 watermark

(1)基于时间 (2)ExecutionConfig.setAutoWatermarkInterval(msec) (默认200ms,设置watermarker发生的周期) (3)实现AssignerWithPeriodicWatermarks接口

10.2.2 间断的watermark

(1)基于某些时间出发watermark的生产和发送(由用户代码实现,例如遇到特殊情况) (2)实现AssignerWithPeriodicWatermarks接口

11 处理延迟数据

方式一:allowedLateness(),设定最大延迟时间,触发被延迟,不宜设置太大 方式二:sideOutputTag,提供了延迟数据获取的一种方式,这样就不会丢弃数据了,延迟数据单独处理。

同时侧输出流也是进行分流的一种方式,比如一个流可以分成多个不同的流sink到不同的目标端。

12 窗口

12.1 窗口的类型

12.1.1 Keyed Windows(在已经安装keyby分组的基础上(KeyedStream),再构建多任务并行window) stream.keyBy().window() 12.1.2 Non-Keyed Windwos(在未分组的DataStream上构建单任务Window,并行度是1,API都带ALL后缀) stream.windowAll()

12.2 窗口的生命周期

创建:当属于第一个元素到达时就会创建该窗口 销毁:当时间(event/process time)超过窗口的结束时间戳+用户指定的延迟时(allowedLateness<time>),窗口将会移除

13 触发器与驱逐器

13.1 触发器

触发器决定了一个窗口何时可以被窗口函数处理(条件满足时触发并发出信号) 每一个WindowAssigner都有一个默认的触发器,如果默认触发器不满足需要可以通过trigger()来指定

触发器有5个方法来允许触发器处理不同的事件(trigger) onElement()方法每个元素被添加到窗口是调用 onEvenTime() 当一个已注册的事件时间计时器启动时调用 onProcessingTime 当一个已注册的处理时间计时器启动时调用 onMerge 与状态触发器相关, 当使用session window时两个触发器对应的窗口合并,合并两个触发器的状态 clear相应窗口被清除时触发

13.2 驱逐器

evictor是可选的,WindowAssigner默认没有evictor evictor能够在Trigger触发之后以及在应用窗口函数执行前和/或后从窗口中删除无用的元素,类似filter作用 evictBefore在窗口之前应用 evictAfter在窗口后应用

14 如何允许延迟

14.1 当处理event-time的windwo时,可能会出现元素晚到的情况,即flink用来跟踪event-time进度的 watermark已经过了元素所属窗口的最后时间,属于当前窗口的数据才到达) 14.2 默认情况下,当watermark已经过了窗口的最后时间时,晚到的元素会被丢弃 14.3 Flink允许为窗口操作指定一个最大允许延时时长,Allowed lateness指定,默认情况是0 14.4 水位线已过了窗口最后时间才来的元素,如果还在未到窗口最后时间加延迟时间,任然可以在窗口中计算

特例:在使用GlobalWindows(全局window),不会考虑延迟,因为窗口的结束时间戳是Long.MAX_VALUE

15 state状态

Flink的状态:一般指一个具体的task/operator某时刻在内存中的的状态(例如某属性的值) 注意:State和checkpointing不要搞混 checkpoint 则表示了一个flink job ,在一个特定时一份全局状态快照,即包含了一个job下所有task/operator某时刻的状态

15.1 状态的错用

15.1.1 增量计算 a)聚合操作 b)机器学习训练模型迭代运算时保持当前模型 15.1.2 容错 a)job故障重启 b)flink程序升级

15.2 状态的分类

15.2.1 Operator State 每个流普通的Operator的状态 15.2.2 Keyed State Keyed Streaming的状态 15.2.3 特殊的:Broadcast State(1.5开始)

Keyed State支持的数据结构 (1)ValueState (2)ListState (3)ReducingState (4)AggregatingState (5)FoldingState (6)MapState

注意: (1)状态不一定存储在内部,可能驻留在磁盘或其他地方 (2)状态是使用RunntimContext方法的,因此只能在Rich函数中访问

16 checkpoint状态容错

有了状态自然需要状态容错,否则就失去意义了,flink状态容错机制就是checkpoint checkpoint是通过分布式snapshot实现的,没有特殊声明时snapshot和checkpoint和back-up是一个意思

16.1 特点

(1)异步 (2)全量和增量都可以设置 (3)Barrier机制 (4)失败情况下可回滚到最近成功一次的checkpoint (5)周期性

16.2 使用checkpoint前置条件

(1)在一定时间内可回溯的datasource 例如:kafka、rabiitma、hdfs (2)可持久化存储state的存储系统,通常使用分布式文件系统,一般是hdfs,s3,nfs

checkmode:一般选择EXACTLY_ONCE,除非场景要求极低会选择AT_LEAST_ONCE(几毫秒)

16.3 checkpoint高级选项值保留策略

默认情况下检查点不会被保留,仅用于从故障中恢复作业。可以启用外部持久化检查点,同时指定保留策略 checkpointConfg.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) (1)CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION 在作业被取消时保留检查点。这种情况取消后必须手动清除检查点 (2)CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION 在作业被取消(cancel)时会删除检查点,等于不启用。

setCheckpointTimeout 设置超时时间,超过时间没有完成checkpoint则被终止 setMinPauseBetweenCheckpoints 最小间隔,上一个checkpoint完成最少等待多久发出下一个checkpoint请求 setMaxConcurrentCheckpoints 指定运行中多少并行度进行checkpoint

16.4 使用checkpoint第二步 选择合适的State Backed

16.4.1 默认State保存在taskmanager的内存中 16.4.2 checkpoint机制会持久化所有状态的一致性快照 快照保存由State Backend来决定,目前flink自带三个State Backed: (1)MemoryStateBackend(默认) (2)FsStateBackend (3)RocksDBStateBackend

16.5 MemoryStateBackend

16.5.1 MemoryStateBackend是一个内部状态backend,用于维护Java堆上的状态。Key/value状态和窗口运算符包含存储值和计时器的哈希表 16.5.2 Checkpoint时,MemoryStateBackend会对state做一次快照,并像jobManager发送checkpoint确认完成的消息中带上此快照数据,然后快照会存储在JobManager的堆内存中 16.5.3 MemoeyStateBackend默认开启异步方式进行快照,推荐使用异步避免阻塞。如果要阻塞可以传false,如下 val memoryStateBackend:StateBackend=new MemoryStateBackend(1010241024,false) env.setStateBackend(memoryStateBackend) 16.5.4 限制:单个state默认5mb,可以在MemoryStateBackend的构造函数指定。不论如何设置,State大小无法大于akka.framesize(JobManager和TaskManager之间发送的最大消息的大小默认10mb)。Job Manager必须有足够内存 16.5.5 适用场景:本地开发和测试 小状态job,如只使用Map FlatMap Fliter或Kaka Consumer

16.6 FsStateBackend

16.6.1 FsStateBackend需要配置一个文件系统URL来,如hdfs://namenode:8080/flink/checkpoint 16.6.2 FsStateBackend在TaskManager的内存中持有正在处理的数据。checkpoint时将state snapshot写入文件系统目录下的文件中。 16.6.3 FsStateBackend默认开启异步方式进行快照,构造方法如下 val stateBackend:StateBackend=new FsStateBackend("hdfs://namenode:9000/flink/checkpoint",false) env.setStateBackend(stateBackend) 16.6.4 适用场景:大状态、长窗口、大键/值状态的job

16.7、RocksDBStateBackend

16.7.1 RocksDBStateBackend需要配置一个文件系统的URL。如hdfs://namenode:8080/flink/checkpoint 16.7.2 RocksDBStateBackend运行中的数据保存在RockDB数据库中,默认情况下存储在TaskManager数据目录中。 在Checkpoint时,整个RocksDB数据库将被checkpointed到配置的文件系统和目录中 16.7.3 RocksDBSateBackend 始终是异步 16.7.4 RocksDB JNI API是基于Byte[],因此key和value最大支持2^31个字节(2GB) 16.7.5 适用场景:超大窗口,超大状态,大键/值状态的job 16.7.6 只有RockDBStateBackend支持增量checkpoint 16.7.7 状态保存在数据块中,只受可用磁盘空间量限制,但开销更大(读/写需要反序列化与序列化),吞吐收到限制 使用需要导包:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
    val stateBackend:StateBackend=new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoint",true)
    env.setStateBackend(stateBackend)

配置重启策略 Flink支持不同的重启策略,这些策略控制在出现故障时如何重新启动job env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))) (1)如果没用启动checkpoint,则使用无重启方案 (2)如果启用了checkpoint,但是没有配重启方案,则使用固定延迟策略,尝试次数是Integer.MAX_VALUE

到此,相信大家对“flink中相关的知识点有哪些”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI