我们结构一个只包括一个data字段的用户表,用户表数据如下:

查询的需求是将data字段flatten成为name和age两个字段的表,祈望获得:

我们以ITCase方法完成如上查询需求,完备代码如下:
- @Test
- def testLateralTVF(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStateBackend(getStateBackend)
- StreamITCase.clear
-
- val userData = new mutable.MutableList[(String)]
- userData.+=(("Sunny#8"))
- userData.+=(("Kevin#36"))
- userData.+=(("Panpan#36"))
-
- val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)"
-
- val users = env.fromCollection(userData).toTable(tEnv, 'data)
-
- val tvf = new SplitTVF()
- tEnv.registerTable("userTab", users)
- tEnv.registerFunction("splitTVF", tvf)
-
- val result = tEnv.SQLQuery(SQLQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
- StreamITCase.testResults.foreach(println(_))
- }
运行功效:

上面的焦点语句是:
- val SQLQuery = "SELECT data, name, age FROM userTab, LATERAL TABLE(splitTVF(data)) AS T(name, age)"
假如各人想运行上面的示例,请查阅《Apache Flink 漫谈系列 - SQL概览》中 源码方法 搭建测试情形。
六、小结
本篇重点向各人先容了一种新的JOIN范例 - JOIN LATERAL。并向各人先容了SQL Server中对LATERAL的支持方法,具体说明白JOIN LATERAL和INNER JOIN的区别与接洽,最后切入到Apache Flink中,以UDTF示例声名白Apache Flink中对JOIN LATERAL的支持,后续篇章会先容Apache Flink中另一种行使LATERAL的场景,就是Temporal JION,Temporal JION也是一种新的JOIN范例,我们下一篇再会!
关于点赞和评述
本系列文章不免有许多缺陷和不敷,真诚但愿读者对有收成的篇章给以点赞勉励,对有不敷的篇章给以反馈和提议,先行感激各人!
作者:孙金城,混名 金竹,今朝就职于阿里巴巴,自2015年以来一向投入于基于Apache Flink的阿里巴巴计较平台Blink的计划研发事变。
【本文为51CTO专栏作者“金竹”原创稿件,转载请接洽原作者】
【编辑保举】
- Apache Flink 漫谈系列 - Fault Tolerance
- Apache Flink 漫谈系列 - 流表对偶(duality)性
- Apache Flink 漫谈系列 - 一连查询(Continuous Queries)
- Apache Flink 漫谈系列 - SQL概览
- Apache Flink 漫谈系列 - JOIN 算子
【责任编辑:赵宁宁 TEL:(010)68476606】
点赞 0 (编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|