手机网站设计方案,wap 手机网站建设,乐清网络公司哪家最好,软硬件开发都包括什么目录
FlinkSQL 调优
1 设置空闲状态保留时间
2 开启 MiniBatch
3 开启 LocalGlobal
3.1 原理概述
3.2 提交案例#xff1a;统计每天每个 mid 出现次数
3.3 提交案例#xff1a;开启 miniBatch 和 LocalGlobal
4 开启 Split Distinct
4.1 原理概述
4.2 提交案例统计每天每个 mid 出现次数
3.3 提交案例开启 miniBatch 和 LocalGlobal
4 开启 Split Distinct
4.1 原理概述
4.2 提交案例count (distinct) 存在热点问题
4.3 提交案例开启 split distinct
5 多维 DISTINCT 使用 Filter
5.1 原理概述
5.2 提交案例多维 Distinct
5.3 提交案例使用 Filter
6 设置参数总结
总结 在 Flink SQL 的应用场景中优化工作至关重要它直接关乎作业的性能、资源利用以及数据处理的准确性与高效性。随着数据量的不断增长和业务需求的日益复杂若不对 Flink SQL 进行调优可能会面临诸如状态失控、处理延迟过高、资源瓶颈等诸多问题。例如新手容易忽视的空闲状态保留时间设置不当可能导致状态爆炸而在聚合操作中数据倾斜也会严重影响性能。深入了解并掌握 Flink SQL 的调优技巧能够让我们在大数据处理的浪潮中更好地驾驭 Flink SQL确保数据处理任务平稳、高效地运行。 FlinkSQL 调优 FlinkSQL 官网配置参数//nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/
1 设置空闲状态保留时间 Flink SQL 新手有可能犯的错误其中之一就是忘记设置空闲状态保留时间导致状态爆炸。列举两个场景 FlinkSQL 的 regular joininner、left、right左右表的数据都会一直保存在状态里不会清理要么设置 TTL要么使用 FlinkSQL 的 interval join。使用 Top-N 语法进行去重重复数据的出现一般都位于特定区间内例如一小时或一天内过了这段时间之后对应的状态就不再需要了。 Flink SQL 可以指定空闲状态 (即未更新的状态) 被保留的最小时间当状态中某个 key 对应的状态未更新的时间达到阈值时该条状态被自动清理 收起 java
// API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
// 参数指定
Configuration configuration tableEnv.getConfig().getConfiguration();
configuration.setString(table.exec.state.ttl, 1 h);2 开启 MiniBatch MiniBatch 是微批处理原理是缓存一定的数据后再触发处理以减少对 State 的访问从而提升吞吐并减少数据的输出量。MiniBatch 主要依靠在每个 Task 上注册的 Timer 线程来触发微批需要消耗一定的线程调度性能。 MiniBatch 默认关闭开启方式如下: 收起 java
// 初始化 table environment
TableEnvironment tEnv ...// 获取 tableEnv 的配置对象
Configuration configuration tEnv.getConfig().getConfiguration();// 设置参数
// 开启 miniBatch
configuration.setString(table.exec.mini-batch.enabled, true);
// 批量输出的间隔时间
configuration.setString(table.exec.mini-batch.allow-latency, 5 s);
// 防止 OOM 设置每个批次最多缓存数据的条数可以设为 2 万条
configuration.setString(table.exec.mini-batch.size, 20000);适用场景 微批处理通过增加延迟换取高吞吐如果有超低延迟的要求不建议开启微批处理。通常对于聚合的场景微批处理可以显著的提升系统性能建议开启。 注意事项 目前key-value 配置项仅被 Blink planner 支持。1.12 之前的版本有 bug开启 miniBatch不会清理过期状态也就是说如果设置状态的 TTL无法清理过期状态。1.12 版本才修复这个问题。 参考 ISSUE[FLINK-17096] Mini-batch group aggregation doesnt expire state even if state ttl is enabled - ASF JIRA
3 开启 LocalGlobal
3.1 原理概述 LocalGlobal 优化将原先的 Aggregate 分成 Local Global 两阶段聚合即 MapReduce 模型中的 Combine Reduce 处理模式。第一阶段在上游节点本地攒一批数据进行聚合localAgg并输出这次微批的增量值Accumulator。第二阶段再将收到的 Accumulator 合并Merge得到最终的结果GlobalAgg。 LocalGlobal 本质上能够靠 LocalAgg 的聚合筛除部分倾斜数据从而降低 GlobalAgg 的热点提升性能。结合下图理解 LocalGlobal 如何解决数据倾斜的问题。 由上图可知 未开启 LocalGlobal 优化由于流中的数据倾斜Key 为红色的聚合算子实例需要处理更多的记录这就导致了热点问题。 开启 LocalGlobal 优化后先进行本地聚合再进行全局聚合。可大大减少 GlobalAgg 的热点提高性能。 LocalGlobal 开启方式 LocalGlobal 优化需要先开启 MiniBatch依赖于 MiniBatch 的参数。table.optimizer.agg-phase-strategy: 聚合策略。默认 AUTO支持参数 AUTO、TWO_PHASE (使用 LocalGlobal 两阶段聚合)、ONE_PHASE (仅使用 Global 一阶段聚合)。 收起 java
// 初始化 table environment
TableEnvironment tEnv ...// 获取 tableEnv 的配置对象
Configuration configuration tEnv.getConfig().getConfiguration();// 设置参数
// 开启 miniBatch
configuration.setString(table.exec.mini-batch.enabled, true);
// 批量输出的间隔时间
configuration.setString(table.exec.mini-batch.allow-latency, 5 s);
// 防止 OOM 设置每个批次最多缓存数据的条数可以设为 2 万条
configuration.setString(table.exec.mini-batch.size, 20000);
// 开启 LocalGlobal
configuration.setString(table.optimizer.agg-phase-strategy, TWO_PHASE);注意事项 需要先开启 MiniBatch开启 LocalGlobal 需要 UDAF 实现 Merge 方法。
3.2 提交案例统计每天每个 mid 出现次数 收起 plaintext
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabledtrue \
-Dyarn.application.queuetest \
-Djobmanager.memory.process.size1024mb \
-Dtaskmanager.memory.process.size2048mb \
-Dtaskmanager.numberOfTaskSlots2 \
-c com.atguigu.flink.tuning.SqlDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--demo count可以看到存在数据倾斜。
3.3 提交案例开启 miniBatch 和 LocalGlobal 收起 plaintext
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabledtrue \
-Dyarn.application.queuetest \
-Djobmanager.memory.process.size1024mb \
-Dtaskmanager.memory.process.size2048mb \
-Dtaskmanager.numberOfTaskSlots2 \
-c com.atguigu.flink.tuning.SqlDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--demo count \
--minibatch true \
--local-global true从 WebUI 可以看到分组聚合变成了 Local 和 Global 两部分数据相对均匀且没有数据倾斜。
4 开启 Split Distinct LocalGlobal 优化针对普通聚合例如 SUM、COUNT、MAX、MIN 和 AVG有较好的效果对于 DISTINCT 的聚合如 COUNT DISTINCT收效不明显因为 COUNT DISTINCT 在 Local 聚合时对于 DISTINCT KEY 的去重率不高导致在 Global 节点仍然存在热点。
4.1 原理概述 之前为了解决 COUNT DISTINCT 的热点问题通常需要手动改写为两层聚合增加按 Distinct Key 取模的打散层。 从 Flink1.9.0 版本开始提供了 COUNT DISTINCT 自动打散功能通过 HASH_CODE (distinct_key) % BUCKET_NUM 打散不需要手动重写。Split Distinct 和 LocalGlobal 的原理对比参见下图。 Distinct 举例 收起 sql
SELECT a, COUNT(DISTINCT b)
FROM T
GROUP BY a手动打散举例 收起 sql
SELECT a, SUM(cnt)
FROM (SELECT a, COUNT(DISTINCT b) as cntFROM TGROUP BY a, MOD(HASH_CODE(b), 1024)
)
GROUP BY aSplit Distinct 开启方式 默认不开启使用参数显式开启 收起 java
table.optimizer.distinct-agg.split.enabled: true默认 false。
table.optimizer.distinct-agg.split.bucket-num: Split Distinct 优化在第一层聚合中被打散的 bucket 数目。默认 1024。java
// 初始化 table environment
TableEnvironment tEnv ...// 获取 tableEnv 的配置对象
Configuration configuration tEnv.getConfig().getConfiguration();// 设置参数(要结合 minibatch 一起使用)
// 开启 Split Distinct
configuration.setString(table.optimizer.distinct-agg.split.enabled, true);
// 第一层打散的 bucket 数目
configuration.setString(table.optimizer.distinct-agg.split.bucket-num, 1024);注意事项 目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 优化方法。拆分出来的两个 GROUP 聚合还可参与 LocalGlobal 优化。该功能在 Flink1.9.0 版本及以上版本才支持。
4.2 提交案例count (distinct) 存在热点问题 收起 plaintext
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabledtrue \
-Dyarn.application.queuetest \
-Djobmanager.memory.process.size1024mb \
-Dtaskmanager.memory.process.size2048mb \
-Dtaskmanager.numberOfTaskSlots2 \
-c com.atguigu.flink.tuning.SqlDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--demo distinct可以看到存在热点问题。
4.3 提交案例开启 split distinct 收起 plaintext
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabledtrue \
-Dyarn.application.queuetest \
-Djobmanager.memory.process.size1024mb \
-Dtaskmanager.memory.process.size2048mb \
-Dtaskmanager.numberOfTaskSlots2 \
-c com.atguigu.flink.tuning.SqlDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--demo distinct \
--minibatch true \
--split-distinct true 从 WebUI 可以看到有两次聚合而且有 partialFinal 字样第二次聚合时已经均匀。
5 多维 DISTINCT 使用 Filter
5.1 原理概述 在某些场景下可能需要从不同维度来统计 countdistinct的结果比如统计 uv、app 端的 uv、web 端的 uv可能会使用如下 CASE WHEN 语法。 收起 sql
SELECTa,COUNT(DISTINCT b) AS total_b,COUNT(DISTINCT CASE WHEN c IN (A, B) THEN b ELSE NULL END) AS AB_b,COUNT(DISTINCT CASE WHEN c IN (C, D) THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a在这种情况下建议使用 FILTER 语法目前的 Flink SQL 优化器可以识别同一唯一键上的不同 FILTER 参数。如在上面的示例中三个 COUNT DISTINCT 都作用在 b 列上。此时经过优化器识别后Flink 可以只使用一个共享状态实例而不是三个状态实例可减少状态的大小和对状态的访问。 将上边的 CASE WHEN 替换成 FILTER 后如下所示: 收起 sql
SELECTa,COUNT(DISTINCT b) AS total_b,COUNT(DISTINCT b) FILTER (WHERE c IN (A, B)) AS AB_b,COUNT(DISTINNT b) FILTER (WHERE c IN (C, D)) AS CD_b
FROM T
GROUP BY a5.2 提交案例多维 Distinct 收起 plaintext
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabledtrue \
-Dyarn.application.queuetest \
-Djobmanager.memory.process.size1024mb \
-Dtaskmanager.memory.process.size2048mb \
-Dtaskmanager.numberOfTaskSlots2 \
-c com.atguigu.flink.tuning.SqlDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--demo dim-difcount5.3 提交案例使用 Filter 收起 plaintext
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabledtrue \
-Dyarn.application.queuetest \
-Djobmanager.memory.process.size1024mb \
-Dtaskmanager.memory.process.size2048mb \
-Dtaskmanager.numberOfTaskSlots2 \
-c com.atguigu.flink.tuning.SqlDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--demo dim-difcount-filter通过 WebUI 对比前 10 次 Checkpoint 的大小可以看到状态有所减小。
6 设置参数总结 总结以上的调优参数代码如下 收起 java
// 初始化 table environment
TableEnvironment tEnv ...// 获取 tableEnv 的配置对象
Configuration configuration tEnv.getConfig().getConfiguration();// 设置参数
// 开启 miniBatch
configuration.setString(table.exec.mini-batch.enabled, true);
// 批量输出的间隔时间
configuration.setString(table.exec.mini-batch.allow-latency, 5 s);
// 防止 OOM 设置每个批次最多缓存数据的条数可以设为 2 万条
configuration.setString(table.exec.mini-batch.size, 20000);
// 开启 LocalGlobal
configuration.setString(table.optimizer.agg-phase-strategy, TWO_PHASE);
// 开启 Split Distinct
configuration.setString(table.optimizer.distinct-agg.split.enabled, true);
// 第一层打散的 bucket 数目
configuration.setString(table.optimizer.distinct-agg.split.bucket-num, 1024);
// 指定时区
configuration.setString(table.local-time-zone, Asia/Shanghai); 总结 本文全面深入地讲解了 Flink SQL 调优的关键要点。 在设置空闲状态保留时间方面明确了其对于防止因遗忘而导致状态爆炸的关键作用尤其是在 regular join 和 Top-N 去重场景中并给出了 API 和参数两种指定方式。MiniBatch 微批处理通过缓存数据触发处理来提升性能但有适用场景限制且在不同 Flink 版本存在差异。LocalGlobal 优化通过独特的两阶段聚合模式有效解决数据倾斜问题不过要依赖 MiniBatch 且对 UDAF 有要求通过案例展示了其优化效果。Split Distinct 为 COUNT DISTINCT 聚合提供了新的优化思路虽有功能限制但在特定场景下能发挥作用案例也呈现了开启前后的变化。多维 DISTINCT 使用 Filter 借助优化器识别减少了状态实例WebUI 中 Checkpoint 大小的对比直观体现了其优势。 总之掌握这些 Flink SQL 调优知识能帮助使用者在实际应用中根据具体业务需求灵活运用调优策略提升 Flink SQL 作业的整体质量和性能有效应对各种复杂的数据处理挑战保障数据处理流程的高效与稳定。