安徽做公司网站哪家好,做网站可以使用rem单位吗,手机网站一般多宽,动态域名申请1 . 简述什么是Apache Flink #xff1f;
Apache Flink 是一个开源的基于流的有状态计算框架。它是分布式地执行的#xff0c;具备低延迟、高吞吐的优秀性能#xff0c;并且非常擅长处理有状态的复杂计算逻辑场景
2 . 简述Flink 的核心概念 #xff1f;
Flink 的核心概念…1 . 简述什么是Apache Flink
Apache Flink 是一个开源的基于流的有状态计算框架。它是分布式地执行的具备低延迟、高吞吐的优秀性能并且非常擅长处理有状态的复杂计算逻辑场景
2 . 简述Flink 的核心概念
Flink 的核心概念主要有四个Event Streams、State、Time 和 Snapshots。
1Event Streams即事件流事件流可以是实时的也可以是历史的。Flink 是基于流的但它不止能处理流也能处理批而流和批的输入都是事件流差别在于实时与批量。 2StateFlink 擅长处理有状态的计算。通常的复杂业务逻辑都是有状态的它不仅要处理单一的事件而且需要记录一系列历史的信息然后进行计算或者判断。 3Time最主要处理的问题是数据乱序的时候一致性如何保证。 4Snapshots实现了数据的快照、故障的恢复保证数据一致性和作业的升级迁移等。
3 . 简述Flink运行时的架构组件
Flink运行时架构主要包括四个不同的组件它们会在运行流处理应用程序时协同工作作业管理器 JobManager、资源管理器ResourceManager、任务管理器TaskManager以及分发器 Dispatcher。因为Flink是用Java和Scala实现的所以所有组件都会运行在Java虚拟机上。每个组件的职责如下 1作业管理器JobManager 控制一个应用程序执行的主进程也就是说每个应用程序都会被一个不同的JobManager所控制执行。 JobManager会先接收到要执行的应用程序这个应用程序会包括作业图JobGraph、逻辑数据流图 logical dataflow graph和打包了所有的类、库和其它资源的JAR包。JobManager会把JobGraph转换成一个物理层面的数据流图这个图被叫做“执行图”ExecutionGraph包含了所有可以并发执行的任 务。JobManager会向资源管理器ResourceManager请求执行任务必要的资源也就是任务管理器 TaskManager上的插槽slot。一旦它获取到了足够的资源就会将执行图分发到真正运行它们的 TaskManager上。而在运行过程中JobManager会负责所有需要中央协调的操作比如说检查点 checkpoints的协调。 2资源管理器ResourceManager 主要负责管理任务管理器TaskManager的插槽slotTaskManger插槽是Flink中定义的处理资源单 元。Flink为不同的环境和资源管理工具提供了不同资源管理器比如YARN、Mesos、K8s以及 standalone部署。当JobManager申请插槽资源时ResourceManager会将有空闲插槽的TaskManager分配 给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求它还可以向资源提供 平台发起会话以提供启动TaskManager进程的容器。另外ResourceManager还负责终止空闲的TaskManager释放计算资源。 3任务管理器TaskManager Flink中的工作进程。通常在Flink中会有多个TaskManager运行每一个TaskManager都包含了一定数量的 插槽slots。插槽的数量限制了TaskManager能够执行的任务数量。启动之后TaskManager会向资源 管理器注册它的插槽收到资源管理器的指令后TaskManager就会将一个或者多个插槽提供给 JobManager调用。JobManager就可以向插槽分配任务tasks来执行了。在执行过程中一个 TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。 4分发器Dispatcher 可以跨作业运行它为应用提交提供了REST接口。当一个应用被提交执行时分发器就会启动并将应用 移交给一个JobManager。由于是REST接口所以Dispatcher可以作为集群的一个HTTP接入点这样就能 够不受防火墙阻挡。Dispatcher也会启动一个Web UI用来方便地展示和监控作业执行的信息。 Dispatcher在架构中可能并不是必需的这取决于应用提交运行的方式。
4 . 简述Flink任务提交流程
作业提交流程 1 一般情况下由客户端App通过分发器提供的 REST 接口将作业提交给JobManager。 2由分发器启动 JobMaster并将作业包含 JobGraph提交给 JobMaster。 3JobMaster 将 JobGraph 解析为可执行的 ExecutionGraph得到所需的资源数量然后向资源管理器请求资源slots 4资源管理器判断当前是否有足够的可用资源如果没有启动新的 TaskManager。 5TaskManager 启动之后向 ResourceManager 注册自己的可用任务槽slots。 6资源管理器通知 TaskManager 为新的作业提供 slots。 7TaskManager 连接到对应的 JobMaster提供 slots。 8JobMaster 将需要执行的任务分发给 TaskManager。 9TaskManager 执行任务互相之间可以交换数据。 独立模式 独立模式下由于TaskManager 是手动启动的所以当 ResourceManager 收到 JobMaster 的请求时会直接要求 TaskManager 提供资源因此第(4)步与提交流程不同不会启动新的TaskManager YARN模式 会话模式 在会话模式下YARN session创建Flink集群 作业提交流程如下 1客户端通过 REST 接口将作业提交给分发器。 2分发器启动 JobMaster并将作业包含 JobGraph提交给 JobMaster。 3JobMaster 向资源管理器请求资源slots。 4资源管理器向 YARN 的资源管理器请求 container 资源。 5YARN 启动新的 TaskManager 容器。 6TaskManager 启动之后向 Flink 的资源管理器注册自己的可用任务槽。 7资源管理器通知 TaskManager 为新的作业提供 slots。 8TaskManager 连接到对应的 JobMaster提供 slots。 9JobMaster 将需要执行的任务分发给 TaskManager执行任务。 可以看到在YARN的session模式下请求资源时要“上报”YARN 的资源管理器 单作业模式 1客户端将作业提交给 YARN 的资源管理器这一步中会同时将 Flink 的 Jar 包和配置上传到 HDFS以便后续启动 Flink 相关组件的容器。 2YARN 的资源管理器分配 Container 资源启动 Flink JobManager并将作业提交给JobMaster。这里省略了 Dispatcher 组件。 3JobMaster 向资源管理器请求资源slots。 4资源管理器向 YARN 的资源管理器请求 container 资源。 5YARN 启动新的 TaskManager 容器。 6TaskManager 启动之后向 Flink 的资源管理器注册自己的可用任务槽。 7资源管理器通知 TaskManager 为新的作业提供 slots。 8TaskManager 连接到对应的 JobMaster提供 slots。 9JobMaster 将需要执行的任务分发给 TaskManager执行任务。 可见区别只在于 JobManager 的启动方式以及省去了分发器。当第 2 步作业提交给JobMaster之后的流程就与会话模式完全一样了 应用模式 应用模式与单作业模式的区别在于初始时提交给YARN资源管理器的不是一个作业而是一个应用应用中可以包含多个作业每个作业都会启动相应的JobMaster
5 . 简述Flink的窗口了解哪些都有什么区别有哪几种如何定义
1、Window概述 streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎而无限数据集是指一种不断增长 的本质上无限的数据集而window是一种切割无限数据为有限块进行处理的手段。 Window是无限数据流处理的核心Window将一个无限的stream拆分成有限大小的“buckets”桶我们可 以在这些桶上做计算操作。
2、Window类型 Window可以分成两类 CountWindow按照指定的数据条数生成一个Window与时间无关。 TimeWindow按照时间生成Window。 对于TimeWindow可以根据窗口实现原理的不同分成三类滚动窗口Tumbling Window、滑动窗口 Sliding Window和会话窗口Session Window。 1滚动窗口Tumbling Windows 将数据依据固定的窗口长度对数据进行切片。 特点时间对齐窗口长度固定没有重叠。 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中滚动窗口有一个固定的大小并且不会 出现重叠。 例如如果你指定了一个5分钟大小的滚动窗口窗口的创建如下图所示
适用场景适合做BI统计等做每个时间段的聚合计算。 2滑动窗口Sliding Windows 滑动窗口是固定窗口的更广义的一种形式滑动窗口由固定的窗口长度和滑动间隔组成。 特点时间对齐窗口长度固定可以有重叠。 滑动窗口分配器将元素分配到固定长度的窗口中与滚动窗口类似窗口的大小由窗口大小参数来配 置另一个窗口滑动参数控制滑动窗口开始的频率。因此滑动窗口如果滑动参数小于窗口大小的话 窗口是可以重叠的在这种情况下元素会被分配到多个窗口中。 例如你有10分钟的窗口和5分钟的滑动那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据
适用场景对最近一个时间段内的统计求某接口最近5min的失败率来决定是否要报警。 3会话窗口Session Window 由一系列事件组合一个指定时间长度的timeout间隙组成类似于web应用的session也就是一段时间没 有接收到新数据就会生成新的窗口。 特点时间无对齐。 session窗口分配器通过session活动来对元素进行分组session窗口跟滚动窗口和滑动窗口相比不会有 重叠和固定的开始时间和结束时间的情况相反当它在一个固定的时间周期内不再收到元素即非活 动间隔产生那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置这个session间隔定 义了非活跃周期的长度当这个非活跃周期产生那么当前的session将关闭并且后续的元素将被分配到 新的session窗口中去。
3、Window API 1TimeWindow TimeWindow是将指定时间范围内的所有数据组成一个window一次对一个window里面的所有数据进行 计算。 1滚动窗口 Flink 默认的时间窗口根据 Processing Time 进行窗口的划分将 Flink 获取到的数据根据进入 Flink 的时间划分到不同的窗口中。
时间间隔可以通过 Time.milliseconds(x)Time.seconds(x)Time.minutes(x)等其中的一个来指定。 2滑动窗口 滑动窗口和滚动窗口的函数名是完全一致的只是在传参数时需要传入两个参数一个是 window_size一个是 sliding_size。 下面代码中的 sliding_size 设置为了 5s也就是说窗口每 5s 就计算一次每一次计算的 window 范围是 15s 内的所有元素。
时间间隔可以通过 Time.milliseconds(x)Time.seconds(x)Time.minutes(x)等其中的一个来指定。2CountWindow CountWindow根据窗口中相同 key 元素的数量来触发执行执行时只计算元素数量达到窗口大小的 key 对应的结果。 注意CountWindow 的 window_size 指的是相同 Key 的元素的个数不是输入的所有元素的总数。 1滚动窗口 默认的 CountWindow 是一个滚动窗口只需要指定窗口大小即可当元素数量达到窗口大小时就会触发窗口的执行。
2滑动窗口 滑动窗口和滚动窗口的函数名是完全一致的只是在传参数时需要传入两个参数一个是 window_size一个是 sliding_size。 下面代码中的 sliding_size 设置为了 2也就是说每收到两个相同 key 的数据就计算一次每一次计算的 window 范围是 5 个元素。
4、窗口函数 window function 定义了要对窗口中收集的数据做的计算操作主要可以分为两类 增量聚合函数incremental aggregation functions 每条数据到来就进行计算保持一个简单的状态。典型的增量聚合函数有ReduceFunction AggregateFunction。 全窗口函数full window functions 先把窗口所有数据收集起来等到计算的时候会遍历所有数据。ProcessWindowFunction 就是一个全窗口函数。 5、其它可选API trigger() —— 触发器 定义 window 什么时候关闭触发计算并输出结果 evitor() —— 移除器 定义移除某些数据的逻辑 allowedLateness() —— 允许处理迟到的数据sideOutputLateData() —— 将迟到的数据放入侧输出流getSideOutput() —— 获取侧输出流
6 . 简述Flink 的容错机制checkpoint
Checkpoint容错机制是Flink可靠性的基石可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时能够将整个应用流图的状态恢复到故障之前的某一状态保证应用流图状态的一致性。Flink的Checkpoint机制原理来自“Chandy-Lamport algorithm”算法。
每个需要Checkpoint的应用在启动时Flink的JobManager为其创建一个 CheckpointCoordinator(检查点协调器)CheckpointCoordinator全权负责本应用的快照制作。
CheckpointCoordinator(检查点协调器)CheckpointCoordinator全权负责本应用的快照制作。
CheckpointCoordinator(检查点协调器) 周期性的向该流应用的所有source算子发送 barrier(屏障)。 当某个source算子收到一个barrier时便暂停数据处理过程然后将自己的当前状态制作成快照并保存到指定的持久化存储中最后向CheckpointCoordinator报告自己快照制作情况同时向自身所有下游算子广播该barrier恢复数据处理 下游算子收到barrier之后会暂停自己的数据处理过程然后将自身的相关状态制作成快照并保存到指定的持久化存储中最后向CheckpointCoordinator报告自身快照情况同时向自身所有下游算子广播该barrier恢复数据处理。 每个算子按照步骤3不断制作快照并向下游广播直到最后barrier传递到sink算子快照制作完成。 当CheckpointCoordinator收到所有算子的报告之后认为该周期的快照制作成功; 否则如果在规定的时间内没有收到所有算子的报告则认为本周期快照制作失败
7 . 简述checkpoint机制详细
1、窗口函数window function window function 定义了要对窗口中收集的数据做的计算操作主要可以分为两类 增量聚合函数incremental aggregation functions 每条数据到来就进行计算保持一个简单的状态。典型的增量聚合函数有ReduceFunction AggregateFunction。 全窗口函数full window functions 先把窗口所有数据收集起来等到计算的时候会遍历所有数据。ProcessWindowFunction 就是一个全窗口函数。 2、时间语义
在Flink的流式处理中会涉及到时间的不同概念 Event Time是事件创建的时间。它通常由事件中的时间戳描述例如采集的日志数据中每一条日志都会记录自己的生成时间Flink通过时间戳分配器访问事件时间戳。 Ingestion Time是数据进入Flink的时间。 Processing Time是每一个执行基于时间操作的算子的本地系统时间与机器相关默认的时间属性就是Processing Time。 一个例子——电影《星球大战》
例如一条日志进入Flink的时间为2017-11-12 10:00:00.123到达Window的系统时间为2017-11-12 10:00:01.234日志的内容如下 2017-11-02 18:37:15.624 INFO Fail over to rm2 对于业务来说要统计1min内的故障日志个数哪个时间是最有意义的—— eventTime因为我们要根据日志的生成时间进行统计。 1EventTime的引入 在Flink的流式处理中绝大部分的业务都会使用eventTime一般只在eventTime无法使用时才会被迫 使用ProcessingTime或者IngestionTime。 如果要使用EventTime那么需要引入EventTime的时间属性引入方式如下所示 1 val env StreamExecutionEnvironment.getExecutionEnvironment 2 // 从调用时刻开始给env创建的每一个stream追加时间特征 3 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
9 . 简述介绍下Flink的watermark水位线watermark需要实现哪个实现类 在何处定义有什么作用
1、Watermark介绍及作用 我们知道流处理从事件产生到流经source再到operator中间是有一个过程和时间的虽然大部 分情况下流到operator的数据都是按照事件产生的时间顺序来的但是也不排除由于网络、分布式等 原因导致乱序的产生所谓乱序就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
那么此时出现一个问题一旦出现乱序如果只根据eventTime决定window的运行我们不能明确数据 是否全部到位但又不能无限期的等下去此时必须要有个机制来保证一个特定的时间后必须触发 window去进行计算了这个特别的机制就是Watermark。 Watermark是一种衡量Event Time进展的机制。 Watermark是用于处理乱序事件的而正确的处理乱序事件通常用Watermark机制结合window来 实现。 数据流中的Watermark用于表示timestamp小于Watermark的数据都已经到达了因此window 的执行也是由Watermark触发的。 Watermark可以理解成一个延迟触发机制我们可以设置Watermark的延时时长t每次系统会校验 已经到达的数据中最大的maxEventTime然后认定eventTime小于maxEventTime - t的所有数据都已经到达如果有窗口的停止时间等于maxEventTime – t那么这个窗口被触发执行。
当Flink接收到数据时会按照一定的规则去生成Watermark这条Watermark就等于当前所有到达数据中 的maxEventTime - 延迟时长也就是说Watermark是基于数据携带的时间戳生成的一旦Watermark比当前未触发的窗口的停止时间要晚那么就会触发相应窗口的执行。由于event time是由数据携带的 因此如果运行过程中无法获取新的数据那么没有被触发的窗口将永远都不被触发。 上图中我们设置的允许最大延迟到达时间为2s所以时间戳为7s的事件对应的Watermark是5s时间 戳为12s的事件的Watermark是10s如果我们的窗口1是1s5s窗口2是6s10s那么时间戳为7s的事件到达 时的Watermarker恰好触发窗口1时间戳为12s的事件到达时的Watermark恰好触发窗口2。 Watermark 就是触发前一窗口的“关窗时间”一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。 只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。 2、Watermark的使用 Watermark的两种生成方式 1 SourceFunction中产生将Timestamp的分配(也就是上文提到的离散化)和watermark的生成放在上 游同时sourceFunction中也有两个方法生成watermark 通过collectwithTimestamp方法发送数据和调用emitWatermark产生watermark,我们可以看到调用 collectwithTimestamp需要传入两个参数第一个参数就是数据第二次参数就是数据对应的时间戳这 样就完成了timestamp的分配调用emitWatermark生成watermark。 override def run(ctx: SourceContext[MyType]): Unit { while (/* condition */) { val next: MyType getNext() ctx.collectWithTimestamp(next if (next.hasWatermarkTime) { ctx.emitWatermark(new Watermark(next.getWatermarkTime)) } } }
2 DataStream API指定调用assignTimestampsAndWatermarks方法用于某些sourceFunction不支持的情况它能够接收不同的timestamp和watermark生成器说白了就是函数里面参数不同。 定期生成
val resultData logData.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Long, String, Long)] { val maxOutOfOrderness 10000L var currentMaxTimestamp: Long _
override def getCurrentWatermark: Watermark { new Watermark(currentMaxTimestamp - maxOutOfOrderness) } // 根据数据本身的 Event time 来获取 override def extractTimestamp(element: (Long, String, Long), previousElementTimestamp: Long): Long { val timestamp element._1 currentMaxTimestamp Math.max(timestamp, currentMaxTimestamp) timestamp } })
标记生成 class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] { // 1 min in ms val bound: Long 60 * 1000 class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] { // 1 min in ms val bound: Long 60 * 1000 } 区别定期指的是定时调用逻辑生成watermark而标记不是根据时间而是看到特殊记录表示接下来 的数据可能发不过来了分配timestamp 调用用户实现的watermark方法。 建议越靠近源端处理更容易进行判断。
10 . 简述Flink的窗口实现机制
1.窗口概述 在流处理应用中数据是连续不断的因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次但有时我们需要做一些聚合类的处理例如在过去的1分钟内有多少用户点击了我们的网页。在这种情况下我们必须定义一个时间窗口用来收集最近一分钟内的数据并对这个窗口内的数据进行计算。所以窗口就算将无限数据切割成有限的“数据块”进行处理。
流式计算是一种被设计用于处理无限数据集的数据处理引擎而无限数据集是指一种不断增长的本质上无限的数据集而Window窗口是一种切割无限数据为有限块进行处理的手段。
在Flink中, 窗口(window)是处理无界流的核心窗口把流切割成有限大小的多个存储桶(bucket), 我们在这些桶上进行计算
2.窗口分类 窗口分为两大类:
基于时间的窗口 时间窗口以时间点到来定义窗口的开始start和结束end所以截取出的就是某一时间段的数据。到达时间时窗口不再收集数据触发计算输出结果并将窗口关闭销毁 窗口大小 结束时间 - 开始时间 基于元素个数 基于元素的个数来截取数据到达固定的个数时就触发计算并关闭窗口 只需指定窗口大小就可以把数据分配到对应的窗口中
2-1.基于时间的窗口(时间驱动) 时间窗口包含一个开始时间戳和结束时间戳(前闭后开) 这两个时间戳一起限制了窗口的尺寸。
在代码中, Flink使用TimeWindow这个类来表示基于时间的窗口。这个类提供了key查询开始时间戳和结束时间戳的方法还提供了针对给定的窗口获取它允许的最大时间戳的方法maxTimestamp() 时间窗口有分为滚动窗口滑动窗口会话窗口。
2-1-1.滚动窗口(Tumbling Windows) 滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙。例如指定一个长度为5分钟的滚动窗口当前窗口开始计算每5分钟启动一个新的窗口。 滚动窗口能将数据流切分成不重叠的窗口每一个事件只能属于一个窗口。
tumbling-window:滚动窗口:sizeslide,如:每隔10s统计最近10s的数据
代码示例实验使用工具类BigdataUtil
package com.zenitera.bigdata.util; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List;
public class BigdataUtil { public static List toList(Iterable it) { List list new ArrayList(); for (T t : it) { list.add(t); } return list; }
public static String toDateTime(long ts) { return new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”).format(ts); } } 代码示例Time - Tumbling Windows
package com.zenitera.bigdata.window; import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;
import java.util.List;
/** •Time - Tumbling Windows */ public class Flink01_Window_Time_01 { public static void main(String[] args) { Configuration conf new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);
env .socketTextStream(“localhost”, 6666) .map(line - { String[] data line.split(“,”); return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) );
}) .keyBy(WaterSensor::getId) // 定义一个长度为5的滚动窗口 .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction() { //ProcessWindowFunction
Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception {
List list BigdataUtil.toList(elements);
String stt BigdataUtil.toDateTime(ctx.window().getStart()); String edt BigdataUtil.toDateTime(ctx.window().getEnd());
out.collect(窗口: stt edt “, key:” key list);
} }) .print();
try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
} }
/* D:\netcat-win32-1.12nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 p1,3,10 w1,5,20 w1,5,20 w1,5,20 w1,5,20
窗口: 2023-03-22 14:52:05 2023-03-22 14:52:10, key:a1 [WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3)] 窗口: 2023-03-22 14:52:20 2023-03-22 14:52:25, key:u1 [WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4)] 窗口: 2023-03-22 14:52:25 2023-03-22 14:52:30, key:p1 [WaterSensor(idp1, ts3, vc10)] 窗口: 2023-03-22 14:52:55 2023-03-22 14:53:00, key:w1 [WaterSensor(idw1, ts5, vc20)] 窗口: 2023-03-22 14:53:00 2023-03-22 14:53:05, key:w1 [WaterSensor(idw1, ts5, vc20), WaterSensor(idw1, ts5, vc20), WaterSensor(idw1, ts5, vc20)] */ 2-1-2.滑动窗口(Sliding Windows) 与滚动窗口一样 滑动窗口也是有固定的长度。另外一个参数我们叫滑动步长用来控制滑动窗口启动的频率。
如果滑动步长小于窗口长度滑动窗口会重叠 这种情况下一个元素可能会被分配到多个窗口中。
例如滑动窗口长度10分钟滑动步长5分钟 则每5分钟会得到一个包含最近10分钟的数据。 sliding-window:滑动窗口:sizeslide,如:每隔5s统计最近10s的数据 代码示例Time - Sliding Windows package com.zenitera.bigdata.window;
import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;
import java.util.List;
/** •Time - Sliding Windows */ public class Flink01_Window_Time_02 { public static void main(String[] args) { Configuration conf new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);
env .socketTextStream(“localhost”, 6666) .map(line - { String[] data line.split(“,”); return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) );
}) .keyBy(WaterSensor::getId) //定义一个滑动窗口: 长度是5s, 滑动是2秒 .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))) .process(new ProcessWindowFunction() { //ProcessWindowFunction
Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception {
List list BigdataUtil.toList(elements);
String stt BigdataUtil.toDateTime(ctx.window().getStart()); String edt BigdataUtil.toDateTime(ctx.window().getEnd());
out.collect(窗口: stt edt “, key:” key list);
} }) .print();
try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
} } /* D:\netcat-win32-1.12nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10
窗口: 2023-03-22 14:59:26 2023-03-22 14:59:31, key:a1 [WaterSensor(ida1, ts1, vc3)] 窗口: 2023-03-22 14:59:28 2023-03-22 14:59:33, key:a1 [WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3)] 窗口: 2023-03-22 14:59:30 2023-03-22 14:59:35, key:a1 [WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3)] 窗口: 2023-03-22 14:59:32 2023-03-22 14:59:37, key:a1 [WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3)] 窗口: 2023-03-22 14:59:38 2023-03-22 14:59:43, key:u1 [WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4)] 窗口: 2023-03-22 14:59:40 2023-03-22 14:59:45, key:u1 [WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4)] 窗口: 2023-03-22 14:59:42 2023-03-22 14:59:47, key:u1 [WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4)] 窗口: 2023-03-22 14:59:52 2023-03-22 14:59:57, key:p1 [WaterSensor(idp1, ts3, vc10)] 窗口: 2023-03-22 14:59:54 2023-03-22 14:59:59, key:p1 [WaterSensor(idp1, ts3, vc10)] 窗口: 2023-03-22 15:00:04 2023-03-22 15:00:09, key:p1 [WaterSensor(idp1, ts3, vc10)] 窗口: 2023-03-22 15:00:06 2023-03-22 15:00:11, key:p1 [WaterSensor(idp1, ts3, vc10)] 窗口: 2023-03-22 15:00:08 2023-03-22 15:00:13, key:p1 [WaterSensor(idp1, ts3, vc10)] */ 2-1-3.会话窗口(Session Windows) 会话窗口分配器会根据活动的元素进行分组。会话窗口不会有重叠与滚动窗口和滑动窗口相比会话窗口也没有固定的开启和关闭时间。
如果会话窗口有一段时间没有收到数据会话窗口会自动关闭这段没有收到数据的时间就是会话窗口的gap(间隔)。
我们可以配置静态的gap也可以通过一个gap extractor 函数来定义gap的长度。当时间超过了这个gap当前的会话窗口就会关闭后序的元素会被分配到一个新的会话窗口。
创建原理 因为会话窗口没有固定的开启和关闭时间所以会话窗口的创建和关闭与滚动,滑动窗口不同。在Flink内部每到达一个新的元素都会创建一个新的会话窗口如果这些窗口彼此相距比较定义的gap小则会对他们进行合并。为了能够合并会话窗口算子需要合并触发器和合并窗口函数: ReduceFunction, AggregateFunction, or ProcessWindowFunction
代码示例Time - Session Windows package com.zenitera.bigdata.window; import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;
import java.util.List;
/** •Time - Session Windows */ public class Flink01_Window_Time_03 { public static void main(String[] args) { Configuration conf new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);
env .socketTextStream(“localhost”, 6666) .map(line - { String[] data line.split(“,”); return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) );
}) .keyBy(WaterSensor::getId) // 定义一个session窗口: gap是3s .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))) .process(new ProcessWindowFunction() {
Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception {
List list BigdataUtil.toList(elements);
String stt BigdataUtil.toDateTime(ctx.window().getStart()); String edt BigdataUtil.toDateTime(ctx.window().getEnd());
out.collect(窗口: stt edt “, key:” key list);
} }) .print();
try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
} } /* D:\netcat-win32-1.12nc64.exe -lp 6666 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10 p1,3,10
窗口: 2023-03-22 15:04:59 2023-03-22 15:05:04, key:a1 [WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3)] 窗口: 2023-03-22 15:05:07 2023-03-22 15:05:12, key:u1 [WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4)] 窗口: 2023-03-22 15:05:16 2023-03-22 15:05:22, key:p1 [WaterSensor(idp1, ts3, vc10), WaterSensor(idp1, ts3, vc10), WaterSensor(idp1, ts3, vc10)] 窗口: 2023-03-22 15:05:23 2023-03-22 15:05:26, key:p1 [WaterSensor(idp1, ts3, vc10)]
Process finished with exit code -1 */
2-2.基于元素个数的窗口(数据驱动) 按照指定的数据条数生成一个Window与时间无关 2-2-1.滚动窗口 默认的CountWindow是一个滚动窗口只需要指定窗口大小即可当元素数量达到窗口大小时就会触发窗口的执行。
代码示例
package com.zenitera.bigdata.window;
import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector;
import java.util.List; /** •基于元素个数 - 滚动窗口 */ public class Flink02_Window_Count_01 { public static void main(String[] args) { Configuration conf new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1); env .socketTextStream(“localhost”, 6666) .map(line - { String[] data line.split(“,”);
return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .keyBy(WaterSensor::getId) // 定义长度为3的基于个数的滚动窗口 .countWindow(3) .process(new ProcessWindowFunction() { Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception {
List list BigdataUtil.toList(elements); out.collect( key: key list);
} }) .print();
try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
} }
/* D:\netcat-win32-1.12nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10 p1,3,10 p1,3,10 p1,3,10 w1,5,20 w1,5,20
key:a1 [WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3)] key:u1 [WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4)] key:p1 [WaterSensor(idp1, ts3, vc10), WaterSensor(idp1, ts3, vc10), WaterSensor(idp1, ts3, vc10)] key:p1 [WaterSensor(idp1, ts3, vc10), WaterSensor(idp1, ts3, vc10), WaterSensor(idp1, ts3, vc10)] */
2-2-2.滑动窗口 滑动窗口和滚动窗口的函数名是完全一致的只是在传参数时需要传入两个参数一个是window_size一个是sliding_size。下面代码中的sliding_size设置为了2也就是说每收到两个相同key的数据就计算一次每一次计算的window范围最多是3个元素
代码示例
package com.zenitera.bigdata.window;
import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector;
import java.util.List;
/** •基于元素个数 - 滑动窗口 */ public class Flink02_Window_Count_02 { public static void main(String[] args) { Configuration conf new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);
env .socketTextStream(“localhost”, 6666) .map(line - { String[] data line.split(“,”);
return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .keyBy(WaterSensor::getId) // 定义长度为3(窗口内元素的最大个数), 滑动步长为2的的基于个数的滑动窗口 .countWindow(3, 2) .process(new ProcessWindowFunction() { Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception {
List list BigdataUtil.toList(elements); out.collect( key: key list);
} }) .print();
try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
} }
/* D:\netcat-win32-1.12nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10 p1,3,10 w1,5,20 w1,5,20 w2,6,22
key:a1 [WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3)] key:a1 [WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3), WaterSensor(ida1, ts1, vc3)] key:u1 [WaterSensor(idu1, ts2, vc4), WaterSensor(idu1, ts2, vc4)] key:p1 [WaterSensor(idp1, ts3, vc10), WaterSensor(idp1, ts3, vc10)] key:p1 [WaterSensor(idp1, ts3, vc10), WaterSensor(idp1, ts3, vc10), WaterSensor(idp1, ts3, vc10)] key:w1 [WaterSensor(idw1, ts5, vc20), WaterSensor(idw1, ts5, vc20)] */
2-3.全局窗口(Global Windows)(自定义触发器) 全局窗口分配器会分配相同key的所有元素进入同一个 Global window。这种窗口机制只有指定自定义的触发器时才有用。否则不会做任何计算因为这种窗口没有能够处理聚集在一起元素的结束点。
3.窗口函数 前面指定了窗口的分配器接着我们需要来指定如何计算这事由window function来负责。一旦窗口关闭window function 去计算处理窗口中的每个元素。 window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种。 ReduceFunctionAggregateFunction更加高效原因就是Flink可以对到来的元素进行增量聚合。ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器以及这些元素所属窗口的一些元数据信息。 ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前需要在内部缓存这个窗口上所有的元素。
3-1ProcessWindowFunction 代码示例
package com.zenitera.bigdata.window;
import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;
/** •ProcessWindowFunction */ public class Flink03_Window_ProcessFunction { public static void main(String[] args) { Configuration conf new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);
env .socketTextStream(“localhost”, 6666) .map(line - { String[] data line.split(“,”);
return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .keyBy(WaterSensor::getId) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce( (ReduceFunction) (value1, value2) - { value1.setVc(value1.getVc() value2.getVc()); return value1; }, new ProcessWindowFunction() { Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception { WaterSensor result elements.iterator().next();
String stt BigdataUtil.toDateTime(ctx.window().getStart()); String edt BigdataUtil.toDateTime(ctx.window().getEnd());
out.collect(stt edt result); } } ) .print();
try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
} } /* D:\netcat-win32-1.12nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10
2023-03-22 16:05:20 2023-03-22 16:05:25 WaterSensor(ida1, ts1, vc6) 2023-03-22 16:05:25 2023-03-22 16:05:30 WaterSensor(ida1, ts1, vc3) 2023-03-22 16:05:30 2023-03-22 16:05:35 WaterSensor(idu1, ts2, vc12) 2023-03-22 16:05:40 2023-03-22 16:05:45 WaterSensor(idp1, ts3, vc10) 2023-03-22 16:05:45 2023-03-22 16:05:50 WaterSensor(idp1, ts3, vc20) */
3-2.ReduceFunction 代码示例
package com.zenitera.bigdata.window;
import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;
/** •ReduceFunction */ public class Flink03_Window_ReduceFunction { public static void main(String[] args) { Configuration conf new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);
env .socketTextStream(“localhost”, 6666) .map(line - { String[] data line.split(“,”);
return new WaterSensor( data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .keyBy(WaterSensor::getId) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce( (ReduceFunction) (value1, value2) - { value1.setVc(value1.getVc() value2.getVc()); return value1; }, new ProcessWindowFunction() { Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception { WaterSensor result elements.iterator().next();
String stt BigdataUtil.toDateTime(ctx.window().getStart()); String edt BigdataUtil.toDateTime(ctx.window().getEnd());
out.collect(stt edt result); } } ) .print();
try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
} } /* D:\netcat-win32-1.12nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10
2023-03-22 16:13:05 2023-03-22 16:13:10 WaterSensor(ida1, ts1, vc3) 2023-03-22 16:13:10 2023-03-22 16:13:15 WaterSensor(ida1, ts1, vc6) 2023-03-22 16:13:15 2023-03-22 16:13:20 WaterSensor(idu1, ts2, vc4) 2023-03-22 16:13:20 2023-03-22 16:13:25 WaterSensor(idu1, ts2, vc8) 2023-03-22 16:13:25 2023-03-22 16:13:30 WaterSensor(idp1, ts3, vc30) */
3-3.AggregateFunction 代码示例
package com.zenitera.bigdata.window;
import com.zenitera.bigdata.bean.WaterSensor; import com.zenitera.bigdata.util.BigdataUtil; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;
public class Flink03_Window_AggregateFunction { public static void main(String[] args) { Configuration conf new Configuration(); conf.setInteger(“rest.port”, 2000); StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(1);
env .socketTextStream(“localhost”, 6666) .map(line - { String[] data line.split(“,”);
return new WaterSensor( String.valueOf(data[0]), Long.valueOf(data[1]), Integer.valueOf(data[2]) ); }) .keyBy(WaterSensor::getId) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate( new AggregateFunction() { Override public Avg createAccumulator() { return new Avg(); }
Override public Avg add(WaterSensor value, Avg acc) { acc.sum value.getVc(); acc.count; return acc; }
Override public Double getResult(Avg acc) { return acc.sum * 1.0 / acc.count; }
Override public Avg merge(Avg a, Avg b) { return null; } }, new ProcessWindowFunction() { Override public void process(String key, Context ctx, Iterable elements, Collector out) throws Exception { Double result elements.iterator().next();
String stt BigdataUtil.toDateTime(ctx.window().getStart()); String edt BigdataUtil.toDateTime(ctx.window().getEnd());
out.collect(key stt edt result); } } ) .print();
try { env.execute(); } catch (Exception e) { e.printStackTrace(); }
}
public static class Avg { public Integer sum 0; public Long count 0L; } }
/* D:\netcat-win32-1.12nc64.exe -lp 6666 a1,1,3 a1,1,3 a1,1,3 u1,2,4 u1,2,4 u1,2,4 p1,3,10 p1,3,10 p1,3,10
a1 2023-03-22 16:19:45 2023-03-22 16:19:50 3.0 a1 2023-03-22 16:19:50 2023-03-22 16:19:55 3.0 u1 2023-03-22 16:19:55 2023-03-22 16:20:00 4.0 u1 2023-03-22 16:20:00 2023-03-22 16:20:05 4.0 p1 2023-03-22 16:20:05 2023-03-22 16:20:10 10.0 p1 2023-03-22 16:20:10 2023-03-22 16:20:15 10.0 */
11 . 一个 Flink 任务中可以既有事件时间窗口又有处理时间窗口吗
结论一个 Flink 任务可以同时有事件时间窗口又有处理时间窗口。 那么有些小伙伴们问了为什么我们常见的 Flink 任务要么设置为事件时间语义要么设置为处理时间语义 确实在生产环境中我们的 Flink 任务一般不会同时拥有两种时间语义的窗口。 那么怎么解释开头所说的结论呢 这里从两个角度进行说明 1.⭐ 我们其实没有必要把一个 Flink 任务和某种特定的时间语义进行绑定。对于事件时间窗口来说我们只要给它 watermark能让 watermark 一直往前推进让事件时间窗口能够持续触发计算就行。对于处理时间来说更简单只要窗口算子按照本地时间按照固定的时间间隔进行触发就行。无论哪种时间窗口主要满足时间窗口的触发条件就行。 2.⭐ Flink 的实现上来说也是支持的。Flink 是使用一个叫做 TimerService 的组件来管理 timer 的我们可以同时注册事件时间和处理时间的 timerFlink 会自行判断 timer 是否满足触发条件如果是则回调窗口处理函数进行计算。需求数据源用户心跳日志uidtimetype。计算分 AndroidiOS 的 DAU最晚一分钟输出一次当日零点累计到当前的结果。 3.⭐ 实现方式 1cumulate 窗口 优点如果是曲线图的需求可以完美回溯曲线图。 缺点大窗口之间如果有数据乱序有丢数风险并且由于是 watermark 推动产出所以数据产出会有延迟。 1.⭐ 实现方式 2Deduplicate 优点计算快。 缺点任务发生 failover曲线图不能很好回溯。没法支持 cube 计算。 1.⭐ 实现方式 3group agg 优点计算快支持 cube 计算。 缺点任务发生 failover曲线图不能很好回溯
12 . 简述作业在很多情况下有可能会失败。失败之后重新去运行时我们如何保证数据的一致性
Fink 基于 Chandv-Lampot 算法会把分布式的每一个节点的状态保存到分布式文件系统里面作为 Checkpoint(检点)过程大致如下。首先从数据源端开始注入 Checkpoint Barrier它是一种比较特殊的消息。 然后它会跟普通的事件一样随着数据流去流动当 Barrier 到达算子之后这个算子会把它当前的本地状态进行快照保存当 Barrier流动到 Sink所有的状态都保存完整了之后它就形成一个全局的快照。 这样当作业失败之后就可以通过远程文件系统里面保存的 Checkpoint 来进行回滚:先把 Source 回滚到 Checkpoint 记录的ofset然后把有状态节点当时的状态回滚到对应的时间点进行重新计算。这样既可以不用从头开始计算又能保证数据语义的一致性。
13 . 简述Flink的CEP
CEP的概念 Ø复杂事件处理Complex Event Processing用于识别输入流中符合指定规则的事件并按照指定方式输出。 Ø起床—洗漱—吃饭—上班一系列串联起来的事件流形成的模式 Ø浏览商品—加入购物车—创建订单—支付完成—发货—收货事件流形成的模式。 通过概念可以了解CEP主要是识别输入流中用户指定的一些基本规则的事件然后将这些事件再通过指定方式输出。 如下图所示 我们指定“方块、圆”为基本规则的事件在输入的原始流中将这些事件作为一个结果流输出来。 CEP的使用场景 像用户异常检测我们指定异常操作事件为要输出的结果流策略营销指定符合要求的事件为结果流运维监控指定一定范围的指标为结果流银行卡盗刷指定同一时刻在两个地方被刷两次为异常结果流。 在Flink CEP API中主要通过Pattern 类来进行实现。CEP模式主要分为三种模式 1、个体模式 Ø单例模式只接收一个事件 p 触发条件 (.where()/.or()/.until()) Ø循环模式可以接收一个或多个事件 p单例 量词在个体模式后追加量词指定循环次数 个体模式分为单例模式和循环模式 单例模式只接收一个事件主要通过(.where()/.or()/.until())等条件触发使用规则如下start为定义的变量。 循环模式可以接收一个或多个事件使用规则为 单例量词如下图所示 2、组合模式多个个体模式的组合 Ø严格连续(next) Ø中间没有任何不匹配的事件 Ø宽松连续(followBy) Ø忽略匹配的事件之间的不匹配的事件 Ø不确定的宽松连续(followByAny) Ø一个匹配的事件能够再次使用 组合模式主要分为三种使用规则但是在使用组合模式之前必须以初始模式开始使用begin() 控制如下图 1严格连续通过next()方法控制这句话是指用户定义的基本事件如最上图中的方块必须是连续都是方块不能出圆圈。可以通过下图理解。 2宽松连续通过followBy()控制中间可以有不匹配的事件 3不确定的宽松连续通过followByAny()方法控制表示一个匹配的事件可以多次被使用 组合模式还包含“不希望出现某种连续关系” .notNext()—— 不想让某个事件严格近邻前一个事件发生 .notFollowedBy()——不想让某个事件在两个事件之间发生 组合模式注意事项 所有组合模式必须以.begin()开始组合模式不能以.notFollowedBy()结束not类型的模式不能被optional所修饰此外还可以为模式指定时间约束用来要求在多长时间内匹配有效。 3、模式组 Ø一个组合模式作为条件嵌套在个体模式里pPattern(abc) 为例帮助大家更好的理解CEP的API使用接下里通过两个案例对CEP的使用进行讲解。 a电商案例——条件创建订单之后15分钟之内一定要付款否则取消订单 案例介绍在电商系统当中经常会发现有些订单下单之后没有支付就会有一个倒计时的时间值提示你在15分钟内完成支付如果没有完成支付那么该订单就会被取消主要是因为拍下订单就会减库存但是如果一直没有支付那么就会造成库存没有了后面购买的时候买不到。 在CEP概念介绍时引入过一条链路 Ø浏览商品—加入购物车—创建订单—支付完成—发货—收货事件流形成的模式。 上述模式是标准的电商事件流我们案例中定义的CEP规则是 创建订单—支付完成 这两个环节要求是判断15分钟内是否付款否则订单取消。 代码设计 Ø1定义实体类订单编号订单状态1 创建订单等待支付2支付订单完成3取消订单申请退款4已发货5确认收货已经完成订单创建时间订单金额 Ø2创建运行环境设置流时间特性为事件时间设置并行度 Ø3设置Source源 Ø4定义Pattern模式指定条件 在Pattern模式中我们使用到了个体模式组合模式 首先使用begin()方法以模式作为开始 其次由于创建的CEP规则是 创建订单—支付完成 则在单例模式下通过where条 件触发订单的状态已将处于 1 创建订单等待支付状态 接着通过宽松连续followBy()方法忽略15分钟内的产生的其他订单所以使用宽松连续。 然后通过where条件触发订单的状态是否已将处于 2 支付完成。 最后通过within()方法判断第二次触发的订单是否在支付的15分钟内完成。 Ø5订单超时检测 Ø6运行结果对比 查看最终的运行结果可以得知201608041140… 这个订单的状态为1且没有其他状态最终被系统判断为超时订单。 b系统登录案例——当2秒内出现两次登录失败“fail”时输出异常报警信息 同样根据上述信息我们做以下代码设计 Ø1设置实体类用户ID登录IP事件类型事件时间 Ø2设置环境指定source源传入实体类 Ø3定义pattern匹配模式规则指定条件 begin: 第一次where: 出现 failnext 紧接着第二次出现 failtimes: 出现一次 within:表示在2秒内。 Ø4结果对比 查看最终的运行结果可以得知服务器编号为1连续2秒在两个服务器登录登录失败服务器编号为3连续2秒在一台服务器登录登录失败
14 . 简述作业在很多情况下有可能会失败。失败之后重新去运行时我们如何保证数据的一致性
Flink 基于 Chandy-Lamport 算法会把分布式的每一个节点的状态保存到分布式文件系统里面作为 Checkpoint检查点过程大致如下。首先从数据源端开始注入 Checkpoint Barrier它是一种比较特殊的消息。
然后它会跟普通的事件一样随着数据流去流动当 Barrier 到达算子之后这个算子会把它当前的本地状态进行快照保存当 Barrier 流动到 Sink所有的状态都保存完整了之后它就形成一个全局的快照。
这样当作业失败之后就可以通过远程文件系统里面保存的 Checkpoint 来进行回滚先把 Source 回滚到 Checkpoint 记录的 offset然后把有状态节点当时的状态回滚到对应的时间点进行重新计算。这样既可以不用从头开始计算又能保证数据语义的一致性。
15 . 简述Flink Checkpoint与 Spark 的相比Flink 有什么区别或优势吗
Spark Streaming 的 Checkpoint 仅仅是针对 Driver 的故障恢复做了数据和元数据的 Checkpoint。而 Flink 的 Checkpoint 机制要复杂了很多它采用的是轻量级的分布式快照实现了每个算子的快照及流动中的数据的快照
16 . 简述Flink 中的 Time 有哪几种
Flink中的时间有三种类型
Event Time是事件创建的时间。它通常由事件中的时间戳描述例如采集的日志数据中每一条日志都会记录自己的生成时间Flink通过时间戳分配器访问事件时间戳。 Ingestion Time是数据进入Flink的时间。 Processing Time是每一个执行基于时间操作的算子的本地系统时间与机器相关默认的时间属性就是Processing Time。 例如一条日志进入Flink的时间为2021-01-22 10:00:00.123到达Window的系统时间为2021-01-22 10:00:01.234日志的内容如下 2021-01-06 18:37:15.624 INFO Fail over to rm2
对于业务来说要统计1min内的故障日志个数哪个时间是最有意义的—— eventTime因为我们要根据日志的生成时间进行统计。
17 . 简述Flink对于迟到数据是怎么处理的
Flink中 WaterMark 和 Window 机制解决了流式数据的乱序问题对于因为延迟而顺序有误的数据可以根据eventTime进行业务处理对于延迟的数据Flink也有自己的解决办法主要的办法是给定一个允许延迟的时间在该时间范围内仍可以接受处理延迟数据
设置允许延迟的时间是通过allowedLateness(lateness: Time)设置 保存延迟数据则是通过sideOutputLateData(outputTag: OutputTag[T])保存 获取延迟数据是通过DataStream.getSideOutput(tag: OutputTag[X])获取
18 . 简述Flink 的运行必须依赖 Hadoop 组件吗
Flink可以完全独立于Hadoop在不依赖Hadoop组件下运行。但是做为大数据的基础设施Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多Hadooop 组件例如Yarn、Hbase、HDFS等等。例如Flink可以和Yarn集成做资源调度也可以读写HDFS或者利用HDFS做检查点
19 . 简述Flink集群有哪些角色各自有什么作用
有以下三个角色
JobManager处理器
也称之为Master用于协调分布式执行它们用来调度task协调检查点协调失败时恢复等。Flink运行时至少存在一个master处理器如果配置高可用模式则会存在多个master处理器它们其中有一个是leader而其他的都是standby。
TaskManager处理器
也称之为Worker用于执行一个dataflow的task(或者特殊的subtask)、数据缓冲和data stream的交换Flink运行时至少会存在一个worker处理器。
Clint客户端
Client是Flink程序提交的客户端当用户提交一个Flink程序时会首先创建一个Client该Client首先会对用户提交的Flink程序进行预处理并提交到Flink集群中处理所以Client需要从用户提交的Flink程序配置中获取JobManager的地址并建立到JobManager的连接将Flink Job提交给JobManager
20 . 简述Flink 资源管理中 Task Slot 的概念
在Flink中每个TaskManager是一个JVM的进程, 可以在不同的线程中执行一个或多个子任务。为了控制一个worker能接收多少个task。worker通过task slot任务槽来进行控制一个worker至少有一个task slot
21 . 简述Flink的重启策略了解吗
Flink支持不同的重启策略这些重启策略控制着job失败后如何重启
1固定延迟重启策略 固定延迟重启策略会尝试一个给定的次数来重启Job如果超过了最大的重启次数Job最终将失败。在连续的两次重启尝试之间重启策略会等待一个固定的时间。
2失败率重启策略 失败率重启策略在Job失败后会重启但是超过失败率后Job会最终被认定失败。在两个连续的重启尝试之间重启策略会等待一个固定的时间。
3无重启策略 Job直接失败不会尝试进行重启。
22 . 简述Flink 是如何保证 Exactly-once 语义的
Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。分为以下几个步骤
开始事务beginTransaction创建一个临时文件夹来写把数据写入到这个文件夹里面 预提交preCommit将内存中缓存的数据写入文件并关闭 正式提交commit将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟丢弃abort丢弃临时文件 若失败发生在预提交成功后正式提交前。可以根据状态来提交预提交的数据也可删除预提交的数据。
23 . 简述Flink如何保证精确一次性消费
Flink 保证精确一次性消费主要依赖于两种Flink机制
1、Checkpoint机制
2、二阶段提交机制
Checkpoint机制 主要是当Flink开启Checkpoint的时候会往Source端插入一条barrir然后这个barrir随着数据流向一直流动当流入到一个算子的时候这个算子就开始制作checkpoint制作的是从barrir来到之前的时候当前算子的状态将状态写入状态后端当中。然后将barrir往下流动当流动到keyby 或者shuffle算子的时候例如当一个算子的数据依赖于多个流的时候这个时候会有barrir对齐也就是当所有的barrir都来到这个算子的时候进行制作checkpoint依次进行流动当流动到sink算子的时候并且sink算子也制作完成checkpoint会向jobmanager 报告 checkpoint n 制作完成。
二阶段提交机制 Flink 提供了CheckpointedFunction与CheckpointListener这样两个接口CheckpointedFunction中有snapshotState方法每次checkpoint触发执行方法通常会将缓存数据放入状态中可以理解为一个hook这个方法里面可以实现预提交CheckpointListyener中有notifyCheckpointComplete方法checkpoint完成之后的通知方法这里可以做一些额外的操作。例如FLinkKafkaConumerBase使用这个来完成Kafka offset的提交在这个方法里面可以实现提交操作。在2PC中提到如果对应流程例如某个checkpoint失败的话那么checkpoint就会回滚不会影响数据一致性那么如果在通知checkpoint成功的之后失败了那么就会在initalizeSate方法中完成事务的提交这样可以保证数据的一致性。最主要是根据checkpoint的状态文件来判断的。
24 . 简述Flink的状态可以用来做什么
Flink状态主要有两种使用方式
1checkpoint的数据恢复 2逻辑计算
25 . 简述Flink的Checkpoint底层如何实现的savepoint和checkpoint有什么区别
1、Checkpoint Flink中基于异步轻量级的分布式快照技术提供了Checkpoints容错机制分布式快照可以将同一时间点 Task/Operator的状态数据全局统一快照处理包括Keyed State和Operator StateState后面也接着介绍了。如下图所示Flink会在输入的数据集上间隔性地生成checkpoint barrier通过栅栏barrier将间隔时间段内的数据划分到相应的checkpoint中。当应用出现异常时Operator就能够从上一次快照中 恢复所有算子之前的状态从而保证数据的一致性。例如在Kafka Consumer算子中维护Ouset状态当系统出现问题无法从Kafka中消费数据时可以将Ouset记录在状态中当任务重新恢复时就能够从指定 的偏移量开始消费数据。对于状态占用空间比较小的应用快照产生过程非常轻量高频率创建且对 Flink任务性能影响相对较小。checkpoint过程中状态数据一般被保存在一个可配置的环境中通常是在 JobManager节点或HDFS上。
checkpoint原理就是连续绘制分布式的快照而且非常轻量级可以连续绘制并且不会对性能产生太 大影响。
默认情况下Flink不开启检查点的用户需要在程序中通过调用enable-Checkpointing(n)方法配置和开启 检查点其中n为检查点执行的时间间隔单位为毫秒。 2、savepoint和checkpoint区别 Savepoints是检查点的一种特殊实现底层其实也是使用Checkpoints的机制。Savepoints是用户以手工命令的方式触发Checkpoint并将结果持久化到指定的存储路径中其主要目的是帮助用户在升级和维 护集群过程中保存系统中的状态数据避免因为停机运维或者升级应用等正常终止应用的操作而导致系 统无法恢复到原有的计算状态的情况从而无法实现端到端的Excatly-Once语义保证。 下面这张来自Flink 1.1版本文档的图示出了checkpoint和savepoint的关系。
总结如下所示 checkpoint的侧重点是“容错”即Flink作业意外失败并重启之后能够直接从早先打下的 checkpoint恢复运行且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”即Flink作业需 要在人工干预下手动重启、升级、迁移或A/B测试时先将状态整体写入可靠存储维护完毕之后 再从savepoint恢复现场。 savepoint是“通过checkpoint机制”创建的所以savepoint本质上是特殊的checkpoint。 checkpoint面向Flink Runtime本身由Flink的各个TaskManager定时触发快照并自动清理一般不需要用户干预savepoint面向用户完全根据用户的需要触发与清理。 checkpoint的频率往往比较高因为需要尽可能保证作业恢复的准确度所以checkpoint的存储 格式非常轻量级但作为trade-ou牺牲了一切可移植portable的东西比如不保证改变并行度 和升级的兼容性。savepoint则以二进制形式存储所有状态数据和元数据执行起来比较慢而且 “贵”但是能够保证portability如并行度改变或代码升级之后仍然能正常恢复。 checkpoint是支持增量的通过RocksDB特别是对于超大状态的作业而言可以降低写入成本。 savepoint并不会连续自动触发所以savepoint没有必要支持增量
26 . 简述Flink的Checkpoint流程
Checkpoint由JM的Checkpoint Coordinator发起 第一步Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint
第二步source 节点向下游广播 barrier这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint。
第三步当 task 完成 state 备份后会将备份数据的地址state handle通知给 Checkpoint coordinator。
这里分为同步和异步如果开启的话两个阶段 1同步阶段task执行状态快照并写入外部存储系统根据状态后端的选择不同有所区别 执行快照的过程 对state做深拷贝 将写操作封装在异步的FutureTask中 FutureTask的作用包括 打开输入流 写入状态的元数据信息 写入状态关闭输入流 2异步阶段 执行同步阶段创建的FutureTask 向Checkpoint Coordinator发送ACK响应
第四步下游的 sink 节点收集齐上游两个 input 的 barrier 之后会执行本地快照这里特地展示了RocksDB incremental Checkpoint 的流程首先 RocksDB 会全量刷数据到磁盘上红色大三角表示 然后 Flink 框架会从中选择没有上传的文件进行持久化备份紫色小三角。 同样的sink 节点在完成自己的 Checkpoint 之后会将 state handle 返回通知 Coordinator。
最后当 Checkpoint coordinator 收集齐所有 task 的 state handle就认为这一次的 Checkpoint 全局完成了向持久化存储中再备份一个 Checkpoint meta 文件。
27 . 简述Flink Checkpoint的作用
Checkpoint 某一时刻Flink中所有的Operator的当前State的全局快照一般存在磁盘上。 CheckPoint是通过快照SnapShot的方式使得将历史某些时刻的状态保存下来当Flink任务意外挂掉 之后重新运行程序后默认从最近一次保存的完整快照处进行恢复任务。 Flink中的Checkpoint底层使用了 Chandy-Lamport algorithm 分布式快照算法可以保证数据的在分布式环境下的一致性。
28 . 简述Flink中Checkpoint超时原因
1计算量大CPU密集性导致TM内线程一直在processElement而没有时间做Checkpoint 解决方案过滤掉部分数据增大并行度 修改实现逻辑进行批流分开计算比如离线数据每半个小时进行一次计算而实时计算只需要计算最 近半小时内的数据即可。总之两个方法一、减少源数据量过滤黑名单或者非法IDwindow聚合 二、简化处理逻辑特别是减少遍历。 2数据倾斜 解决方案 第一两阶段聚合第二重新设置并行度改变KeyGroup的分布 3频繁FULL GC‘Checkpoint Duration (Async)’时间长 当StateSize达到200M以上Async的时间会超过1min。 这种情况比较少见。 4出现反压 包含barrier的event buuer一直不到 subTaskCheckpointCoordinator做不了Checkpoint就会超时
29 . 简述Flink的Exactly Once语义怎么保证
下级存储支持事务Flink可以通过实现两阶段提交和状态保存来实现端到端的一致性语义。 分为以下几个步骤 1开始事务beginTransaction创建一个临时文件夹来写把数据写入到这个文件夹里面 2 预提交preCommit将内存中缓存的数据写入文件并关闭 3正式提交commit将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟 4丢弃abort丢弃临时文件 5若失败发生在预提交成功后正式提交前。可以根据状态来提交预提交的数据也可删除预提交的 数据。 下级存储不支持事务 具体实现是幂等写入需要下级存储具有幂等性写入特性
30 . 简述Flink的端到端Exactly Once
1、Flink的Exactly-Once语义 对于Exactly-Once语义指的是每一个到来的事件仅会影响最终结果一次。就算机器宕机或者软件崩 溃即没有数据重复也没有数据丢失。 这里有一个关于checkpoint算法的简要介绍这对于了解更广的主题来说是十分必要的。 一个checkpoint是Flink的一致性快照它包括 程序当前的状态输入流的位置 Flink通过一个可配置的时间周期性的生成checkpoint将它写入到存储中例如S3或者HDFS。写入到 存储的过程是异步的意味着Flink程序在checkpoint运行的同时还可以处理数据。 在机器或者程序遇到错误重启的时候Flink程序会使用最新的checkpoint进行恢复。Flink会恢复程序的 状态将输入流回滚到checkpoint保存的位置然后重新开始运行。这意味着Flink可以像没有发生错误 一样计算结果。 在Flink 1.4.0版本之前Flink仅保证Flink程序内部的Exactly-Once语义没有扩展到在Flink数据处理完成后存储的外部系统。 Flink程序可以和不同的接收器sink交互开发者需要有能力在一个组件的上下文中维持Exactly-Once 语义。 为了提供端到端Exactly-Once语义除了Flink应用程序本身的状态Flink写入的外部存储也需要满足这 个语义。也就是说这些外部系统必须提供提交或者回滚的方法然后通过Flink的checkpoint来协调。 在分布式系统中协调提交和回滚的通用做法是两阶段提交。Flink的TwoPhaseCommitSinkFunction使 用两阶段提交协议来保证端到端的Exactly-Once语义。 2、Flink程序端到端的Exactly-Once语义
Kafka是一个流行的消息中间件经常被拿来和Flink一起使用Kafka 在最近的0.11版本中添加了对事务的支持。这意味着现在Flink读写Kafka有了必要的支持使之能提供端到端的Exactly-Once语义。 Flink对端到端的Exactly-Once语义不仅仅局限在Kafka你可以使用任一输入输出源source、sink只 要他们提供了必要的协调机制。例如Pravega 来自DELL/EMC的流数据存储系统通过Flink的 TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once语义。 在这个示例程序中有 从Kafka读取数据的data sourceKafkaConsumer在Flink中 窗口聚合 将数据写回到Kafka的data sinkKafkaProducer在Flink中 在data sink中要保证Exactly-Once语义它必须将所有的写入数据通过一个事务提交到Kafka。在两个 checkpoint之间一个提交绑定了所有要写入的数据。 这保证了当出错的时候写入的数据可以被回滚。 然而在分布式系统中通常拥有多个并行执行的写入任务简单的提交和回滚是效率低下的。为了保证 一致性所有的组件必须先达成一致才能进行提交或者回滚。Flink使用了两阶段提交协议以及预提交 阶段来解决这个问题。 在checkpoint开始的时候即两阶段提交中的预提交阶段。首先Flink的JobManager在数据流中注入一 个checkpoint屏障它将数据流中的记录分割开一些进入到当前的checkpoint另一些进入下一个checkpoint。 屏障通过operator传递。对于每一个operator它将触发operator的状态快照写入到state backend data source保存了Kafka的ouset之后把checkpoint屏障传递到后续的operator。
这种方式仅适用于operator有他的内部状态。内部状态是指Flink state backends保存和管理的内容-举例来说第二个operator中window聚合算出来的sum。当一个进程有它的内部状态的时候除了在 checkpoint之前将需要将数据更改写入到state backend不需要在预提交阶段做其他的动作。在 checkpoint成功的时候Flink会正确的提交这些写入在checkpoint失败的时候会终止提交 然而当一个进程有外部状态的时候需要用一种不同的方式来处理。外部状态通常由需要写入的外部 系统引入例如Kafka。因此为了提供Exactly-Once保证外部系统必须提供事务支持借此和两阶段 提交协议交互。 在这个例子中由于需要将数据写到Kafkadata sink有外部的状态。因此在预提交阶段除了将状态写入到state backend之外data sink必须预提交自己的外部事务。
当checkpoint屏障在所有operator中都传递了一遍以及它触发的快照写入完成预提交阶段结束。这个 时候快照成功结束整个程序的状态包括预提交的外部状态是一致的。万一出错的时候我们可以 通过checkpoint重新初始化。 下一步是通知所有operatorcheckpoint已经成功了。这时两阶段提交中的提交阶段Jobmanager为程 序中的每一个operator发起checkpoint已经完成的回调。data source和window operator没有外部的状态在提交阶段中这些operator不会执行任何动作。data sink拥有外部状态所以通过事务提交外部写入。
对上述的知识点汇总一下 一旦所有的operator完成预提交就提交一个commit。 如果至少有一个预提交失败其他的都会失败这时回滚到上一个checkpoint保存的位置。 预提交成功后提交的commit也需要保障最终成功-operator和外部系统需要提供这个保障。如果 commit失败了比如网络中断引起的故障整个flink程序也因此失败它会根据用户的重启策 略重启可能还会有一个尝试性的提交。这个过程非常严苛因为如果提交没有最终生效会导致 数据丢失。 因此我们可以确定所有的operator同意checkpoint的最终结果要么都同意提交数据要么提交被终止 然后回滚。
31 . 简述Flink的水印Watermark有哪几种
水印Watermark用于处理乱序事件而正确地处理乱序事件通常用Watermark机制结合窗口来实 现。 从流处理原始设备产生事件到Flink读取到数据再到Flink多个算子处理数据在这个过程中会受到 网络延迟、数据乱序、背压、Failover等多种情况的影响导致数据是乱序的。虽然大部分情况下没有问 题但是不得不在设计上考虑此类异常情况为了保证计算结果的正确性需要等待数据这带来了计 算的延迟。对于延迟太久的数据不能无限期地等下去所以必须有一个机制来保证特定的时间后一 定会触发窗口进行计算这个触发机制就是Watermark。 在DataStream和Flink Table SQL模块中使用了各自的Watermark生成体系。 1、DataStream Watermark生成 通常Watermark在Source Function中生成如果是并行计算的任务在多个并行执行的Source Function 中相互独立产生各自的Watermark。而Flink提供了额外的机制允许在调用DataStream API操作如 map、filter等之后根据业务逻辑的需要使用时间戳和Watermark生成器修改数据记录的时间戳和 Watermark。 1 Source Function中生成Watermark Source Function可以直接为数据元素分配时间戳同时也会向下游发送Watermark。在Source Function中为数据分配了时间戳 和Watermark就不必在DataStream API中使用了。需要注意的是:如果一个timestamp分配器被使用的话 由源提供的任何Timestamp和Watermark都会被重写。
为了通过SourceFunction直接为一个元素分配一个时间戳SourceFunction需要调用SourceContext中的collectWithTimestamp…方法。为了生成Watermark源需要调用emitWatermarkWatermark方 法如以下代码所示。
2 DataStream API中生成Watermark DataStream API中使用的TimestampAssigner 接口定义了时间戳的提取行为其有两个不同接口 AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks分别代表了不同的Watermark生 成策略。TimestampAssigner接口体系如下图所示。
AssignerWithPeriodicWatermarks 是周期性生成Watermark策略的顶层抽象接口该接口的实现类周期性地生成Watermark而不会针对每一个事件都生成。 AssignerWithPunctuatedWatermarks 对每一个事件都会尝试进行Watermark的生成但是如果生成的Watermark是null或者Watermark小于之前的Watermark则该Watermark不会发往下游因为发往下游 也不会有任何效果不会触发任何窗口的执行。 2、Flink SQL Watermark生成 Flink SQL没有DataStram API开发那么灵活其Watermark的生成主要是在TableSource中完成的其定义了3类Watermark生成策略。其Watermark生成策略体系如下图所示。
Watermark的生成机制分为如下3类。 1周期性Watermark策略
周期性Watermark策略在Flink中叫作 PeriodicWatermarkAssigner 周期性一定时间间隔或者达到一定的记录条数地产生一个Watermark。在实际的生产中使用周期性Watermark策略的时候必须注意 时间和数据量结合时间和积累条数两个维度继续周期性产生Watermark否则在极端情况下会有很大 的延时。 1 AscendingTimestamps递增Watermark作用在Flink SQL中的Rowtime属性上Watermark当前收到的数据元素的最大时间戳-1此处减1的目的是确保有最大时间戳的事件不会被当做迟到数据丢弃。 2 BoundedOutOfOrderTimestamps固定延迟Watermark作用在Flink SQL的Rowtime属性上 Watermark当前收到的数据元素的最大时间戳-固定延迟。 2每事件Watermark策略 每事件Watermark策略在Flink中叫作 PuntuatedWatamarkAssigner 数据流中每一个递增的 EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上会对下游算子造成压力所以只有在实时性要求非常高的场景下才会选择Punctuated的方式进行Watermark的生成。 3无为策略 无为策略在Flink中叫作 PreserveWatermark 。在Flink中可以使用DataStream API和Table SQL混合编程所以Flink SQL中不设定Watermark策略使用底层DataStream中的Watermark策略也是可以的这时Flink SQL的Table Source中不做处理。 3、多流的Watermark 在实际的流计算中一个作业中往往会处理多个Source的数据对Source的数据进行GroupBy分组那么 来自不同Source的相同key值会shuule到同一个处理节点并携带各自的WatermarkFlink内部要保证Watermark保持单调递增多个Source的Watermark汇聚到一起时可能不是单调自增的对于这样的情况
Flink内部实现每一个边上只能有一个递增的Watermark当出现多流携带EventTime汇聚到一起 GroupBy或Union时Apache Flink会选择所有流入的EventTime中最小的一个向下游流出从而保证 Watermark的单调递增和数据的完整性。
Watermark是在Source Function中生成或者在后续的DataStream API中生成的。Flink作业一般是并行执行的作业包含多个Task每个Task运行一个或一组算子OperatorChain实例Task在生成Watermark 的时候是相互独立的也就是说在作业中存在多个并行的Watermark。 Watermark在作业的DAG从上游向下游传递算子收到上游Watermark后会更新其Watermark。如果新的 Watermark大于算子的当前Watermark则更新算子的Watermark为新Watermark并发送给下游算子。 某些算子会有多个上游输入如Union或keyBy、partition之后的算子。在Flink的底层执行模型上多流 输入会被分解为多个双流输入所以对于多流Watermark的处理也就是双流Watermark的处理无论是哪 一个流的Watermark进入算子都需要跟另一个流的当前算子进行比较选择较小的Watermark即 Min(input1Watermark,intput2Watermark)与算子当前的Watermark比较如果大于算子当前的 Watermark则更新算子的Watermark为新的Watermark并发送给下游如以下代码所示。 //AbstractStreamOperator.java public voidprocessWatermark1(Watermark mark)throws Exception { inputlWatermark mark.getTimestamp(); longnewMinMath.min(input1Watermark,input2Watermark); if(newMin combinedWatermark){ combinedWatermarknewMin; processWatermark(new Watermark(combinedwatermark)); } } public voidprocessWatermark2(Watermark mark)throws Exception { input2Watermarkmark.getTimestamp(); longnewMinMath.min(inputlWatermark,input2Watermark); if(newMincombinedWatermark){ combinedwatermarknewMin; processWatermark(new Watermark(combinedwatermark)); } }
多流Watermark中使用了事件时间。
在上图中Source算子产生各自的Watermark并随着数据流流向下游的map算子map算子是无状态计 算所以会将Watermark向下透传。window算子收到上游两个输入的Watermark后选择其中较小的一 个发送给下游window1算子比较Watermark 29和Watermark 14选择Watermark 14作为算子当前Watermark并将Watermark 14发往下游window2算子也采用相同的逻辑
32 . 简述什么是Flink的时间语义
在流处理中时间是一个非常核心的概念是整个系统的基石。比如我们经常会遇到这样的需求给定一个时间窗口比如一个小## 时统计时间窗口的内数据指标。那如何界定哪些数据将进入这个窗口呢在窗口的定义之前首先需要确定一个应用使用什么样的时间语义。本文将介绍Flink的Event Time、Processing Time和Ingestion Time三种时间语义。 一、处理时间(process time) 处理时间是指的执行操作的各个设备的时间。 对于运行在处理时间上的流程序, 所有的基于时间的操作(比如时间窗口)都是使用的设备时钟.比如, 一个长度为1个小时的窗口将会包含设备时钟表示的1个小时内所有的数据。 假设应用程序在 9:15am分启动, 第1个小时窗口将会包含9:15am到10:00am所有的数据, 然后下个窗口是10:00am-11:00am, 等等。 处理时间是最简单时间语义, 数据流和设备之间不需要做任何的协调. 他提供了最好的性能和最低的延迟。 但是, 在分布式和异步的环境下, 处理时间没有办法保证确定性, 容易受到数据传递速度的影响: 事件的延迟和乱序。 在使用窗口的时候, 如果使用处理时间, 就指定时间分配器为处理时间分配器。 二、摄取时间(ingestion time 摄取时间是指Flink 读取事件时记录的时间。 Ingestion Time是事件到达Flink Souce的时间。从Source到下游各个算子中间可能有很多计算环节任何一个算子的处理速度快慢可能影响到下游算子的Processing Time。而Ingestion Time定义的是数据流最早进入Flink的时间因此不会被算子处理速度影响。 Ingestion Time通常是Event Time和Processing Time之间的一个折中方案。 比起Event TimeIngestion Time可以不需要设置复杂的Watermark因此也不需要太多缓存延迟较低。 比起Processing TimeIngestion Time的时间是Souce赋值的一个事件在整个处理过程从头至尾都使用这个时间而且后续算子不受前序算子处理速度的影响计算结果相对准确一些但计算成本稍高。 三、事件时间(event time) 事件时间是指的这个事件发生的时间。 在event进入Flink之前, 通常被嵌入到了event中, 一般作为这个event的时间戳存在。 在事件时间体系中, 时间的进度依赖于数据本身, 和任何设备的时间无关. 事件时间程序必须制定如何产生Event Time Watermarks(水印) . 在事件时间体系中, 水印是表示时间进度的标志(作用就相当于现实时间的时钟)。 在理想情况下不管事件时间何时到达或者他们的到达的顺序如何, 事件时间处理将产生完全一致且确定的结果. 事件时间处理会在等待无序事件(迟到事件)时产生一定的延迟。由于只能等待有限的时间因此这限制了确定性事件时间应用程序的可使用性。 假设所有数据都已到达事件时间操作将按预期方式运行即使在处理无序或迟到的事件或重新处理历史数据时也会产生正确且一致的结果。例如每小时事件时间窗口将包含带有事件时间戳的所有记录该记录落入该小时无论它们到达的顺序或处理时间。 注意: 在1.12之前默认的时间语义是处理时间, 从1.12开始, Flink内部已经把默认的语义改成了事件时间。 四、总结 Event Time的优势是结果的可预测性缺点是缓存较大增加了延迟且调试和定位问题更复杂。 Processing Time只依赖当前执行机器的系统时钟不需要依赖Watermark无需缓存。Processing Time是实现起来非常简单也是延迟最小的一种时间语义但是在分布式和异步的环境下Processing Time 不能提供确定性因为它容易受到事件到达系统的速度例如从消息队列、事件在系统内操作流动的速度以及中断的影响。 Ingestion Time通常是Event Time和Processing Time之间的一个折中方案。 在 Flink 流处理真实场景中大部分的业务需求都会使用事件时间语义但还是以具体的业务需求择选不同的时间语义。
33 . 简述Flink相比于其它流式处理框架的优点
1、同时支持高吞吐、低延迟、高性能 Flink是目前开源社区中唯一一套集高吞吐、低延迟、高性能三者于一身的分布式流式数据处理框架。Apache Spark也只能兼顾高吞吐和高性能特点主要是因为Spark Streaming流式计算中无法做到低延迟保障而流式计算框架Apache Storm只能支持低延迟和高性能特性但是无法满足高吞吐的要求。而满足高吞吐、低延迟、高性能这三个目标对分布式流式计算框架来说是非常重要的。 2、支持事件时间Event Time 在流式计算领域中窗口计算的地位举足轻重但目前大多数框架窗口计算采用的都是系统时间 Process Time也是事件传输到计算框架处理时系统主机的当前时间。Flink能够支持基于事件时间Event Time)语义进行窗口计算也就是使用事件产生的时间这种基于事件驱动的机制使得事件即使乱序到达流系统也能够计算出精确的结果保证了事件原本产生时的时序性尽可能避免网络传输 或硬件系统的影响。 3、支持有状态计算 Flink在1.4版本中实现了状态管理所谓状态就是在流式计算过程中将算子的中间结果数据保存在内存或 文件系统中等下一个事件进入算子后可以从之前的状态中获取中间结果计算当前的结果从而无需 每次都基于全部的原始数据来统计结果这种方式极大地提升了系统的性能并降低了数据计算过程的 资源消耗。对于数据量大且运算逻辑非常复杂的流式计算场景有状态计算发挥了非常重要的作用。 4、支持高度灵活的窗口Window操作 在流处理应用中数据是连续不断的需要通过窗口的方式对流数据进行一定范围的聚合计算例如统 计在过去1分钟内有多少用户点击某一网页在这种情况下我们必须定义一个窗口用来收集最近一 分钟内的数据并对这个窗口的数据进行再计算。 Flink将窗口划分为基于Time、Count、Session以及Data-Driven等类型的窗口操作窗口可以用灵活的 出发条件定制化来达到对复杂的流传输模式的支持用户可以定义不同的窗口出发机制来满足不同的需 求。 5、基于轻量级分布式快照CheckPoint实现的容错
Flink能够分布式运行在上千个节点上将一个大型计算任务的流程拆解成晓得计算过程然后将task分 布到并行节点上处理。在任务执行过程中能够自动发现事件处理过程中的错误而导致数据不一致的问 题比如节点宕机、网络传输问题或是由于用户升级或修复问题而导致计算服务重启等。在这些情 况下通过基于分布式快照技术的Checkpoints将执行过程中的状态信息进行持久化恢复以确保数据 在处理过程中的一致性Exactly-Once。 6、基于JVM实现独立的内存管理 内存管理是所有计算框架需要重点考虑的部分尤其对于计算量比较大的计算场景数据在内存中该如 何进行管理显得至关重要。针对内存管理FLink实现了自身管理内存的机制尽可能减少JVM GC对系统的影响。另外FLink通过序列化/反序列化方法将所有的数据对象转换成二进制在内存中存储降低 数据存储的大小的同事能够更加有效地对内存空间进行利用降低GC带来的性能下降或任务异常的风 险因此Flink较其他分布式处理的框架会显得更加稳定不会因为JVM GC等问题而影响整个应用的运行 7、Savepoints保存点 对于7*24小时运行的流式应用数据源源不断的接入在一段时间内应用的终止有可能导致数据的丢失 或者极端结果的不准确例如进行集群版本的升级、停机运维操作等操作。值得一提的是FLink通过 Save Points技术将任务执行的快照保存在存储介质上当任务重启的时候可以直接从事先保存的Save Points恢复原有的计算状态是的任务继续按照停机之前的状态运行Save Points技术可以烫用户更好地管理和运维实时流式应用。
34 . 简述Flink和Spark的区别什么情况下使用Flink有什么优点
1、Flink和Spark的区别 数据模型 Flink基本数据模型是数据流以及事件序列。 Spark采用RDD模型Spark Streaming的DStream实际上也就是一组组小批数据RDD的集合。运行时架构 Flink是标准的流执行模式一个事件在一个节点处理完后可以直接发往下一个节点进行处理。 Spark是批计算将DAG划分为不同的Stage一个完成后才可以计算下一个。 2、Flink应用场景 在实际生产的过程中大量数据在不断地产生例如金融交易数据、互联网订单数据、GPS定位数据、 传感器信号、移动终端产生的数据、通信信号数据等以及我们熟悉的网络流量监控、服务器产生的日 志数据这些数据最大的共同点就是实时从不同的数据源中产生然后再传输到下游的分析系统。针对 这些数据类型主要包括实时智能推荐、复杂事件处理、实时欺诈检测、实时数仓与ETL类型、流数据分 析类型、实时报表类型等实时业务场景而Flink对于这些类型的场景都有着非常好的支持。 1实时智能推荐 智能推荐会根据用户历史的购买行为通过推荐算法训练模型预测用户未来可能会购买的物品。对个 人来说推荐系统起着信息过滤的作用对Web/App服务端来说推荐系统起着满足用户个性化需求 提升用户满意度的作用。推荐系统本身也在飞速发展除了算法越来越完善对时延的要求也越来越苛 刻和实时化。利用Flink流计算帮助用户构建更加实时的智能推荐系统对用户行为指标进行实时计算 对模型进行实时更新对用户指标进行实时预测并将预测的信息推送给Wep/App端帮助用户获取想 要的商品信息另一方面也帮助企业提升销售额创造更大的商业价值。 2复杂事件处理
对于复杂事件处理比较常见的案例主要集中于工业领域例如对车载传感器、机械设备等实时故障检 测这些业务类型通常数据量都非常大且对数据处理的时效性要求非常高。通过利用Flink提供的 CEP复杂事件处理进行事件模式的抽取同时应用Flink的Sql进行事件数据的转换在流式系统中构 建实时规则引擎一旦事件触发报警规则便立即将告警结果传输至下游通知系统从而实现对设备故 障快速预警监测车辆状态监控等目的。 3实时欺诈检测 在金融领域的业务中常常出现各种类型的欺诈行为例如信用卡欺诈、信贷申请欺诈等而如何保证 用户和公司的资金安全是来近年来许多金融公司及银行共同面对的挑战。随着不法分子欺诈手段的不 断升级传统的反欺诈手段已经不足以解决目前所面临的问题。以往可能需要几个小时才能通过交易数 据计算出用户的行为指标然后通过规则判别出具有欺诈行为嫌疑的用户再进行案件调查处理在这 种情况下资金可能早已被不法分子转移从而给企业和用户造成大量的经济损失。而运用Flink流式计算 技术能够在毫秒内就完成对欺诈判断行为指标的计算然后实时对交易流水进行规则判断或者模型预 测这样一旦检测出交易中存在欺诈嫌疑则直接对交易进行实时拦截避免因为处理不及时而导致的 经济损失。 4实时数仓与ETL 结合离线数仓通过利用流计算诸多优势和SQL灵活的加工能力对流式数据进行实时清洗、归并、结 构化处理为离线数仓进行补充和优化。另一方面结合实时数据ETL处理能力利用有状态流式计算技 术可以尽可能降低企业由于在离线数据计算过程中调度逻辑的复杂度高效快速地处理企业需要的统 计结果帮助企业更好地应用实时数据所分析出来的结果。 5流数据分析 实时计算各类数据指标并利用实时结果及时调整在线系统相关策略在各类内容投放、无线智能推送 领域有大量的应用。流式计算技术将数据分析场景实时化帮助企业做到实时化分析Web应用或者App 应用的各项指标包括App版本分布情况、Crash检测和分布等同时提供多维度用户行为分析支持日 志自主分析助力开发者实现基于大数据技术的精细化运营、提升产品质量和体验、增强用户黏性。 6实时报表分析 实时报表分析是近年来很多公司采用的报表统计方案之一其中最主要的应用便是实时大屏展示。利用 流式计算实时得出的结果直接被推送到前端应用实时显示出重要指标的变换情况。最典型的案例便是 淘宝的双十一活动每年双十一购物节除疯狂购物外最引人注目的就是天猫双十一大屏不停跳跃的 成交总额。在整个计算链路中包括从天猫交易下单购买到数据采集、数据计算、数据校验最终落到双 十一大屏上展现的全链路时间压缩在5秒以内顶峰计算性能高达数三十万笔订单/秒通过多条链路流 计算备份确保万无一失。而在其他行业企业也在构建自己的
35 . 简述Flink backPressure反压机制指标监控你是怎么做的
背压是指系统在一个临时负载峰值期间接收数据的速率大于其处理速率的一种场景备注:就是处理速 度慢接收速度快系统处理不了接收的数据。许多日常情况都会导致背压。例如垃圾回收卡顿可 能导致流入的数据堆积起来或者数据源可能出现发送数据过快的峰值。如果处理不当背压会导致资 源耗尽甚至导致数据丢失。 1、Flink反压
每个子任务都有自己的本地缓存池收到的数据以及发出的数据都会序列化之后放入到缓冲池里。 然后两个TaskManager之间只会建立一条物理链路底层使用Netty通讯所有子任务之间的通讯都由这条链路承担。
当任何一个子任务的发送缓存不管是子任务自己的本地缓存还是底层传输时Netty的发送缓存耗 尽时发送方就会被阻塞产生背压同样任何任务接收数据时如果本地缓存用完了都会停止从底层Netty那里读取数据这样很快上游的数据很快就会占满下游的底层接收缓存从而背压到发送 端形成对上游所有的任务的背压。 很显然这种思路有个明显的问题任何一个下游子任务的产生背压都会影响整条TaskManager之间 的链路导致全链路所有子任务背压。
为了解决上节的单任务背压影响全链路的问题在Flink 1.5之后引入了Credit-based Flow Control基于信用点的流量控制。 这种方法首先把每个子任务的本地缓存分为两个部分独占缓存Exclusive Buuers和浮动缓存 Floating Buuers 然后独占缓存的大小作为信用点发给数据发送方发送方会按照不同的子任务分别记录信用点并发 送尽可能多数据给接收方发送后则降低对应信用点的大小 当信用点为0时则不再发送起到背压的作用。在发送数据的同时发送方还会把队列中暂存排队的 数据量发给接收方接收方收到后根据本地缓存的大小决定是否去浮动缓存里请求更多的缓存来加 速队列的处理起到动态控制流量的作用。整个过程参考上图。
通过这样的设计就实现了任务级别的背压任意一个任务产生背压只会影响这个任务并不会对 TaskManger上的其它任务造成影响。 2、Flink监控指标 Flink任务提交到集群后接下来就是对任务进行有效的监控。Flink将任务监控指标主要分为系统指标和 用户指标两种系统指标主要包括Flink集群层面的指标例如CPU负载各组件内存使用情况等用户 指标主要包括用户在任务中自定义注册的监控指标用于获取用户的业务状况等信息。Flink中的监控指 标可以通过多种方式获取例如可以从Flink UI中直接查看也可以通过Rest Api或Reporter获取
36 . 简述Flink如何保证一致性
Flink的检查点和恢复机制定期的会保存应用程序状态的一致性检查点。在故障的情况下应用程序的状 态将会从最近一次完成的检查点恢复并继续处理。尽管如此可以使用检查点来重置应用程序的状态 无法完全达到令人满意的一致性保证。相反source和sink的连接器需要和Flink的检查点和恢复机制进 行集成才能提供有意义的一致性保证。 为了给应用程序提供恰好处理一次语义的状态一致性保证应用程序的source连接器需要能够将source 的读位置重置到之前保存的检查点位置。当处理一次检查点时source操作符将会把source的读位置持 久化并在恢复的时候从这些读位置开始重新读取。支持读位置的检查点的source连接器一般来说是基 于文件的存储系统如文件流或者Kafka source检查点会持久化某个正在消费的topic的读偏移 量。如果一个应用程序从一个无法存储和重置读位置的source连接器摄入数据那么当任务出现故障 的时候数据就会丢失。也就是说我们只能提供at-most-once的一致性保证。 Fink的检查点和恢复机制和可以重置读位置的source连接器结合使用可以保证应用程序不会丢失任何 数据。尽管如此应用程序可能会发出两次计算结果因为从上一次检查点恢复的应用程序所计算的结 果将会被重新发送一次一些结果已经发送出去了这时任务故障然后从上一次检查点恢复这些结 果将被重新计算一次然后发送出去。所以可重置读位置的source和Flink的恢复机制不足以提供端到 端的恰好处理一次语义即使应用程序的状态是恰好处理一次一致性级别。 Flink 中的一个大的特性就是exactly-once的特性我们在一般的流处理程序中会有三种处理语义 AT-MOST-ONCE(最多一次)当故障发生的时候什么都不干。就是说每条消息就只消费一次。 AT-LEAST-ONCE(至少一次)为了确保数据不丢失确保每个时间都得到处理一些时间可能会被 处理多次。 EXACTLY-ONCE(精确一次)每个时间都精确处理一次端到端的保证 内部保证— checkpoint source端—可重设数据的读取位置 sink端—从故障恢复时数据不会重复写入外部系统 Flink(checkpoint)和source端(Kafka)可以保证不出问题。但一个志在提供端到端恰好处理一次语义一致性 的应用程序需要特殊的sink连接器。sink连接器可以在不同的情况下使用两种技术来达到恰好处理一次 一致性语义幂等性写入和事务性写入。
1、幂等性写入 一个幂等操作无论执行多少次都会返回同样的结果。例如重复的向hashmap中插入同样的key-value对 就是幂等操作因为头一次插入操作之后所有的插入操作都不会改变这个hashmap因为hashmap已经 包含这个key-value对了。另一方面append操作就不是幂等操作了因为多次append同一个元素将会 导致列表每次都会添加一个元素。在流处理程序中幂等写入操作是很有意思的因为幂等写入操作可 以执行多次但不改变结果。所以它们可以在某种程度上缓和Flink检查点机制带来的重播计算结果的效 应。 需要注意的是依赖于幂等性sink来达到exactly-once语义的应用程序必须保证在从检查点恢复以后 它将会覆盖之前已经写入的结果。例如一个包含有sink操作的应用在sink到一个key-value存储时必须 保证它能够确定的计算出将要更新的key值。同时从Flink程序sink到的key-value存储中读取数据的应 用在Flink从检查点恢复的过程中可能会看到不想看到的结果。当重播开始时之前已经发出的计算 结果可能会被更早的结果所覆盖因为在恢复过程中。所以一个消费Flink程序输出数据的应用可 能会观察到时间回退例如读到了比之前小的计数。也就是说当流处理程序处于恢复过程中时流处 理程序的结果将处于不稳定的状态因为一些结果被覆盖掉而另一些结果还没有被覆盖。一旦重播完 成也就是说应用程序已经通过了之前出故障的点结果将会继续保持一致性。
2、事务性写入 实现端到端的恰好处理一次一致性语义的方法基于事务性写入。其思想是只将最近一次成功保存的检查 点之前的计算结果写入到外部系统中去。这样就保证了在任务故障的情况下端到端恰好处理一次语 义。应用将被重置到最近一次的检查点而在这个检查点之后并没有向外部系统发出任何计算结果。通 过只有当检查点保存完成以后再写入数据这种方法事务性的方法将不会遭受幂等性写入所遭受的重播 不一致的问题。尽管如此事务性写入却带来了延迟因为只有在检查点完成以后我们才能看到计算 结果。 Flink提供了两种构建模块来实现事务性sink连接器write-ahead-logWAL预写式日志sink和两阶段 提交sink。WAL式sink将会把所有计算结果写入到应用程序的状态中等接到检查点完成的通知才会将 计算结果发送到sink系统。因为sink操作会把数据都缓存在状态后段所以WAL可以使用在任何外部sink 系统上。尽管如此WAL还是无法提供刀枪不入的恰好处理一次语义的保证再加上由于要缓存数据带 来的状态后段的状态大小的问题WAL模型并不十分完美。 与之形成对比的2PC sink需要sink系统提供事务的支持或者可以模拟出事务特性的模块。对于每一个检查点sink开始一个事务然后将所有的接收到的数据都添加到事务中并将这些数据写入到sink系统但并没有提交commit它们。当事务接收到检查点完成的通知时事务将被commit数据将被 真正的写入sink系统。这项机制主要依赖于一次sink可以在检查点完成之前开始事务并在应用程序从 一次故障中恢复以后再commit的能力。
2PC协议依赖于Flink的检查点机制。检查点屏障是开始一个新的事务的通知所有操作符自己的检查点 成功的通知是它们可以commit的投票而作业管理器通知一个检查点成功的消息是commit事务的指令。于WAL sink形成对比的是2PC sinks依赖于sink系统和sink本身的实现可以实现恰好处理一次语义。更多的2PC sink不断的将数据写入到sink系统中而WAL写模型就会有之前所述的问题。 事务写的方式能提供端到端的Exactly-Once一致性它的代价也是非常明显的就是牺牲了延迟。输出 数据不再是实时写入到外部系统而是分批次地提交。目前来说没有完美的故障恢复和Exactly-Once 保障机制对于开发者来说需要在不同需求之间权衡
37 . 简述Flink支持JobMaster的HA啊原理是怎么样的
1、JobManager 高可用(HA) jobManager协调每个flink任务部署。它负责任务调度和资源管理。 默认情况下每个flink集群只有一个JobManager这将导致一个单点故障(SPOF)如果JobManager挂 了则不能提交新的任务并且运行中的程序也会失败。 使用JobManager HA集群可以从JobManager故障中恢复从而避免SPOF(单点故障) 。 用户可以在 standalone或 YARN集群 模式下配置集群高可用。 Standalone集群的高可用 Standalone模式独立模式下JobManager的高可用性的基本思想是任何时候都有一个 Master JobManager 并且多个Standby JobManagers 。 Standby JobManagers可以在Master JobManager 挂掉的情况下接管集群成为Master JobManager。 这样保证了没有单点故障一旦某一个Standby JobManager 接管集群程序就可以继续运行。 Standby JobManager和Master JobManager实例之间没有明确区别。每个JobManager都可以成为Master或Standby节点 2、Yarn 集群高可用 flink on yarn的HA 其实主要是利用yarn自己的job恢复机制
38 . 简述如何确定Flink任务的合理并行度
task的parallelism可以在Flink的不同级别上指定。 四种级别是算子级别、执行环境ExecutionEnvironment级别、客户端命令行级别、配置文件 flink-conf.yaml级别。 每个operator、data source或者data sink都可以通过调用setParallelism()方法来指定 运行环境的默认并发数可以通过调用setParallelism()方法来指定。env.setParallelism(3);运行环境的 并发数可以被每个算子确切的并发数配置所覆盖。 对于CLI客户端并发参数可以通过-p来指定 影响所有运行环境的系统级别的默认并发度可以在./conf/flink-conf.yaml的parallelism.defaul项中指 定。不建议。 当然也可以设置最大的并行度通过调用setMaxParallelism()方法来设置最大并发度。Flink如何确定TaskManager个数Job的最大并行度除以每个TaskManager分配的任务槽数。Flink on YARN时TaskManager的数量就是max(parallelism) / yarnslots向上取整。 例如一个最大并行度为10每个TaskManager有两个任务槽的作业就会启动5个TaskManager
39 . 简述link任务如何实现端到端一致
Source端数据从上游进入Flink必须保证消息严格一次消费。同时Source 端必须满足可重放 replay。否则 Flink 计算层收到消息后未计算却发生 failure 而重启消息就会丢失。 Flink计算层利用 Checkpoint 机制把状态数据定期持久化存储下来Flink程序一旦发生故障的时候可以选择状态点恢复避免数据的丢失、重复。
Sink端Flink将处理完的数据发送到Sink端时通过 两阶段提交协议 即 TwoPhaseCommitSinkFunction 函数。该 SinkFunction 提取并封装了两阶段提交协议中的公共逻辑保证Flink 发送Sink端时实现严格一次处理语义。 同时Sink端必须支持事务机制能够进行数据回滚或者满足幂等性。 回滚机制即当作业失败后能够将部分写入的结果回滚到之前写入的状态。 幂等性就是一个相同的操作无论重复多少次造成的结果和只操作一次相等。即当作业失败后写 入部分结果但是当重新写入全部结果时不会带来负面结果重复写入不会带来错误结果
40 . 简述Flink如何处理背反压
1、什么原因导致背压 流系统中消息的处理速度跟不上消息的发送速度导致消息的堆积。如果系统能感知消息堆积并调整 消息发送的速度使消息的处理速度和发送速度相协调就是有背压感知的系统。 背压如果不能得到正确地处理可能会导致资源被耗尽或者甚至出现更糟的情况导致数据丢失。flink就 是一个有背压感知的基于流的分布式消息处理系统。 消息发送的太快消息接受的太慢产生消息拥堵。发生消息拥堵后系统会自动降低消息发送的速度。
举例说明 正常情况下消息处理速度消息的发送速度不会发送消息拥堵系统运行流畅 异常情况下消息处理速度消息的发送速度发生消息堵塞系统运行不流畅 消息拥堵可以采用两种方案 将拥堵的消息直接删除将导致数据丢失在精确度要求高的场景非常不合适。
将拥堵的消息缓存起来并告知消息发送者减缓消息发送的速度。 将消息缓存起来并将缓冲区持久化以方便在处理失败的情况下进行数据重复。有些source本身提供 持久化机制可以优先考虑。例如Kafka就是一个很不错的选择可以背压从sink到source的整个pipeline同时对source进行限流来适配整合pipeline中最慢组件的速度从而获得系统的稳定状态。
2、Flink中的背压 Flink 没有使用任何复杂的机制来解决背压问题因为根本不需要那样的方案它利用自身作为纯数据流引擎的优势来优雅地响应背压问题。 Flink 在运行时主要由 operators 和 streams 两大组件构成。每个 operator 会消费中间态的流并在流上进行转换然后生成新的流。对于 Flink 的网络机制一种形象的类比是Flink 使用了高效有界的分布式阻塞队列就像 Java 通用的阻塞队列BlockingQueue一样。使用 BlockingQueue 的话一个较慢的接受者会降低发送者的发送速率因为一旦队列满了有界队列发送者会被阻塞。Flink 解决背压的方案就是这种感觉。 在 Flink 中这些分布式阻塞队列就是这些逻辑流而队列容量是通过缓冲池来LocalBuuerPool实现的。每个被生产和被消费的流都会被分配一个缓冲池。缓冲池管理着一组缓冲(Buuer)缓冲在被消费后 可以被回收循环利用。这很好理解你从池子中拿走一个缓冲填上数据在数据消费完之后又把缓 冲还给池子之后你可以再次使用它。 3、网络传输中的内存管理 在解释 Flink 的反压原理之前我们必须先对 Flink 中网络传输的内存管理有个了解。 如下图所示展示了 Flink 在网络传输场景下的内存管理。网络上传输的数据会写到 Task 的InputGateIG 中经过 Task 的处理后再由 Task 写到 ResultPartitionRS 中。每个 Task 都包括了输入和输入输入和输出的数据存在 Buuer 中都是字节数据。Buuer 是 MemorySegment 的包装 类。 1 TaskManagerTM在启动时会先初始化NetworkEnvironment对象TM 中所有与网络相关的东西都由该类来管理如 Netty 连接其中就包括NetworkBuuerPool。根据配置Flink 会在 NetworkBuuerPool 中生成一定数量默认2048的内存块 MemorySegment内存块的总数量就代表了网络传输中所有可用的内存。NetworkEnvironment 和 NetworkBuuerPool 是 Task 之间共享的每个 TM 只会实例化一个。 2 Task 线程启动时会向 NetworkEnvironment 注册NetworkEnvironment 会为 Task 的 InputGateIG和 ResultPartitionRP 分别创建一个 LocalBuuerPool缓冲池并设置可申请的 MemorySegment内存块数量。IG 对应的缓冲池初始的内存块数量与 IG 中 InputChannel 数量一致 RP 对应的缓冲池初始的内存块数量与 RP 中的 ResultSubpartition 数量一致。不过每当创建或销毁缓冲池时NetworkBuuerPool 会计算剩余空闲的内存块数量并平均分配给已创建的缓冲池。注意这个过程只是指定了缓冲池所能使用的内存块数量并没有真正分配内存块只有当需要时才分配。为什么 ## 要动态地为缓冲池扩容呢因为内存越多意味着系统可以更轻松地应对瞬时压力如GC不会频繁 地进入反压状态所以我们要利用起那部分闲置的内存块。 3在 Task 线程执行过程中当 Netty 接收端收到数据时为了将 Netty 中的数据拷贝到 Task 中 InputChannel实际是 RemoteInputChannel会向其对应的缓冲池申请内存块上图中的①。如果缓冲池中也没有可用的内存块且已申请的数量还没到池子上限则会向 NetworkBuuerPool 申请内存块 上图中的②并交给 InputChannel 填上数据上图中的③和④。如果缓冲池已申请的数量达到上限了呢或者 ## NetworkBuuerPool 也没有可用内存块了呢这时候Task 的 Netty Channel 会暂停读取上游的发送端会立即响应停止发送拓扑会进入反压状态。当 Task 线程写数据到 ResultPartition 时也会向缓冲池请求内存块如果没有可用内存块时会阻塞在请求内存块的地方达到暂停写入的目的。 4当一个内存块被消费完成之后在输入端是指内存块中的字节被反序列化成对象了在输出端是指 内存块中的字节写入到 Netty Channel 了会调用 Buuer.recycle() 方法会将内存块还给LocalBuuerPool 上图中的⑤。如果LocalBuuerPool中当前申请的数量超过了池子容量由于上文提到的动态容量由于新注册的 Task 导致该池子容量变小则LocalBuuerPool会将该内存块回收给NetworkBuuerPool上图中的⑥。如果没超过池子容量则会继续留在池子中减少反复申请的开 销。 4、背压过程 举例说明Flink背压的过程 下图有一个简单的flow它由两个task组成
1记录A进入Flink然后Task1处理 2 Task1处理后的结果被序列化进缓存区 3 Task2从缓存区内读取一些数据缓存区内将有更多的空间 4如果Task2处理的较慢Task1的缓存区很快填满发送速度随之下降。 不要忘记记录能被Flink处理的前提必须有空闲可用的缓存区Buuer。 结合上面两张图看Task 1 在输出端有一个相关联的 LocalBuuerPool称缓冲池1Task 2 在输入端也有一个相关联的 LocalBuuerPool称缓冲池2。如果缓冲池1中有空闲可用的 buuer 来序列化记录“A”我们就序列化并发送该 buuer。 这里我们需要注意两个场景
本地传输如果 Task 1 和 Task 2 运行在同一个 worker 节点TaskManager该 buuer 可以直接交给下一个 Task。一旦 Task 2 消费了该 buuer则该 buuer 会被缓冲池1回收。如果 Task 2 的速度比 1 慢 那么 buuer 回收的速度就会赶不上 Task 1 取 buuer 的速度导致缓冲池1无可用的 buuerTask 1 等待在可用的 buuer 上。最终形成 Task 1 的降速。 远程传输如果 Task 1 和 Task 2 运行在不同的 worker 节点上那么 buuer 会在发送到网络TCP Channel后被回收。在接收端会从 LocalBuuerPool 中申请 buuer然后拷贝网络中的数据到 buuer 中。如果没有可用的 buuer会停止从 TCP 连接中读取数据。在输出端通过 Netty 的水位值机制来保证不往网络中写入太多数据后面会说。如果网络中的数据Netty输出缓冲中的字节数超过了高水 位值我们会等到其降到低水位值以下才继续写入数据。这保证了网络中不会有太多的数据。如果接收 端停止消费网络中的数据由于接收端缓冲池没有可用 buuer网络中的缓冲数据就会堆积那么发送端也会暂停发送。另外这会使得发送端的缓冲池得不到回收writer 阻塞在向 LocalBuuerPool 请求buuer阻塞了 writer 往 ResultSubPartition 写数据。 这种固定大小缓冲池就像阻塞队列一样保证了 Flink 有一套健壮的反压机制使得 Task 生产数据的速度不会快于消费的速度。我们上面描述的这个方案可以从两个 Task 之间的数据传输自然地扩展到更复杂的 pipeline 中保证反压机制可以扩散到整个 pipeline。
41 . 简述Flink解决数据延迟的问题
Flink数据延迟的原因有很多可能是程序自身存在问题也可能是外部因素造成的下面列举一些可能的原因和相应的处理方案 数据输入环节问题可能是数据来源的数据增长速度过快导致flink消费者处理数据的速度跟不上数据生成的速度。解决方案增加flink消费者的并发度使用分区和并行流的方式来处理数据以保证消费者可以快速地处理大量的数据。 数据输出环节问题可能是flink消费者完成数据计算之后输出数据的过程速度过慢导致数据延迟。解决方案优化输出数据的方式可以使用缓存和批处理的方式输出数据以提高输出速度。 中间处理环节问题可能是flink计算模块自身出现问题例如程序过度消耗资源、任务堆积、程序过于复杂等。解决方案优化flink程序自身去除重复代码尽量避免程序出现任务堆积、大循环等问题并使用合适的检测工具来监测程序性能和运行状态。 外部因素问题可能是计算集群内存不足、网络问题、硬件故障等因素造成的。解决方案根据具体情况进行调整例如增加计算集群内存、优化网络连接、处理硬件故障等。 总结来说在处理flink数据延迟时需要针对不同的具体场景确定问题所在并进行相应的优化和解决方案。通过不断优化、调整和监测整个flink系统的运行环境可以保证flink系统运行的效率和准确性。
使用代码举例 下面是使用flink Stream API实现基于水印watermark的数据延迟处理的代码示例
public class DataDelayAnalysisJob {
public static void main(String[] args) throws Exception {
// 创建 Flink 执行环境 final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
// 从 Kafka 中读取数据 Properties properties new Properties(); properties.setProperty(“bootstrap.servers”, “localhost:9092”); properties.setProperty(“group.id”, “test”); FlinkKafkaConsumer kafkaConsumer new FlinkKafkaConsumer(“topic-name”, new SimpleStringSchema(), properties); DataStream input env .addSource(kafkaConsumer) .assignTimestampsAndWatermarks(new WatermarkStrategy() { Override public WatermarkGenerator createWatermarkGenerator( WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator() { private long maxTimestamp; Override public void onEvent(String event, long eventTimestamp, WatermarkOutput output) { maxTimestamp Math.max(maxTimestamp, eventTimestamp); } Override public void onPeriodicEmit(WatermarkOutput output) { long maxOutOfOrderness 5000; // 5 seconds output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness)); } }; } });
// 处理数据和计算 DataStream delayed input .filter(new FilterFunction() { Override public boolean filter(String value) { // 过滤出延迟时间超过 5s 的数据 long eventTime Long.parseLong(value.split(“\t”)[0]); long now System.currentTimeMillis(); return now - eventTime 5000; // 5 seconds } });
// 将延迟数据输出到外部存储 delayed.writeToSocket(“localhost”, 9999, new SimpleStringSchema());
// 启动 Flink 执行环境 env.execute(“Data Delay Analysis Job”); } }
在上述代码中对数据进行了流式处理并使用基于水印watermark的方式判断数据是否存在延迟若延迟时间超过 5s则将该数据输出到外部存储并保存以后进行分析和处理。这样便通过代码实现了对flink数据延迟的处理方案。
42 . 简述使用flink-client消费kafka数据还是使用flink-connector消费
Flink 是通过Connector与具体的source 和 sink进行通信的
43 . 简述如何动态修改Flink的配置前提是Flink不能重启
1、Flink/Spark 如何实现动态更新作业配置 由于实时场景对可用性十分敏感实时作业通常需要避免频繁重启因此动态加载作业配置变量是 实时计算里十分常见的需求比如通常复杂事件处理 (CEP) 的规则或者在线机器学习的模型。尽管常见实现起来却并没有那么简单其中最难点在于如何确保节点状态在变更期间的一致性。目前来说一 般有两种实现方式 轮询拉取方式即作业算子定时检测在外部系统的配置是否有变更若有则同步配置。 控制流方式即作业除了用于计算的一个或多个普通数据流以外还有提供一个用于改变作业算子 状态的元数据流也就是控制流。 轮询拉取方式基于 pull 模式一般实现是用户在 Stateful 算子(比如 RichMap)里实现后台线程定时从外部系统同步变量。这种方式对于一般作业或许足够但存在两个缺点分别限制了作业的实时性和准确性的 进一步提高首先轮询总是有一定的延迟因此变量的变更不能第一时间生效其次这种方式依赖 于节点本地时间来进行校准。如果在同一时间有的节点已经检测到变更并更新状态而有的节点还没有 检测到或者还未更新就会造成短时间内的不一致。 控制流方式基于 push 模式变更的检测和节点更新的一致性都由计算框架负责从用户视角看只需要定义如何更新算子状态并负责将控制事件丢入控制流后续工作计算框架会自动处理。控制流不同于其 他普通数据流的地方在于控制流是以广播形式流动的否则在有 Keyby 或者 rebalance 等提高并行度分流的算子的情况下就无法将控制事件传达给所有的算子。 以目前最流行的两个实时计算框架 Spark Streaming 和 Flink 来说前者是以类似轮询的方式来实现实时作业的更新而后者则是基于控制流的方式。
2、Spark Streaming Broadcast Variable Spark Streaming 为用户提供了 Broadcast Varialbe可以用于节点算子状态的初始化和后续更新。Broacast Variable 是一组只读的变量它在作业初始化时由 Spark Driver 生成并广播到每个 Executor 节点随后该节点的 Task 可以复用同一份变量。 Broadcast Variable 的设计初衷是为了避免大文件比如 NLP 常用的分词词典随序列化后的作业对象一起分发造成重复分发的网络资源浪费和启动时间延长。这类文件的更新频率是相对低的扮演的角色 类似于只读缓存通过设置 TTL 来定时更新缓存过期之后 Executor 节点会重新向 Driver 请求最新的变量。 Broadcast Variable 并不是从设计理念上就支持低延迟的作业状态更新因此用户想出了不少 Hack 的方法其中最为常见的方式是一方面在 Driver 实现后台线程不断更新 Broadcast Variavle另一方面在作业运行时通过显式地删除 Broadcast Variable 来迫使 Executor 重新从 Driver 拉取最新的 Broadcast Variable。这个过程会发生在两个 micro batch 计算之间以确保每个 micro batch 计算过程中状态是一致的。 比起用户在算子内访问外部系统实现更新变量这种方式的优点在于一致性更有保证。因为 Broadcast Variable 是统一由 Driver 更新并推到 Executor 的这就保证不同节点的更新时间是一致的。然而相对地缺点是会给 Driver 带来比较大的负担因为需要不断分发全量的 Broadcast Variable (试想下一个巨大的 Map每次只会更新少数 Entry却要整个 Map 重新分发)。在 Spark 2.0 版本以后Broadcast Variable 的分发已经从 Driver 单点改为基于 BitTorrent 的 P2P 分发这一定程度上缓解了随着集群规模提升 Driver 分发变量的压力但我个人对这种方式能支持到多大规模的部署还是持怀疑态度。另外一点是重新分发 Broadcast Variable 需要阻塞作业进行这也会使作业的吞吐量和延迟受到比较大的影响。
3、Flink Broadcast State Stream Broadcast Stream 是 Flink 1.5.0 发布的新特性基于控制流的方式实现了实时作业的状态更新。Broadcast Stream 的创建方式与普通数据流相同例如从 Kafka Topic 读取特别之处在于它承载的是控制事件流会以广播形式将数据发给下游算子的每个实例。Broadcast Stream 需要在作业拓扑的某个节点和普通数据流 (Main Stream) join 到一起。 该节点的算子需要同时处理普通数据流和控制流一方面它需要读取控制流以更新本地状态 (Broadcast State)另外一方面需要读取 Main Stream 并根据 Broadcast State 来进行数据转换。由于每个算子实例读到的控制流都是相同的它们生成的 Broadcast State 也是相同的从而达到通过控制消息来更新所有算子实例的效果。 目前 Flink 的 Broadcast Stream 从效果上实现了控制流的作业状态更新不过在编程模型上有点和一般直觉不同。原因主要在于 Flink 对控制流的处理方式和普通数据流保持了一致最为明显的一点是控制流除了改变本地 State 还可以产生 output这很大程度上影响了 Broadcast Stream 的使用方式。Broadcast Stream 的使用方式与普通的 DataStream 差别比较大即需要和 DataStream 连接成为BroadcastConnectedStream 后再通过特殊的 BroadcastProcessFunction 来处理而 BroadcastProcessFunction 目前只支持 类似于 RichCoFlatMap 效果的操作。RichCoFlatMap 可以间接实现对 Main Stream 的 Map 转换返回一只有一个元素的集合和 Filter 转换返回空集合但无法实现 Window 类计算。这意味着如果用户希望改变 Window 算子的状态那么需要将状态管理提前到上游的BroadcastProcessFunction然后再通过 BroadcastProcessFunction 的输出来将影响下游 Window 算子的行为。
4、总结 实时作业运行时动态加载变量可以令大大提升实时作业的灵活性和适应更多应用场景目前无论是 Flink 还是 Spark Streaming 对动态加载变量的支持都不是特别完美。Spark Streaming 受限于 Micro Batch 的计算模型虽然现在 2.3 版本引入 Continuous Streaming 来支持流式处理但离成熟还需要一定时间 将作业变量作为一致性和实时性要求相对低的节点本地缓存并不支持低延迟地、低成本地更新作业变 量。Flink 将变量更新视为特殊的控制事件流符合 Even Driven 的流式计算框架定位目前在业界已有比较成熟的应用。不过美中不足的是编程模型的易用性上有提高空间控制流目前只能用于和数据流的join这意味着下游节点无法继续访问控制流或者需要把控制流数据插入到数据流中这种方式并不优 雅从而降低了编程模型的灵活性。最好的情况是大部分的算子都可以被拓展为具有BroadcastOperator就像 RichFunction 一样它们可以接收一个数据流和一个至多个控制流并维护对应的 BroadcastState这样控制流的接入成本将显著下降。
44 . 简述什么是Flink流批一体
在大数据处理计算领域有离线计算和实时计算两种模式。一般都是用mapreduce / hive / sparkSQL来处理离线场景用 sparkStreaming / flink处理实时场景但是这种lambda架构会导致一个问题进行更改时要同时更改两套代码进行同步。 flink流批一体横空处理为大数据处理带来了一套新的解决方案。 双11中Flink流批一体开始在阿里最核心的数据业务场景崭露头角并扛住了40亿/秒的实时计算峰值。
其实流批一体的技术里面最早提出于2015年它的初衷是让大数据开发人员能够用同一套接口实现大数 据的流计算和批计算进而保证处理过程与结果的一致性。spark、flink都陆续提出了自己的解决方案。 虽然spark是最早提出流批一体理念的计算引擎之一但其本质还是用批来实现流用的是微批次的思 想有秒级的延迟而且无法正确处理时间语义数据在分布式传输过程中顺序发生改变先生产的数 据反而后到导致计算不准确的一种现象所以难以满足复杂、大规模的实时计算场景迟迟无法落 地。而2019年阿里收购flink后投入大量研发力量同时公司也面临离线和实时数据统计口径不一致的 问题影响广告、商务甚至是公司的运行决策业务的迫切要求技术力量的不断加入都促进了flink 向流批一体的发展。
在流处理引擎之上Flink 有以下机制 检查点机制和状态机制用于实现容错、有状态的处理 水印机制用于实现事件时钟 窗口和触发器用于限制计算范围并定义呈现结果的时间。 在同一个流处理引擎之上Flink 还存在另一套机制用于实现高效的批处理。 用于调度和恢复的回溯法由 Microsoh Dryad 引入现在几乎用于所有批处理器 用于散列和排序的特殊内存数据结构可以在需要时将一部分数据从内存溢出到硬盘上 优化器尽可能地缩短生成结果的时间。 两套机制分别对应各自的APIDataStream API 和 DataSet API在创建 Flink 作业时并不能通过将两者混合在一起来同时 利用 Flink 的所有功能。 在最新的版本中Flink 支持两种关系型的 APITable API 和 SQL。这两个 API 都是批处理和流处理统一的 API这意味着在无边界的实时数据流和有边界的历史记录数据流上关系型 API 会以相同的语义执行查询并产生相同的结果。Table API 和 SQL 借助了 Apache Calcite 来进行查询的解析校验以及优化。它们可以与 DataStream 和 DataSet API 无缝集成并支持用户自定义的标量函数聚合函数以及表值函数。 Table API / SQL 正在以流批统一的方式成为分析型用例的主要 API。 DataStream API 是数据驱动应用程序和数据管道的主要API。 从长远来看DataStream API应该通过有界数据流完全包含DataSet API
45 . 简述什么是Flink的check和barrier
1、Flink Check Flink Check 是 Apache Flink 的一个基于属性的测试库它扩展了 ScalaCheck 的线性时序逻辑运算符适用于测试 Flink 数据流转换。 基于属性的测试(PBT)是一种自动的黑盒测试技术它通过生成随机输入并检查获得的输出是否满足给定 的属性来测试功能。 Flink Check 提供了一个有边界的时间逻辑用于生成函数的输入和声明属性。这个逻辑是为流媒体系统设计的它允许用户定义流如何随时间变化以及哪些属性应该验证相应的输出。 Flink Check随机生成指定数量的有限输入流前缀并对Flink运行时产生的输出流进行评估。 Flink Check 是基于 sscheck这是Spark 的一个基于属性的测试库所以它依赖于 sscheck-core 项目其中包含了 sscheck 和 Flink Check 共同的代码特别是系统所基于的 LTLss 逻辑的实现。 LTLss 是一种有限字的离散时间线性时序逻辑在 Spark Streaming 的 Property-based testing for Spark Streaming 的论文中有详细介绍。 2、barrier Flink提供了容错机制能够在应用失败的时候重新恢复任务。这个机制主要就是通过持续产生快照的方 式实现的。Flink快照主要包括两部分数据一部分是数据流的数据另一部分是operator的状态数据。对 应的快照机制的实现有主要两个部分组成一个是屏障(Barrier一个是状态(State)。 Flink 分布式快照里面的一个核心的元素就是流屏障stream barrier。这些屏障会被插入(injected)到数据流中并作为数据流的一部分随着数据流动。屏障并不会持有任何数据而是和数据一样线性的流 动。可以看到屏障将数据流分成了两部分数据实际上是多个连续的部分一部分是当前快照的数 据一部分下一个快照的数据。每个屏障会带有它的快照ID。这个快照的数据都在这个屏障的前面。从 图上看数据是从左向右移动右边的先进入系统那么快照n包含的数据就是右侧到下一个屏障 n-1截止的数据图中两个灰色竖线之间的部分也就是part of checkpoint n。另外屏障并不会打断数的流动,因而屏障是非常轻量的。在同一个时刻多个快照可以在同一个数据流中这也就是说多个快 照可以同时产生。
46 . 简述Flink状态机制
一、前言 有状态的计算是流处理框架要实现的重要功能因为稍复杂的流处理场景都需要记录状态然后在新流入数据的基础上不断更新状态。下面的几个场景都需要使用流处理的状态功能
数据流中的数据有重复想对重复数据去重需要记录哪些数据已经流入过应用当新数据流入时根据已流入过的数据来判断去重。 检查输入流是否符合某个特定的模式需要将之前流入的元素以状态的形式缓存下来。比如判断一个温度传感器数据流中的温度是否在持续上升。 对一个时间窗口内的数据进行聚合分析分析一个小时内某项指标的75分位或99分位的数值。 一个状态更新和获取的流程如下图所示一个算子子任务接收输入流获取对应的状态根据新的计算结果更新状态。一个简单的例子是对一个时间窗口内输入流的某个整数字段求和那么当算子子任务接收到新元素时会获取已经存储在状态中的数值然后将当前输入加到状态上并将状态数据更新。
二、状态类型 Flink有两种基本类型的状态托管状态Managed State和原生状态Raw State。
两者的区别Managed State是由Flink管理的Flink帮忙存储、恢复和优化Raw State是开发者自己管理的需要自己序列化。
具体区别有
从状态管理的方式上来说Managed State由Flink Runtime托管状态是自动存储、自动恢复的Flink在存储管理和持久化上做了一些优化。当横向伸缩或者说修改Flink应用的并行度时状态也能自动重新分布到多个并行实例上。Raw State是用户自定义的状态。 从状态的数据结构上来说Managed State支持了一系列常见的数据结构如ValueState、ListState、MapState等。Raw State只支持字节任何上层数据结构需要序列化为字节数组。使用时需要用户自己序列化以非常底层的字节数组形式存储Flink并不知道存储的是什么样的数据结构。 从具体使用场景来说绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类在里面使用Managed State。Raw State是在已有算子和Managed State不够用时用户自定义算子时使用。 对Managed State继续细分它又有两种类型Keyed State(键控状态)和Operator State(算子状态)。
为了自定义Flink的算子可以重写Rich Function接口类比如RichFlatMapFunction。使用Keyed State时通过重写Rich Function接口类在里面创建和访问状态。对于Operator State还需进一步实现CheckpointedFunction接口。
2.1、Keyed State Flink 为每个键值维护一个状态实例并将具有相同键的所有数据都分区到同一个算子任务中这个任务会维护和处理这个key对应的状态。当任务处理一条数据时它会自动将状态的访问范围限定为当前数据的key。因此具有相同key的所有数据都会访问相同的状态。
需要注意的是键控状态只能在 KeyedStream 上进行使用可以通过 stream.keyBy(…) 来得到 KeyedStream(键控流) 。
Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State)
ValueState存储单值类型的状态。可以使用 update(T)进行更新并通过 T value()进行检索。 ListState存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素并通过 get() 获得整个列表。 ReducingState用于存储经过 ReduceFunction 计算后的结果使用 add(T) 增加元素。 AggregatingState用于存储经过 AggregatingState 计算后的结果使用 add(IN) 添加元素。 FoldingState已被标识为废弃会在未来版本中移除官方推荐使用 AggregatingState 代替。 MapState维护 Map 类型的状态。 假设我们正在开发一个监控系统当监控数据超过阈值一定次数后需要发出报警信息
import java.util
import org.apache.commons.compress.utils.Lists import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector
/** •author w1992wishes 2020/7/20 19:45 */ class ThresholdWarning(threshold: Long, numberOfTimes: Int) extends RichFlatMapFunction[(String, Long), (String, util.ArrayList[Long])] {
// 通过ListState来存储非正常数据的状态 private var abnormalData: ListState[Long] _
override def open(parameters: Configuration): Unit { // 创建StateDescriptor val abnormalDataStateDescriptor new ListStateDescriptor[Long](“abnormalData”, classOf[Long]) // 通过状态名称(句柄)获取状态实例如果不存在则会自动创建 abnormalData getRuntimeContext.getListState(abnormalDataStateDescriptor) }
override def flatMap(value: (String, Long), out: Collector[(String, util.ArrayList[Long])]): Unit { val inputValue value._2 // 如果输入值超过阈值则记录该次不正常的数据信息 if (inputValue threshold) abnormalData.add(inputValue) val list Lists.newArrayList(abnormalData.get.iterator) // 如果不正常的数据出现达到一定次数则输出报警信息 if (list.size numberOfTimes) { out.collect((value._1 超过指定阈值 , list)) // 报警信息输出后清空状态 abnormalData.clear() } } }
object KeyedStateDetailTest extends App { val env StreamExecutionEnvironment.getExecutionEnvironment val dataStreamSource env.fromElements( (“a”, 50L), (“a”, 80L), (“a”, 400L), (“a”, 100L), (“a”, 200L), (“a”, 200L), (“b”, 100L), (“b”, 200L), (“b”, 200L), (“b”, 500L), (“b”, 600L), (“b”, 700L))
dataStreamSource .keyBy(_._1) .flatMap(new ThresholdWarning(100L, 3)) // 超过100的阈值3次后就进行报警 .printToErr() env.execute(“Managed Keyed State”) } 2.2、Operator State Operator State可以用在所有算子上每个算子子任务或者说每个算子实例共享一个状态流入这个算子子任务的数据可以访问和更新这个状态。
算子状态不能由相同或不同算子的另一个实例访问。
Flink为算子状态提供三种基本数据结构
ListState存储列表类型的状态。 UnionListState存储列表类型的状态与 ListState 的区别在于如果并行度发生变化ListState 会将该算子的所有并发的状态实例进行汇总然后均分给新的 Task而 UnionListState 只是将所有并发的状态实例汇总起来具体的划分行为则由用户进行定义。 BroadcastState用于广播的算子状态。如果一个算子有多项任务而它的每项任务状态又都相同那么这种特殊情况最适合应用广播状态。 假设此时不需要区分监控数据的类型只要有监控数据超过阈值并达到指定的次数后就进行报警 import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation} import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer import scala.collection.JavaConversions._
object OperatorStateDetail extends App { val env StreamExecutionEnvironment.getExecutionEnvironment // 开启检查点机制 env.enableCheckpointing(1000) env.setParallelism(1) // 设置并行度为1 val dataStreamSource env.fromElements( (“a”, 50L), (“a”, 80L), (“a”, 400L), (“a”, 100L), (“a”, 200L), (“a”, 200L), (“b”, 100L), (“b”, 200L), (“b”, 200L), (“b”, 500L), (“b”, 600L), (“b”, 700L)) dataStreamSource .flatMap(new OperatorStateDetailThresholdWarning(100L, 3)) .printToErr() env.execute(“Managed Operator State”) }
class OperatorStateDetailThresholdWarning(threshold: Long, numberOfTimes: Int) extends RichFlatMapFunction[(String, Long), (String, ListBuffer[(String, Long)])] with CheckpointedFunction {
// 正常数据缓存 private var bufferedData: ListBuffer[(String, Long)] ListBuffer(String, Long)
// checkPointedState private var checkPointedState: ListState[(String, Long)] _
override def flatMap(value: (String, Long), out: Collector[(String, ListBuffer[(String, Long)])]): Unit { val inputValue value._2 // 超过阈值则进行记录 if (inputValue threshold) { bufferedData value } // 超过指定次数则输出报警信息 if (bufferedData.size numberOfTimes) { // 顺便输出状态实例的hashcode out.collect((checkPointedState.hashCode() “阈值警报”, bufferedData)) bufferedData.clear() } }
override def snapshotState(context: FunctionSnapshotContext): Unit { // 在进行快照时将数据存储到checkPointedState checkPointedState.clear() for (element - bufferedData) { checkPointedState.add(element) } }
override def initializeState(context: FunctionInitializationContext): Unit { // 注册ListStateDescriptor val descriptor new ListStateDescriptor[(String, Long)]( “buffered-abnormalData”, TypeInformation.of(new TypeHint(String, Long) {}) )
// 从FunctionInitializationContext中获取OperatorStateStore进而获取ListState checkPointedState context.getOperatorStateStore.getListState(descriptor)
// 如果是作业重启读取存储中的状态数据并填充到本地缓存中 if (context.isRestored) { for (element - checkPointedState.get()) { bufferedData element } } } }
三、状态横向扩展 状态的横向扩展问题主要是指修改Flink应用的并行度确切的说每个算子的并行实例数或算子子任务数发生了变化应用需要关停或启动一些算子子任务某份在原来某个算子子任务上的状态数据需要平滑更新到新的算子子任务上。
Flink的Checkpoint就是一个非常好的在各算子间迁移状态数据的机制。算子的本地状态将数据生成快照snapshot保存到分布式存储如HDFS上。横向伸缩后算子子任务个数变化子任务重启相应的状态从分布式存储上重建restore。
对于Keyed State和Operator State这两种状态他们的横向伸缩机制不太相同。由于每个Keyed State总是与某个Key相对应当横向伸缩时Key总会被自动分配到某个算子子任务上因此Keyed State会自动在多个并行子任务之间迁移。对于一个非KeyedStream流入算子子任务的数据可能会随着并行度的改变而改变。如上图所示假如一个应用的并行度原来为2那么数据会被分成两份并行地流入两个算子子任务每个算子子任务有一份自己的状态当并行度改为3时数据流被拆成3支或者并行度改为1数据流合并为1支此时状态的存储也相应发生了变化。对于横向伸缩问题Operator State有两种状态分配方式一种是均匀分配另一种是将所有状态合并再分发给每个实例上。
四、检查点机制 为了使 Flink 的状态具有良好的容错性Flink 提供了检查点机制 (CheckPoints) 。通过检查点机制Flink 定期在数据流上生成 checkpoint barrier 当某个算子收到 barrier 时即会基于当前状态生成一份快照然后再将该 barrier 传递到下游算子下游算子接收到该 barrier 后也基于当前状态生成一份快照依次传递直至到最后的 Sink 算子上。当出现异常后Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态。
4.1、开启检查点 默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironment 的 enableCheckpointing(n)来启用 checkpoint里面的 n 是进行 checkpoint 的间隔单位毫秒。
Checkpoint 其他的属性包括
精确一次exactly-once对比至少一次at-least-once你可以选择向 enableCheckpointing(long interval, CheckpointingMode mode) 方法中传入一个模式来选择使用两种保证等级中的哪一种。对于大多数应用来说精确一次是较好的选择。至少一次可能与某些延迟超低始终只有几毫秒的应用的关联较大。 checkpoint 超时如果 checkpoint 执行的时间超过了该配置的阈值还在进行中的 checkpoint 操作就会被抛弃。 checkpoints 之间的最小时间该属性定义在 checkpoint 之间需要多久的时间以确保流应用在 checkpoint 之间有足够的进展。如果值设置为了 5000无论 checkpoint 持续时间与间隔是多久在前一个 checkpoint 完成时的至少五秒后会才开始下一个 checkpoint。 并发 checkpoint 的数目: 默认情况下在上一个 checkpoint 未完成失败或者成功的情况下系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间从而影响正常的处理流程。不过允许多个 checkpoint 并行进行是可行的对于有确定的处理延迟例如某方法所调用比较耗时的外部服务但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说是有意义的。 externalized checkpoints: 你可以配置周期存储 checkpoint 到外部系统中。Externalized checkpoints 将他们的元数据写到持久化存储上并且在 job 失败的时候不会被自动删除。这种方式下如果你的 job 失败你将会有一个现有的 checkpoint 去恢复。更多的细节请看 Externalized checkpoints 的部署文档。 在 checkpoint 出错时使 task 失败或者继续进行 task他决定了在 task checkpoint 的过程中发生错误时是否使 task 也失败使失败是默认的行为。 或者禁用它时这个任务将会简单的把 checkpoint 错误信息报告给 checkpoint coordinator 并继续运行。 优先从 checkpoint 恢复prefer checkpoint for recovery该属性确定 job 是否在最新的 checkpoint 回退即使有更近的 savepoint 可用这可以潜在地减少恢复时间checkpoint 恢复比 savepoint 恢复更快。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // 每 1000ms 开始一次 checkpoint env.enableCheckpointing(1000); // 高级选项 // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // Checkpoint 必须在一分钟内完成否则就会被抛弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 开启在 job 中止后仍然保留的 externalized checkpoints env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 允许在有更近 savepoint 时回退到 checkpoint env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
4.2、保存点机制 保存点机制 (Savepoints) 是检查点机制的一种特殊的实现它允许通过手工的方式来触发 Checkpoint并将结果持久化存储到指定路径中主要用于避免 Flink 集群在重启或升级时导致状态丢失。示例如下
触发指定id的作业的Savepoint并将结果存储到指定目录下 bin/flink savepoint :jobId [:targetDirectory] 1 2 五、状态后端 Flink 提供了多种 state backends它用于指定状态的存储方式和位置。
状态可以位于 Java 的堆或堆外内存。取决于 state backendFlink 也可以自己管理应用程序的状态。为了让应用程序可以维护非常大的状态Flink 可以自己管理内存如果有必要可以溢写到磁盘。默认情况下所有 Flink Job 会使用配置文件 flink-conf.yaml 中指定的 state backend。
但是配置文件中指定的默认 state backend 会被 Job 中指定的 state backend 覆盖。
5.1、状态管理器分类 MemoryStateBackend 默认的方式即基于 JVM 的堆内存进行存储主要适用于本地开发和调试。
FsStateBackend 基于文件系统进行存储可以是本地文件系统也可以是 HDFS 等分布式文件系统。 需要注意而是虽然选择使用了 FsStateBackend 但正在进行的数据仍然是存储在 TaskManager 的内存中的只有在 checkpoint 时才会将状态快照写入到指定文件系统上。
RocksDBStateBackend RocksDBStateBackend 是 Flink 内置的第三方状态管理器采用嵌入式的 key-value 型数据库 RocksDB 来存储正在进行的数据。等到 checkpoint 时再将其中的数据持久化到指定的文件系统中所以采用 RocksDBStateBackend 时也需要配置持久化存储的文件系统。之所以这样做是因为 RocksDB 作为嵌入式数据库安全性比较低但比起全文件系统的方式其读取速率更快比起全内存的方式其存储空间更大因此它是一种比较均衡的方案。
5.2、配置方式 Flink 支持使用两种方式来配置后端管理器
第一种方式基于代码方式进行配置只对当前作业生效
// 配置 FsStateBackend env.setStateBackend(new FsStateBackend(“hdfs://namenode:40010/flink/checkpoints”)); // 配置 RocksDBStateBackend env.setStateBackend(new RocksDBStateBackend(“hdfs://namenode:40010/flink/checkpoints”)); 配置 RocksDBStateBackend 时需要额外导入下面的依赖
org.apache.flink flink-statebackend-rocksdb_2.11 1.9.0
第二种方式基于 flink-conf.yaml 配置文件的方式进行配置对所有部署在该集群上的作业都生效 state.backend: filesystem state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints 1 2 六、状态一致性 6.1、端到端end-to-end 在真实应用中流处理应用除了流处理器以外还包含了数据源例如 Kafka和输出到持久化系统。
端到端的一致性保证意味着结果的正确性贯穿了整个流处理应用的始终每一个组件都保证了它自己的一致性整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下
内部保证依赖checkpoint source 端需要外部源可重设数据的读取位置 sink 端需要保证从故障恢复时数据不会重复写入外部系统。 而对于sink端又有两种具体的实现方式
幂等Idempotent写入所谓幂等操作是说一个操作可以重复执行很多次但只导致一次结果更改也就是说后面再重复执行就不起作用了。 事务性Transactional写入需要构建事务来写入外部系统构建的事务对应着 checkpoint等到 checkpoint 真正完成的时候才把所有对应的结果写入 sink 系统中。 对于事务性写入具体又有两种实现方式预写日志WAL和两阶段提交2PC。Flink DataStream API 提供了GenericWriteAheadSink 模板类和 TwoPhaseCommitSinkFunction 接口可以方便地实现这两种方式的事务性写入。
6.2、FlinkKafka 实现端到端的 exactly-once语义 端到端的状态一致性的实现需要每一个组件都实现对于Flink ## Kafka的数据管道系统Kafka进、Kafka出而言各组件怎样保证exactly-once语义呢
内部利用checkpoint机制把状态存盘发生故障的时候可以恢复保证内部的状态一致性 sourcekafka consumer作为source可以将偏移量保存下来如果后续任务出现了故障恢复的时候可以由连接器重置偏移量重新消费数据保证一致性 sinkkafka producer作为sink采用两阶段提交 sink需要实现一个TwoPhaseCommitSinkFunction内部的checkpoint机制。 Flink由JobManager协调各个TaskManager进行checkpoint存储checkpoint保存在 StateBackend中默认StateBackend是内存级的也可以改为文件级的进行持久化保存。
当 checkpoint 启动时JobManager 会将检查点分界线barrier注入数据流barrier会在算子间传递下去。
每个算子会对当前的状态做个快照保存到状态后端。对于source任务而言就会把当前的offset作为状态保存起来。下次从checkpoint恢复时source任务可以重新提交偏移量从上次保存的位置开始重新消费数据。
每个内部的 transform 任务遇到 barrier 时都会把状态存到 checkpoint 里。
sink 任务首先把数据写入外部 kafka这些数据都属于预提交的事务还不能被消费当遇到 barrier 时把状态保存到状态后端并开启新的预提交事务。
当所有算子任务的快照完成也就是这次的 checkpoint 完成时JobManager 会向所有任务发通知确认这次 checkpoint 完成。当sink 任务收到确认通知就会正式提交之前的事务kafka 中未确认的数据就改为“已确认”数据就真正可以被消费了。
所以看到执行过程实际上是一个两段式提交每个算子执行完成会进行“预提交”直到执行完sink操作会发起“确认提交”如果执行失败预提交会放弃掉。
具体的两阶段提交步骤总结如下
第一条数据来了之后开启一个 kafka 的事务transaction正常写入 kafka 分区日志但标记为未提交这就是“预提交” jobmanager 触发 checkpoint 操作barrier 从 source 开始向下传递遇到 barrier 的算子将状态存入状态后端并通知 jobmanager sink 连接器收到 barrier保存当前状态存入 checkpoint通知 jobmanager并开启下一阶段的事务用于提交下个检查点的数据 jobmanager 收到所有任务的通知发出确认信息表示 checkpoint 完成 sink 任务收到 jobmanager 的确认信息正式提交这段时间的数据 外部kafka关闭事务提交的数据可以正常消费了。 所以也可以看到如果宕机需要通过StateBackend进行恢复只能恢复所有确认提交的操作
47 . 简述Flink广播流
其实在上一题中在Operator States也有介绍广播流Broadcast State。 广播状态Broadcast State是 Apache Flink 中支持的第三种类型的Operator State。Broadcast State使得 Flink 用户能够以容错、一致、可扩缩容地将来自广播的低吞吐的事件流数据存储下来被广播到某个 operator 的所有并发实例中然后与另一条流数据连接进行计算。 广播状态与其他 operator state 之间有三个主要区别 Map 的格式 有一条广播的输入流 operator 可以有多个不同名字的广播状态 1、注意事项 在使用广播状态时要记住以下4个重要事项
1使用广播状态operator task 之间不会相互通信 这也是为什么(Keyed)-BroadcastProcessFunction上只有广播的一边可以修改广播状态的内容。用户必须 保证所有 operator 并发实例上对广播状态的修改行为都是一致的。或者说如果不同的并发实例拥有不同的广播状态内容将导致不一致的结果。 2广播状态中事件的顺序在各个并发实例中可能不尽相同 虽然广播流的元素保证了将所有元素最终都发给下游所有的并发实例但是元素的到达的顺序可能 在并发实例之间并不相同。因此对广播状态的修改不能依赖于输入数据的顺序。 3所有 operator task 都会快照下他们的广播状态 在 checkpoint 时所有的 task 都会 checkpoint 下它们的广播状态并不仅仅是其中一个即使所有 task 在广播状态中存储的元素是一模一样的。这是一个设计倾向为了避免在恢复期间从单个文件读取而造成热点。然而随着并发度的增加checkpoint 的大小也会随之增加这里会存在一个并发因子p 的权衡。Flink保证了在恢复/扩缩容时不会出现重复数据和少数据。在以相同或更小并行度恢复时每 个 task 会读取其对应的检查点状态。在已更大并行度恢复时每个 task 读取自己的状态剩余的 task p_newp_old会以循环方式round-robin读取检查点的状态。 4 RocksDBStateBackend状态后端目前还不支持广播状态 广播状态目前在运行时保存在内存中。因为当前RocksDB状态后端还不适用于operator state。Flink 用户应该相应地为其应用程序配置足够的内存。 2、广播状态模式的应用 一般来说广播状态的主要应用场景如下 动态规则动态规则是一条事件流要求吞吐量不能太高。例如当一个报警规则时触发报警信息等。 我们将这个规则广播到计算的算子的所有并发实例中。 数据丰富例如将用户的详细信息作业广播状态进行广播对包含用户ID的交易数据流进行数据丰富
48 . 简述什么是Flink实时topN
TopN 是统计报表和大屏非常常见的功能主要用来实时计算排行榜。流式的TopN可以使业务方在内存中按照某个统计指标如出现次数计算排名并快速出发出更新后的排行榜。我们以统计词 频为例展示一下如何快速开发一个计算TopN的Flink程序。
49 . 简述什么是Flink的Savepoint
Savepoint在Flink中叫做保存点是基于Flink检查点机制的应用完整快照备份机制用来保存状态可 以在另一个集群或者另一个时间点从保存的状态中将作业恢复回来适用于应用升级、集群迁移、 Flink集群版本更新、A/B测试以及假定场景、暂停和重启、归档等场景。保存点可以视为一个 算子 ID→State的Map对于每一个有状态的算子Key是算子IDValue是算子的State。 在作业恢复方面Flink提供了应用自动容错机制可以减少人为干预降低运维复杂度。同时为了提高 灵活度也提供了手动恢复。Flink提供了外部检查点和保存点两种手动作业恢复方式。这里说下保存 点恢复方式。 保存点恢复方式 用户通过命令触发由用户手动创建、清理。使用了标准化格式存储允许作业升级或者配置变更。用 户在恢复时需要提供用于恢复作业状态的保存点路径。
其实从保存点恢复作业并不简单尤其是在作业变更如修改逻辑、修复bug的情况下需要考虑 如下几点。 1算子的顺序改变 如果对应的UID没变则可以恢复如果对应的UID变了则恢复失败。 2作业中添加了新的算子 如果是无状态算子没有影响可以正常恢复如果是有状态的算子跟无状态的算子一样处理。 3从作业中删除了一个有状态的算子 默认需要恢复保存点中所记录的所有算子的状态如果删除了一个有状态的算子从保存点恢复的时候 被删除的OperatorID找不到所以会报错可以通过在命令中添加-allowNonRestoredState short: -n 跳过无法恢复的算子。 4添加和删除无状态的算子 如果手动设置了UID则可以恢复保存点中不记录无状态的算子如果是自动分配的UID那么有状态 算子的UID可能会变Flink使用一个单调递增的计数器生成UIDDAG改版计数器极有可能会变很 有可能恢复失败。 5恢复的时候调整并行度 Flink1.2.0及以上版本,如果没有使用作废的API则没问题1.2.0以下版本需要首先升级到1.2.0才可以
50 . 简述为什么用Flink不用别的微批考虑过吗
mini-batch模式的处理过程如下 在数据流中收集记录 收集若干记录后调度一个批处理作业进行数据处理 在批处理运行的同时收集下一批次的记录。 也就是说Spark为了处理一个mini-batch需要调度一个批处理作业相比于Flink延迟较大Spark的处 理延迟在秒级。而Flink只需启动一个流计算拓扑处理持续不断的数据Flink的处理延迟在毫秒级 别。如果计算中涉及到多个网络ShuuleSpark Streaming和Flink之间的延迟差距会进一步拉大
51 . 简述解释一下啥叫背压
什么是背压 在流式处理系统中如果出现下游消费的速度跟不上上游生产数据的速度就种现象就叫做背压(backpressure,有人叫反压不纠结本篇叫背压)。本篇主要以Flink作为流式计算框架来简单背压机制为了更好理解只做简单分享。
背压产生的原因 下游消费的速度跟不上上游生产数据的速度可能出现的原因如下
(1)节点有性能瓶颈可能是该节点所在的机器有网络、磁盘等等故障机器的网络延迟和磁盘不足、频繁GC、数据热点等原因。 (2)数据源生产数据的速度过快计算框架处理不及时。比如消息中间件kafka生产者生产数据过快下游flink消费计算不及时。 (3)flink算子间并行度不同下游算子相比上游算子过小。
背压导致的影响 首先背压不会直接导致系统的崩盘只是处在一个不健康的运行状态。
(1)背压会导致流处理作业数据延迟的增加。 (2)影响到Checkpoint导致失败导致状态数据保存不了如果上游是kafka数据源在一致性的要求下可能会导致offset的提交不上。 原理: 由于Flink的Checkpoint机制需要进行Barrier对齐如果此时某个Task出现了背压Barrier流动的速度就会变慢导致Checkpoint整体时间变长如果背压很严重还有可能导致Checkpoint超时失败。 (3)影响state的大小还是因为checkpoint barrier对齐要求。导致state变大。
原理接受到较快的输入管道的barrier后它后面数据会被缓存起来但不处理直到较慢的输入管道的barrier也到达。这些被缓存的数据会被放到state 里面导致state变大。
52 . 简述Flink分布式快照
分布式快照可以将同一时间点Task/Operator的状态数据全局统一快照处理包括Keyed State和Operator State。 Flink的分布式快照是根据Chandy-Lamport算法量身定做的。简单来说就是持续创建分布式数据流及其状态的一致快照。 核心思想是在 input source 端插入 barrier控制 barrier 的同步来实现 snapshot 的备份和 exactly-once 语义。
53 . 简述Flink SQL解析过程
首先先了解下Calcite是什么。 1、Apache Calcite是什么 Apache Calcite是一个动态数据管理框架它具备很多典型数据库管理系统的功能如SQL解析、SQL校验、SQL查询优化、SQL生成以及数据连接查询等但是又省略了一些关键的功能如Calcite并不存储相 关的元数据和基本数据不完全包含相关处理数据的算法等。 Calcite采用的是业界大数据查询框架的一种通用思路它的目标是“one size fits all一种方案适应所有需求场景”希望能为不同计算平台和数据源提供统一的查询引擎。 Calcite作为一个强大的SQL计算引擎在Flink内部的SQL引擎模块就是基于Calcite。 2、Calcite的特点 支持标准SQL语言 独立于编程语言和数据源可以支持不同的前端和后端 支持关系代数、可定制的逻辑规则和基于成本模型优化的查询引擎 支持物化视图materialized view的管理创建、丢弃、持久化和自动识别 基于物化视图的Lattice和Tile机制以应用于OLAP分析 支持对流数据的查询。 3、Calcite的功能
1 SQL解析
Calcite的SQL解析是通过JavaCC实现的使用JavaCC编写SQL语法描述文件将SQL解析成未经校验的 AST语法树。 2 SQL效验 校验分两部分 无状态的校验即验证SQL语句是否符合规范。 有状态的校验即通过与元数据结合验证SQL中的Schema、Field、Function是否存在输入输出类 型是否匹配等。 3 SQL查询优化 对上个步骤的输出RelNode逻辑计划树进行优化得到优化后的物理执行计划。优化有两种基 于规则的优化和基于代价的优化。 4 SQL生成 将物理执行计划生成为在特定平台/引擎的可执行程序如生成符合MySQL或Oracle等不同平台规则的 SQL查询语句等。 5数据连接与执行 通过各个执行平台执行查询得到输出结果。 在Flink或者其他使用Calcite的大数据引擎中一般到SQL查询优化即结束由各个平台结合Calcite的SQL 代码生成和平台实现的代码生成将优化后的物理执行计划组合成可执行的代码然后在内存中编译执 行。 4、Flink SQL结合CalciteFlink SQL解析
一条SQL从提交到Calcite解析优化到最后的Flink执行一般分以下过程 1Sql Parser 将sql语句通过java cc解析成AST语法树在calcite中用SqlNode表示AST 2Sql Validator 结合数字字典catalog去验证sql语法 3生成Logical Plan 将sqlNode表示的AST转换成LogicalPlan 用relNode表示; 4生成 optimized LogicalPlan 先基于calcite rules 去优化logical Plan基于flink定制的一些优化rules 去优化logical Plan 5生成Flink PhysicalPlan 这里也是基于flink里头的rules将将optimized LogicalPlan转成成Flink的物理执行计划 6将物理执行计划转成Flink ExecutionPlan就是调用相应的tanslateToPlan方法转换和利用CodeGen元编程成Flink的各种算子。
这里再提一下SQL的优化 5、SQL查询优化器 SQL优化的发展则可以分为两个阶段即RBO基于规则和CBO基于代价。 逻辑优化使用Calcite的Hep优化器基于规则物理优化阶段使用了Calcite的Hep规则优化器和 Volcano优化器基于代价。 1 RBO基于规则的优化器会将原有表达式裁剪掉遍历一系列规则Rule只要满足条件就转 换生成最终的执行计划。一些常见的规则包括分区裁剪Partition Prune、列裁剪、谓词下推 Predicate Pushdown、投影下推Projection Pushdown、聚合下推、limit下推、sort下推、常量折叠Constant Folding、子查询内联转join等。 2 CBO基于代价的优化器会将原有表达式保留基于统计信息和代价模型尝试探索生成等价关 系表达式最终取代价最小的执行计划。CBO的实现有两种模型Vol can o模型Cascades模型。这两种模型思想很是相似不同点在于Cascades模型一边遍历SQL逻辑树一边优化从而进一步裁剪掉一 些执行计划。 看一个案例 RBO基于规则优化 RBO主要是开发人员在使用SQL的过程中有些发现有些通用的规则可以显著提高SQL执行的效率比 如最经典的Filter下推
将Filter下推到Join之前执行这样做的好处是减少了Join的数量同时降低了CPU内存网络等方面 的开销提高效率。 RBO和CBO的区别大概在于RBO只为应用提供的rule而CBO会根据给出的Cost信息智能应用rule 求出一个Cost最低的执行计划。需要纠正很多人误区的一点是CBO其实也是基于rule的接触到RBO和 CBO这两个概念的时候很容易将他们对立起来。但实际上CBO可以理解为就是加上Cost的RBO。 目前各大数据库和计算引擎倾向于CBO。
54 . 简述 什么是Flink on YARN模式
Flink支持多种部署模式 1 Standalone模式Flink安装在普通的Linux机器上或者安装在K8s中集群的资源由Flink自行管理。 2 Yarn、Mesos、K8s等资源管理集群模式Flink向资源集群申请资源创建Flink集群。 3云上模式Flink可以在Google、亚马逊云计算平台上轻松部署
Flink on Yarn交互过程如下
1 Client上传Flink的jar包和配置文件到HDFS集群上 2 Client向Yarn的ResourceManager提交任务和申请资源 3 ResourceManager分配Container资源并启动ApplicationMaster 4 ApplicationMaster加载Flink的jar包和配置文件构建环境启动Flink-JobManager 5 ApplicationMaster向ResourceManager申请任务资源 6 NodeManager加载Flink的jar包和配置文件构建环境并启动TaskManager 7 TaskManager启动后会向JobManager发送心跳并等待JobManager向其分配任务Flink On Yarn模式的两种方式Session模式和Per-Job模式 1、Session模式适合小任务使用 需要先申请资源启动JobManager和TaskManager 不需要每次提交任务再去申请资源而是使用已经申请好的资源从而提高执行效率任务提交完资源不会被释放因此一直会占用资源
2、Per-Job模式适合使用大任务且资源充足 每次提交任务都需要去申请资源申请资源需要时间所有影响执行效率但是在大数据面前都是小 事 每次执行完任务资源就会立刻被释放不会占用资源
55 . 简述Flink如何保证数据不丢失
Checkpoint检查点是Flink实现应用容错的核心机制。 Flink根据配置周期性通知Stream中各个算子的状态来生成检查点快照从而将这些状态数据定期持久化 存储下来Flink程序一旦意外崩溃重新运行程序时可以有选择地从这些快照进行恢复将应用恢复到 最后一次快照的状态从此刻开始重新执行避免数据的丢失、重复。 默认情况下如果设置了检查点选项则Flink只保留最近成功生成的一个检查点而当Flink程序失败 时可以从最近的这个检查点来进行恢复。但是如果希望保留多个检查点并能够根据实际需要选择 其中一个进行恢复会更加灵活。 默认情况下检查点不会被保留取消程序时即会删除它们但是可以通过配置保留定期检查点根据 配置当作业失败或者取消的时候不会自动清除这些保留的检查点。 如果想保留检查点那么Flink也设计了相关实现可选项如下。 ExternalizedCheckpointCleanup. RETAIN_ON_CANCELLATION取消作业时保留检查点。在这种情况下必须在取消后手动清理检查点状态。 ExternalizedCheckpointCleanup. DELETE_ON_CANCELLATION取消作业时删除检查点。只有在作业失败时检查点状态才可用。
56 . 简述Flink的API可分为哪几层
1SQL Tale AP!同时适用于批处理和流处理这意味着你可以对有界数据流和无界数据流以相同的语义进行查询并产生相同的结果。除了基本查询外 它还支持自定义的标量函数聚合函数以及表值函数可以满足多样化的查询需求。 2DataStream DataSet API 是 Flink 数据处理的核心 APL支持使用 Java 语言或 Scala 语言进行调用提供了数据读取数换和数据输出等一系列常用操作的封装。 3Stateful Stream Processing 是最低级别的抽象它通过 Process Function 函数内嵌到 DataStream AP1 中。ProcessEunction 是 Elink 提供的最底层 API具有最大的灵活性允许开发者对于时间和状态进行细粒度的控制
57 . 简述Flink的分区策略
按照key值分区 全部发往一个分区 广播 上下游并行度一样时一对一发送 随机均匀分配 轮流分配
58 . 简述KeyedState都有哪几类
Keyed State 可以进一步划分为下面的 5 类它们分别是 。比较常用的: ValueState、ListState、MapState 。不太常用的: ReducingState 和 AggregationState
59 . 简述Flink全局快照
全局快照首先是一个分布式应用它有多个进程分布在多个服务器上: 其次它在应用内部有自己的处理逻辑和状态: 第三应用间是可以互相通信的: 第四在这种分布式的应用有内部状态硬件可以通信的情况下某一时刻的全局状态就叫做全局的快照
60 . 简述Flink CEP 编程中当状态没有到达的时候会将数据保存在哪里
在 Flink CEP 的处理逻辑中状态没有满足的和迟到的数据都会存储在一个 Map 数据结构中也就是说如果我们限定判断事件序列的时长为 5 分钟那么内存中就会存储 5 分钟的数据
61 . 简述Flink中的广播变量使用时需要注意什么
我们知道Flink是并行的计算过程可能不在一个 Slot 中进行那么有一种情况即:当我们需要访问同一份数据。那么Fink中的广播变量就是为了解决这种情况。 我们可以把广播变量理解为是一个公共的共享变量我们可以把一个dataset 数据集广播出去然后不同的task在节点上都能够获取到这个数据在每个节点上只会存在一份
62 . 简述Flink-On-Yarn常见的提交模式有哪些分别有什么优缺点
1.yarn-session 式 这种方式需要先启动集群然后在提交作业接着会向varn申请一块空间后资源永远保持不变。如果资源满了下一个就任务就无法提交只能等到varn中其中一个作业完成后程放了资源那下-个作业才会正常提交这种方式资源被限制在sesSi0n中不能超过比较适合特定的运行环境或测试环境。 2.per-job模式 这种方式直接在yarn上提交任务运行Flink作业这种方式的好处是一个任务会对应一个job即每提交一个作业会根据自身的情况.向yarn中申请资源直到作业执行完成并不会影响下一个作业的正常运行除非是yarn上面没有任何资源的情况下。一般生产环境是采用此方式运行。这种方式需要保证集群资源足够
63 . 简述什么是Flink Operator Chains
为了更高效地分布式执行Flink会尽可能地将operator的subtask链接 chain) 在一起形成task。每个task在一个线程中执行。将operatorst技成ask是非带有效的优化:它能减少线程之同的切授减少消息的序列化/反成列化减少少了是迟的同时提高整体的吞吐量。这就是我们所说的算子链。其实就是尽量把操作逻辑放入到同一个subtask里就是一个槽taskSlot
64 . 简述Flink中应用在ableAPI中的UDF有几种
scalar function: 针对一条record的一个字段的操作返回一个字段。 table function: 针对一条record的一个字段的操作返回多个字段 aggregate function: 针对多条记录的一个字段操作返回一条记录
65 . 简述Flink中如何进行状态恢复
Flink使用检查点Checkpoint机制进行状态恢复即在运行过程中将状态保存到外部存储系统如HDFS、S3等中以便在节点故障或手动操作如更新应用程序代码时重新启动时恢复状态。Flink提供两种类型的检查点增量检查点和精确一次检查点。
增量检查点Incremental Checkpoints只保存从上一个检查点到当前检查点之间发生的更改。因此它们比精确一次检查点更快但在恢复时需要应用更多的更改因此可能需要更长的时间来恢复应用程序状态。
精确一次检查点Exactly-once Checkpoints是最常用的检查点类型。它会对整个应用程序的状态进行快照并确保检查点是精确一次的即检查点保存的状态不包含任何重复的记录。这种类型的检查点是最可靠和最完整的状态恢复方式但需要更长的恢复时间和更多的资源。
66 . 简述Flink中的任务并发度是怎样控制的
Flink中的任务并发度由并行度和任务槽数量共同控制。在Flink中每个任务槽task slot代表一个Flink集群中的一个物理资源可以理解为一个线程。并行度指的是同一算子并行执行的任务槽数量。并行度越高同一算子的任务被分配到的任务槽数量越多任务的执行速度也就越快。但是并行度越高也会带来更多的通信和协调开销。因此在实际使用中需要根据数据量、计算复杂度和硬件资源等因素进行调整。
在Flink中可以通过以下方式控制任务的并发度
全局并发度在执行环境中指定的并行度是整个作业的并行度控制着算子任务的总数。 算子并发度在算子实例化时指定的并行度控制着算子任务的分配数量。
67 . 简述Flink中的批处理有哪些优化策略
Flink中批处理作业的优化主要集中在以下几个方面
数据源合理选择数据源减少数据倾斜和数据借助操作的开销。
分区根据数据量和计算资源合理设置并行度和分区数充分利用集群的计算资源。
内存管理Flink中使用了内存管理机制对内存进行管理和分配通过优化内存使用方式可以减少内存分配和GC开销提高处理性能。
重用对象避免在算子中频繁地创建和销毁对象可以通过对象重用机制减少内存分配和GC开销。
算子选择根据具体的业务场景选择性能更优的算子比如使用聚合算子代替多个Map和Reduce算子可以减少数据倾斜和网络开销。
并行算法对于一些高性能的算法可以采用并行算法来进行计算提高处理性能。
缓存使用缓存机制可以减少IO和网络开销提高数据读写速度和处理性能。
压缩对于一些需要传输的大数据可以使用压缩算法来减小数据的传输量提高数据传输速度。 总之Flink中的批处理作业优化需要结合具体的业务场景和数据规模来进行需要从多个方面入手进行优化以达到提高作业性能和效率的目的。