加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

发布时间:2019-01-18 02:47:36 所属栏目:教程 来源:孙金城
导读:一、说什么 JOIN 算子是数据处理赏罚的焦点算子,前面我们在《Apache Flink 漫谈系列(09) - JOIN 算子》先容了UnBounded的双流JOIN,在《Apache Flink 漫谈系列(10) - JOIN LATERAL》先容了单流与UDTF的JOIN操纵,在《Apache Flink 漫谈系列(11) - Temporal Ta

接下来我们以图示的方法直观声名Interval JOIN的语义,我们对上面的示例需求轻微变革一下: 订单可以预付款(不管是否公道,我们只是为了声名语义)也就是订单 前后 1小时的付款都是有用的。SQL语句如下:

  1. SELECT 
  2. ... 
  3. FROM 
  4. Orders AS o JOIN Payment AS p ON 
  5. o.orderId = p.orderId AND 
  6. p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND 
  7. orderTime + INTERVAL '1' HOUR 

这样的查询语义表示图如下:

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

上图有几个要害点,如下:

  • 数据JOIN的区间 - 好比Order时刻为3的订单会在付款时刻为[2, 4]区间举办JOIN。
  • WaterMark - 好比图示Order最后一条数据时刻是3,Payment最后一条数据时刻是5,那么WaterMark是按照现实最小值减去UpperBound天生,即:Min(3,5)-1 = 2
  • 逾期数据 - 出于机能和存储的思量,要将逾期数据破除,如图当WaterMark是2的时辰时刻为2早年的数据逾期了,可以被破除。

3. Interval JOIN 实现道理

因为Interval JOIN和双流JOIN相同都要存储阁下双方的数据,以是底层实现中如故是操作State举办数据的存储。流计较的特点是数据不断的流入,我们可以不断的举办增量计较,也就是我们每条数据流入都可以举办JOIN计较。我们照旧以详细示例和图示来声名内部计较逻辑,如下图:

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

简朴表明一下每笔记录的处理赏罚逻辑如下:

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

现实的内部逻辑会比描写的伟大的多,各人可以按照如上简述领略内部道理即可。

四、示例代码

我们照旧以订单和付款示例,将完备代码分享给各人,详细如下(代码基于flink-1.7.0):

  1. import java.sql.Timestamp 
  2.  
  3. import org.apache.flink.api.scala._ 
  4. import org.apache.flink.streaming.api.TimeCharacteristic 
  5. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  6. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
  7. import org.apache.flink.streaming.api.windowing.time.Time 
  8. import org.apache.flink.table.api.TableEnvironment 
  9. import org.apache.flink.table.api.scala._ 
  10. import org.apache.flink.types.Row 
  11.  
  12. import scala.collection.mutable 
  13.  
  14. object SimpleTimeIntervalJoin { 
  15. def main(args: Array[String]): Unit = { 
  16. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  17. val tEnv = TableEnvironment.getTableEnvironment(env) 
  18. env.setParallelism(1) 
  19. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  20. // 结构订单数据 
  21. val ordersData = new mutable.MutableList[(String, String, Timestamp)] 
  22. ordersData.+=(("001", "iphone", new Timestamp(1545800002000L))) 
  23. ordersData.+=(("002", "mac", new Timestamp(1545800003000L))) 
  24. ordersData.+=(("003", "book", new Timestamp(1545800004000L))) 
  25. ordersData.+=(("004", "cup", new Timestamp(1545800018000L))) 
  26.  
  27. // 结构付款表 
  28. val paymentData = new mutable.MutableList[(String, String, Timestamp)] 
  29. paymentData.+=(("001", "alipay", new Timestamp(1545803501000L))) 
  30. paymentData.+=(("002", "card", new Timestamp(1545803602000L))) 
  31. paymentData.+=(("003", "card", new Timestamp(1545803610000L))) 
  32. paymentData.+=(("004", "alipay", new Timestamp(1545803611000L))) 
  33. val orders = env 
  34. .fromCollection(ordersData) 
  35. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) 
  36. .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime) 
  37. val ratesHistory = env 
  38. .fromCollection(paymentData) 
  39. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) 
  40. .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime) 
  41.  
  42. tEnv.registerTable("Orders", orders) 
  43. tEnv.registerTable("Payment", ratesHistory) 
  44.  
  45. var sqlQuery = 
  46. """ 
  47. |SELECT 
  48. | o.orderId, 
  49. | o.productName, 
  50. | p.payType, 
  51. | o.orderTime, 
  52. | cast(payTime as timestamp) as payTime 
  53. |FROM 
  54. | Orders AS o JOIN Payment AS p ON o.orderId = p.orderId AND 
  55. | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR 
  56. |""".stripMargin 
  57. tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery)) 
  58.  
  59. val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] 
  60. result.print() 
  61. env.execute() 
  62.  
  63.  
  64. class TimestampExtractor[T1, T2] 
  65. extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { 
  66. override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { 
  67. element._3.getTime 

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读