北京专业建网站的公司,wordpress 抓别人数据,丹阳网站怎么做seo,浪花直播历史文章迁移#xff0c;稍后整理 使用DataGenerator 提前进行压测#xff0c;了解数据的处理瓶颈、性能测试和消费能力 开启minibatch#xff1a;table.exec.mini-batch.enabled, true
开启LocalGlobal 两阶段聚合#xff1a;table.exec.m…历史文章迁移稍后整理 使用DataGenerator 提前进行压测了解数据的处理瓶颈、性能测试和消费能力 开启minibatchtable.exec.mini-batch.enabled, true
开启LocalGlobal 两阶段聚合table.exec.mini-batch.enabled, true
解决数据倾斜问题
流式倾斜开启minibatch
窗口类有界操作传统的两阶段聚合的方式
数据源分布就不均匀做reblance
针对大状态开启rocksdb
针对分区无数据导致watermark的窗口等不触发设置idle
利用paimon做中间存储既可以做批流复用olaplookup join 时把全量数据拉到rocksdb并且是分片存的效率很高缺点是有延迟会有join key miss的问题
暴力调优加内存调大并行度 设置空闲 State 保留时间 看情况设置不当会影响结果正确性
FlinkSql 可以指定空闲状态即未更新的状态被保留的最小时间当状态中的某个 Key 对应的状态未更新的时间达到阈值时这条状态会被自动清理 4.2 开启 MiniBatch
Flink 是流式数据处理没过来一条数据就会被直接处理
MiniBatch 是把流处理变为微批处理的方式先缓存一定的数据后在触发处理这样可以减少对 State 的访问、提升吞吐、有效减少输出数据量
但是会牺牲低延迟对超低延迟要求的场景不建议用常用在需要聚合的场景有显著的性能提升
// 开启 miniBatch
configuration.setString(table.exec.mini-batch.enabled, true);
// 批量输出的间隔时间
configuration.setString(table.exec.mini-batch.allow-latency, 5 s);
// 防止 OOM 设置每个批次最多缓存数据的条数可以设为 2 万条
configuration.setString(table.exec.mini-batch.size, 20000);
主要是依靠每个 Task 上注册的 Timer 线程Flink 的定时器来触发微批当然了是需要消耗一定的线程性能 4.3 开启 LocalGlobal
其实就是本地聚合Spark 的 reduceByKey 和 MR 的 Combine所以开启 LocalGlobal 必须开启 MiniBatch可以有效解决SUM的那个聚合函数数据倾斜的问题同时还能优化上游对下游的数据传输、以及下游聚合的压力
// 开启 LocalGlobal
configuration.setString(table.optimizer.agg-phase-strategy, TWO_PHASE);
如下图红色和紫色分别代表两个 Key 的数据进行聚合时的效果 4.4 开启 Split Distinct
LocalGlobal 的方式可以有效解决 SUM 等聚合函数数据倾斜的问题但是对于 Group 后的 Count ( Distinct )的热点问题没法解决
1. 以前我们手动打散的方案
SELECT a, SUM(cnt)
FROM (
SELECT a, COUNT(DISTINCT b) as cnt
FROM T
GROUP BY a, MOD(HASH_CODE(b), 1024)
)
GROUP BY a
2. FlinkSql 自动实现了这部分功能只需要我们开启 Split Distinct 参数即可
// 设置参数(要结合 minibatch 一起使用)
// 开启 Split Distinct
configuration.setString(table.optimizer.distinct-agg.split.enabled, true);
// 第一层打散的 bucket 数目
configuration.setString(table.optimizer.distinct-agg.split.bucket-num, 1024);
原理如下图红色和紫色仍然分别代表两个 Key 的数据但是红色的数据显然很多但是去重必须同一个 Key 的数据肯定在一个节点所以压力较大 4.5 Count ( Distinct ) 时可以用 Filter 代替 Case When
我们经常会写这样的 Sql如下会有 3 个状态实例
SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT CASE WHEN c IN (A, B) THEN b ELSE NULL END) AS AB_b,
COUNT(DISTINCT CASE WHEN c IN (C, D) THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a 而 FlinkSql 的优化器可以识别同一唯一键的不同 Filter 参数三个 COUNT DISTINCT 都作用在 b 列上我们可以利用 Filter 的这一特性Flink 可以只使用一个共享状态实例可减少状态的大小和对状态的访问
SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT b) FILTER (WHERE c IN (A, B)) AS AB_b,
COUNT(DISTINCT b) FILTER (WHERE c IN (C, D)) AS CD_b
FROM T
GROUP BY a 解决数据倾斜、反压问题
lookup join 的优化避免性能较差的热查询
paimon属于链路的优化既可以数据重用重写了lookup join 减少checkpoint压力缺点是...
FlinkSql window tvf 本身也是一种优化 当使用细粒度的滑动窗口窗口长度远远大于滑动步长时重叠的窗口过多一个数据会属于多个窗口性能会急剧下降 比如 24h 的窗口3分钟滑动一次那么粒度就是 24 * 60 / 3 480 会导致两个问题 1. 状态 对于一个元素将其对应的keywindow写入 WindowState那意味着每个元素到来更新 WindowState 时都要遍历 480 个窗口然后写入开销很大即使用 RocksDBStateBackend 瓶颈也很明显 2. 定时器 了解过窗口函数原理的应该清楚每一个keywindow都需要注册两个定时器,而细粒度窗口会导致维护的定时器增多加重内存负担 一个是触发器注册的定时器用于决定窗口数据何时输出 第二个是 registerCleanupTimer() 注册的一个清理定时器用于窗口过期比如 allowedLateness 过期之后及时清理窗口的内部状态 这些都是通用的很多时候其实这些方式解决不了可以根据实际业务去探索某个业务的最佳方式
另外有时基于海量数据和业务要求的时效性和复杂度经常需要用到算子来处理