牛商网网站建设,子域名网站二级,建站小程序编辑器闪亮登场,娄底市建设局网站Flink SQL支持对动态表进行复杂而灵活的连接操作#xff0c;本文为您介绍如何使用双流JOIN语句。
背景信息
实时计算的JOIN和传统批处理JOIN的语义一致#xff0c;都用于将两张表关联起来。区别为实时计算关联的是两张动态表#xff0c;关联的结果也会动态更新#xff0c…Flink SQL支持对动态表进行复杂而灵活的连接操作本文为您介绍如何使用双流JOIN语句。
背景信息
实时计算的JOIN和传统批处理JOIN的语义一致都用于将两张表关联起来。区别为实时计算关联的是两张动态表关联的结果也会动态更新以保证最终结果和批处理结果一致。 双流JOIN语法 tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpressionjoinCondition:
ON booleanExpression
| USING ( column [, column ]* ) tableReference表名称。 tableExpression表达式。 joinConditionJOIN条件。
双流JOIN hints
从实时计算引擎VVR 8.0.1 开始您可以通过提示Hints单独为双流JOIN的左右流状态设置不同生命周期 (TTL)来减少维护的状态大小。 语法 -- VVR 8.0.1 开始
SELECT /* JOIN_STATE_TTL(tableReference1 ttl1 [, tableReference2 ttl2]*) */ ...-- VVR 8.0.7 开始您也可以使用社区的Join State TTL Hint语法
SELECT /* STATE_TTL(tableReference1 ttl1 [, tableReference2 ttl2]*) */ ... 注意事项 JOIN STATE TTL HINT仅支持在双流JOIN场景使用不支持维表JOIN、Interval Join或Window Join。 若双流JOIN时JOIN STATE TTL HINT仅指定某一条流的在JOIN节点的状态生命周期则另外一条流的状态生命周期使用Flink SQL作业级别的状态生命周期由table.exec.state.ttl控制参见基本配置默认值为1.5天。 tableReference支持表名视图名和别名一旦为表名指定别名时则需使用别名。 这是一个实验性质的特性HINT语法未来可能会发生变化。 示例 -- HINT使用别名
SELECT /* JOIN_STATE_TTL(o 3d, p 1d) */o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitpriceFROM Orders AS oJOIN Products AS p
ON o.productid p.productid;
-- VVR 8.0.7及以上版本也可以使用新语法
SELECT /* STATE_TTL(o 3d, p 1d) */o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitpriceFROM Orders AS oJOIN Products AS p
ON o.productid p.productid;-- HINT使用表名
SELECT /* JOIN_STATE_TTL(Orders 3d, Products 1d) */ *FROM OrdersJOIN Products
ON Orders.productid Products.productid;
-- VVR 8.0.7及以上版本也可以使用新语法
SELECT /* STATE_TTL(Orders 3d, Products 1d) */ *FROM OrdersJOIN Products
ON Orders.productid Products.productid;-- HINT使用视图名
CREATE TEMPORARY VIEW v AS
SELECT id, ...FROM (SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rnFROM src1WHERE ...) tmp
WHERE rn 1;SELECT /* JOIN_STATE_TTL(v 1d, b 3d) */ v.* , b.*
FROM v
LEFT JOIN src2 AS b ON v.id b.id;
-- VVR 8.0.7及以上版本也可以使用新语法
SELECT /* STATE_TTL(v 1d, b 3d) */ v.* , b.*
FROM v
LEFT JOIN src2 AS b ON v.id b.id;
Orders JOIN Products表的数据示例 测试数据 表 1. Orders rowtime productid orderid units 10:17:00 30 5 4 10:17:05 10 6 1 10:18:05 20 7 2 10:18:07 30 8 20 11:02:00 10 9 6 11:04:00 10 10 1 11:09:30 40 11 12 11:24:11 10 12 4 表 2. Products productid name unitprice 30 Cheese 17 10 Beer 0.25 20 Wine 6 30 Cheese 17 10 Beer 0.25 10 Beer 0.25 40 Bread 100 10 Beer 0.25 测试语句 SELECT o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitpriceFROM Orders AS oJOIN Products AS p
ON o.productid p.productid; 测试结果 o.rowtime o.productid o.orderid o.units p.name p.unitprice 10:17:00 30 5 4 Cheese 17.00 10:17:00 30 5 4 Cheese 17.00 10:17:05 10 6 1 Beer 0.25 10:17:05 10 6 1 Beer 0.25 10:17:05 10 6 1 Beer 0.25 10:17:05 10 6 1 Beer 0.25 10:18:05 20 7 2 Wine 6.00 10:18:07 30 8 20 Cheese 17.00 10:18:07 30 8 20 Cheese 17.00 11:02:00 10 9 6 Beer 0.25 11:02:00 10 9 6 Beer 0.25 11:02:00 10 9 6 Beer 0.25 11:02:00 10 9 6 Beer 0.25 11:04:00 10 10 1 Beer 0.25 11:04:00 10 10 1 Beer 0.25 11:04:00 10 10 1 Beer 0.25 11:04:00 10 10 1 Beer 0.25 11:09:30 40 11 12 Bread 100.00 11:24:11 10 12 4 Beer 0.25 11:24:11 10 12 4 Beer 0.25 11:24:11 10 12 4 Beer 0.25 11:24:11 10 12 4 Beer 0.25
datahub_stream1 JOIN datahub_stream2表的数据示例 测试数据 表 3. datahub_stream1 aBIGINT bBIGINT cVARCHAR 0 10 test11 1 10 test21 表 4. datahub_stream2 aBIGINT bBIGINT cVARCHAR 0 10 test11 1 10 test21 0 10 test31 1 10 test41 测试语句 SELECT s1.c,s2.c
FROM datahub_stream1 AS s1
JOIN datahub_stream2 AS s2
ON s1.a s2.a
WHERE s1.a 0; 测试结果 s1.cVARCHAR s2.cVARCHAR test11 test11 test11 test31