加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(13) - Table API 概述

发布时间:2019-01-18 02:39:04 所属栏目:教程 来源:孙金城
导读:一、什么是Table API 在《Apache Flink 漫谈系列(08) - SQL概览》中我们提纲的向各人先容了什么是好SQL,SQL和Table API是Apache Flink中的统一条理的API抽象,如下图所示: Apache Flink 针对差异的用户场景提供了三层用户API,最基层ProcessFunction API

上面我们先容了Apache Flink Table API焦点算子的语义和详细示例,这部门将选取Bounded EventTime Tumble Window为例为各人编写一个完备的包罗Source和Sink界说的Apache Flink Table API Job。假设有一张淘宝页面会见表(PageAccess_tab),有区域,用户ID和会见时刻。我们必要按差异区域统计每2分钟的淘宝首页的会见量(PV)。详细数据如下:

Apache Flink 漫谈系列(13) - Table API 概述

1. Source 界说

自界说Apache Flink Stream Source必要实现StreamTableSource, StreamTableSource中通过StreamExecutionEnvironment 的addSource要领获取DataStream, 以是我们必要自界说一个 SourceFunction, 而且要支持发生WaterMark,也就是要实现DefinedRowtimeAttributes接口。

(1) Source Function界说

支持吸取携带EventTime的数据荟萃,Either的数据布局,Right暗示WaterMark和Left暗示数据:

  1. class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) 
  2. extends SourceFunction[T] { 
  3. override def run(ctx: SourceContext[T]): Unit = { 
  4. dataWithTimestampList.foreach { 
  5. case Left(t) => ctx.collectWithTimestamp(t._2, t._1) 
  6. case Right(w) => ctx.emitWatermark(new Watermark(w)) 
  7. override def cancel(): Unit = ???} 

(2) 界说 StreamTableSource

我们自界说的Source要携带我们测试的数据,以及对应的WaterMark数据,详细如下:

  1. class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes { 
  2.  
  3. val fieldNames = Array("accessTime", "region", "userId") 
  4. val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING)) 
  5. val rowType = new RowTypeInfo( 
  6. Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], 
  7. fieldNames) 
  8.  
  9. // 页面会见表数据 rows with timestamps and watermarks 
  10. val data = Seq( 
  11. Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")), 
  12. Right(1510365660000L), 
  13. Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")), 
  14. Right(1510365660000L), 
  15. Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")), 
  16. Right(1510366200000L), 
  17. Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")), 
  18. Right(1510366260000L), 
  19. Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")), 
  20. Right(1510373400000L) 
  21.  
  22. override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = { 
  23. Collections.singletonList(new RowtimeAttributeDescriptor( 
  24. "accessTime", 
  25. new ExistingField("accessTime"), 
  26. PreserveWatermarks.INSTANCE)) 
  27.  
  28. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { 
  29. execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1) 
  30.  
  31. override def getReturnType: TypeInformation[Row] = rowType 
  32.  
  33. override def getTableSchema: TableSchema = schema 
  34.  

(3) Sink 界说

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读