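Next, we use a diagram to illustrate the semantics of Interval JOIN. Let us adjust the requirement from the example above slightly: an order may be paid in advance (whether or not that is realistic, it serves only to illustrate the semantics), so any payment made within 1 hour before or after the order is valid. The SQL statement is as follows:

    SELECT
      ...
    FROM
      Orders AS o JOIN Payment AS p ON
      o.orderId = p.orderId AND
      p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
      orderTime + INTERVAL '1' HOUR

The semantics of this query are shown in the figure below: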

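The figure above highlights several key points:
- Join interval: for example, an order with time 3 is joined with payments whose time falls in the interval [2, 4].
- Watermark: for example, in the figure the last Order record has time 3 and the last Payment record has time 5, so the watermark is the smaller of the two minus UpperBound, that is, Min(3, 5) - 1 = 2.
- Expired data: for performance and storage reasons, expired data has to be cleared. In the figure, when the watermark is 2, data with a time earlier than 2 has expired and can be cleared.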
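
The three points above can be written down as small functions. This is only a sketch of the rules as stated in the text, using the abstract integer times from the figure; the object and function names (`IntervalJoinSemantics`, `inJoinRange`, `watermark`, `isExpired`) are hypothetical and are not part of Flink.

```scala
object IntervalJoinSemantics {
  // Hypothetical helpers; times are the abstract integer ticks used in the figure.

  /** True when payTime falls in [orderTime - lowerBound, orderTime + upperBound]. */
  def inJoinRange(orderTime: Long, payTime: Long, lowerBound: Long, upperBound: Long): Boolean =
    payTime >= orderTime - lowerBound && payTime <= orderTime + upperBound

  /** Watermark as described in the text: the smaller of the latest times on both sides, minus the upper bound. */
  def watermark(lastOrderTime: Long, lastPayTime: Long, upperBound: Long): Long =
    math.min(lastOrderTime, lastPayTime) - upperBound

  /** Rows with a time strictly earlier than the watermark are treated as expired. */
  def isExpired(rowTime: Long, wm: Long): Boolean =
    rowTime < wm
}
```

With a bound of 1 on each side, `inJoinRange(3, 2, 1, 1)` and `inJoinRange(3, 4, 1, 1)` hold, `watermark(3, 5, 1)` evaluates to 2, and a row with time 1 counts as expired under that watermark.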
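3. How Interval JOIN Is Implemented
Like a regular two-stream JOIN, Interval JOIN has to store data from both the left and right sides, so the underlying implementation still uses State for storage. A defining trait of stream computing is that data flows in continuously and can be processed incrementally, which means a JOIN computation can run for every record that arrives. We again use a concrete example and a diagram to explain the internal computation logic, as shown in the figure below: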

A brief explanation of how each record is processed follows:

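The actual internal logic is far more complex than described here; the brief outline above is enough to understand the underlying principle.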
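
To make the idea of "store both sides in State and join incrementally on every arriving record" concrete, here is a heavily simplified sketch in plain Scala. It is not Flink's implementation: the in-memory buffers stand in for State, the class and method names (`SimpleIntervalJoin`, `onOrder`, `onPayment`, `onWatermark`) are hypothetical, and the cleanup rule is simply the "earlier than the watermark" rule from the text.

```scala
import scala.collection.mutable

object IntervalJoinStateSketch {
  final case class Order(orderId: String, orderTime: Long)
  final case class Payment(orderId: String, payTime: Long)

  // Hypothetical, simplified operator: plain buffers play the role of left/right State.
  class SimpleIntervalJoin(lowerBound: Long, upperBound: Long) {
    private val orderState = mutable.ArrayBuffer.empty[Order]
    private val paymentState = mutable.ArrayBuffer.empty[Payment]

    // Same key, and payTime within [orderTime - lowerBound, orderTime + upperBound].
    private def matches(o: Order, p: Payment): Boolean =
      o.orderId == p.orderId &&
        p.payTime >= o.orderTime - lowerBound &&
        p.payTime <= o.orderTime + upperBound

    /** An arriving order probes the stored payments, then is stored itself. */
    def onOrder(o: Order): List[(Order, Payment)] = {
      val out = paymentState.filter(p => matches(o, p)).map(p => (o, p)).toList
      orderState += o
      out
    }

    /** An arriving payment probes the stored orders, then is stored itself. */
    def onPayment(p: Payment): List[(Order, Payment)] = {
      val out = orderState.filter(o => matches(o, p)).map(o => (o, p)).toList
      paymentState += p
      out
    }

    /** Drop rows on both sides whose time is earlier than the watermark. */
    def onWatermark(wm: Long): Unit = {
      orderState --= orderState.filter(_.orderTime < wm)
      paymentState --= paymentState.filter(_.payTime < wm)
    }

    /** Number of rows currently held on both sides. */
    def stateSize: Int = orderState.size + paymentState.size
  }
}
```

Each call to `onOrder` or `onPayment` emits only the new matches produced by that one record, which is the incremental behavior described above; `onWatermark` keeps the stored data from growing without bound.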
IV. Example Code
We again use the order and payment example and share the complete code below (based on flink-1.7.0):

    import java.sql.Timestamp

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    import scala.collection.mutable

    object SimpleTimeIntervalJoin {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Build the order data: (orderId, productName, orderTime)
        val ordersData = new mutable.MutableList[(String, String, Timestamp)]
        ordersData.+=(("001", "iphone", new Timestamp(1545800002000L)))
        ordersData.+=(("002", "mac", new Timestamp(1545800003000L)))
        ordersData.+=(("003", "book", new Timestamp(1545800004000L)))
        ordersData.+=(("004", "cup", new Timestamp(1545800018000L)))

        // Build the payment data: (orderId, payType, payTime)
        val paymentData = new mutable.MutableList[(String, String, Timestamp)]
        paymentData.+=(("001", "alipay", new Timestamp(1545803501000L)))
        paymentData.+=(("002", "card", new Timestamp(1545803602000L)))
        paymentData.+=(("003", "card", new Timestamp(1545803610000L)))
        paymentData.+=(("004", "alipay", new Timestamp(1545803611000L)))

        // Turn both collections into tables with an event-time (rowtime) attribute
        val orders = env
          .fromCollection(ordersData)
          .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
          .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime)
        val payments = env
          .fromCollection(paymentData)
          .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
          .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime)

        tEnv.registerTable("Orders", orders)
        tEnv.registerTable("Payment", payments)

        // Interval JOIN: a payment matches if it falls within 1 hour after the order
        val sqlQuery =
          """
            |SELECT
            |  o.orderId,
            |  o.productName,
            |  p.payType,
            |  o.orderTime,
            |  cast(payTime as timestamp) as payTime
            |FROM
            |  Orders AS o JOIN Payment AS p ON o.orderId = p.orderId AND
            |  p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR
            |""".stripMargin
        tEnv.registerTable("IntervalJoinResult", tEnv.sqlQuery(sqlQuery))

        val result = tEnv.scan("IntervalJoinResult").toAppendStream[Row]
        result.print()
        env.execute()
      }

    }

    // Uses the third tuple field as the event time and tolerates 10 seconds of out-of-orderness
    class TimestampExtractor[T1, T2]
      extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
      override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
        element._3.getTime
      }
    }
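
As a sanity check on the sample data, the join condition from the query (`payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR`) can be evaluated by hand in plain Scala, without Flink. The sketch below only applies the predicate to the timestamps copied from the example; the object name `ExpectedJoinResult` is hypothetical, and this is not a substitute for running the job.

```scala
object ExpectedJoinResult {
  // (orderId, event time in ms), copied from the example data above.
  val orders: Seq[(String, Long)] = Seq(
    "001" -> 1545800002000L,
    "002" -> 1545800003000L,
    "003" -> 1545800004000L,
    "004" -> 1545800018000L
  )
  val payments: Seq[(String, Long)] = Seq(
    "001" -> 1545803501000L,
    "002" -> 1545803602000L,
    "003" -> 1545803610000L,
    "004" -> 1545803611000L
  )

  val oneHourMs: Long = 60L * 60L * 1000L

  // Same predicate as the SQL: equal orderId and orderTime <= payTime <= orderTime + 1 hour.
  val matchedOrderIds: Seq[String] =
    for {
      (orderId, orderTime) <- orders
      (payOrderId, payTime) <- payments
      if orderId == payOrderId && payTime >= orderTime && payTime <= orderTime + oneHourMs
    } yield orderId
}
```

Under this predicate, orders 001, 002 and 004 have a matching payment, while the payment for order 003 arrives 3606 seconds after the order and so falls just outside the 1-hour bound.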