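Above, we introduced the semantics of the core operators of the Apache Flink Table API, with detailed examples. In this part, we take a Bounded EventTime Tumble Window as the example and write a complete Apache Flink Table API job, including the Source and Sink definitions. Suppose there is a Taobao page-access table (PageAccess_tab) with a region, a user ID, and an access time. We need to count the page views (PV) of the Taobao home page per region in every 2-minute window. The data is as follows: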
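
    region    userId  accessTime (epoch ms)  accessTime (UTC+8)
    ShangHai  U0010   1510365660000          2017-11-11 10:01:00
    BeiJing   U1001   1510365660000          2017-11-11 10:01:00
    BeiJing   U2032   1510366200000          2017-11-11 10:10:00
    BeiJing   U1100   1510366260000          2017-11-11 10:11:00
    ShangHai  U0011   1510373400000          2017-11-11 12:10:00
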
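As a quick check of what the job should produce, here is a plain-Scala sketch (no Flink involved) that assigns each sample row to a 2-minute tumbling window aligned to the epoch, which is how Flink aligns tumbling windows when no offset is given, and counts visits per region and window. The names `ExpectedPv`, `windowStart`, and `pvPerRegion` are illustrative only, not part of the original job.

```scala
// Plain-Scala sketch (not Flink code): expected per-region PV of
// 2-minute tumbling event-time windows for the sample rows.
object ExpectedPv {
  // (accessTime in epoch millis, region, userId), copied from the sample data
  val rows: Seq[(Long, String, String)] = Seq(
    (1510365660000L, "ShangHai", "U0010"),
    (1510365660000L, "BeiJing", "U1001"),
    (1510366200000L, "BeiJing", "U2032"),
    (1510366260000L, "BeiJing", "U1100"),
    (1510373400000L, "ShangHai", "U0011")
  )

  val windowSizeMs: Long = 2 * 60 * 1000L

  // Start of the tumbling window containing ts, aligned to the epoch (zero offset)
  def windowStart(ts: Long): Long = ts - ts % windowSizeMs

  // (region, windowStart, windowEnd, PV), sorted by window start, then region
  def pvPerRegion(data: Seq[(Long, String, String)]): Seq[(String, Long, Long, Int)] =
    data
      .groupBy { case (ts, region, _) => (region, windowStart(ts)) }
      .toSeq
      .map { case ((region, start), visits) => (region, start, start + windowSizeMs, visits.size) }
      .sortBy { case (region, start, _, _) => (start, region) }

  def main(args: Array[String]): Unit =
    pvPerRegion(rows).foreach(println)
}
```

With this data, the BeiJing visits at 1510366200000 and 1510366260000 share the window starting at 1510366200000 and give a PV of 2; every other (region, window) pair has a PV of 1.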
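1. Source Definition
To define a custom Apache Flink stream source, we implement StreamTableSource. A StreamTableSource obtains its DataStream through the addSource method of StreamExecutionEnvironment, so we also need a custom SourceFunction. In addition, the table source must declare its event-time (rowtime) attribute and how watermarks are handled, which means implementing the DefinedRowtimeAttributes interface.
(1) Source Function Definition
The SourceFunction accepts a collection of records that carry their event time, encoded as Either: Left holds a record together with its timestamp, and Right holds a watermark: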
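
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    import org.apache.flink.streaming.api.watermark.Watermark

    // Left((timestamp, record)) is emitted as a record with that event time; Right(w) as a watermark
    class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
      extends SourceFunction[T] {
      override def run(ctx: SourceContext[T]): Unit = {
        dataWithTimestampList.foreach {
          case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
          case Right(w) => ctx.emitWatermark(new Watermark(w))
        }
      }
      // The source is bounded and returns from run() by itself, so there is nothing to stop
      override def cancel(): Unit = {}
    }
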
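The test data for this source lists every record as a Left followed by a Right watermark with the same timestamp. A minimal sketch of a hypothetical helper, `EventsWithWatermarks.interleave` (not part of Flink or the original job), that builds this shape from (timestamp, value) pairs; it assumes the records are already sorted by timestamp and checks that timestamps never decrease, since watermarks must not move backwards.

```scala
// Hypothetical helper (not part of Flink): builds the Seq[Either[(Long, T), Long]]
// input that MySourceFunction consumes, emitting after each record a watermark
// equal to that record's timestamp.
object EventsWithWatermarks {
  def interleave[T](records: Seq[(Long, T)]): Seq[Either[(Long, T), Long]] = {
    // Watermarks must not move backwards, so the input has to be ordered by timestamp
    require(
      records.zip(records.drop(1)).forall { case ((a, _), (b, _)) => a <= b },
      "records must be sorted by timestamp")
    records.flatMap { case (ts, value) =>
      Seq[Either[(Long, T), Long]](Left((ts, value)), Right(ts))
    }
  }
}
```

For example, `interleave(Seq((1510365660000L, "ShangHai"), (1510366200000L, "BeiJing")))` yields Left, Right, Left, Right in that order; in a table source the values would be `Row` objects instead of strings.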
(2) StreamTableSource Definition
Our custom source carries the test data together with the corresponding watermarks:
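
    // Imports assume the Flink 1.5/1.6-era flink-table module that this API belongs to
    import java.lang.{Long => JLong}
    import java.util
    import java.util.Collections

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.table.api.{TableSchema, Types}
    import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
    import org.apache.flink.table.sources.tsextractors.ExistingField
    import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks
    import org.apache.flink.types.Row

    class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

      val fieldNames = Array("accessTime", "region", "userId")
      // Table schema: accessTime is exposed as a SQL timestamp (the rowtime attribute)
      val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
      // Physical row type: accessTime arrives as epoch milliseconds (Long)
      val rowType = new RowTypeInfo(
        Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
        fieldNames)

      // PageAccess_tab data: each row (Left) is followed by a watermark (Right) equal to its timestamp
      val data: Seq[Either[(Long, Row), Long]] = Seq(
        Left((1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010"))),
        Right(1510365660000L),
        Left((1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001"))),
        Right(1510365660000L),
        Left((1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032"))),
        Right(1510366200000L),
        Left((1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100"))),
        Right(1510366260000L),
        Left((1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011"))),
        Right(1510373400000L)
      )

      // accessTime is the rowtime attribute, taken from the existing field of the same name;
      // the watermarks emitted by MySourceFunction are kept as-is (PreserveWatermarks)
      override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
        Collections.singletonList(new RowtimeAttributeDescriptor(
          "accessTime",
          new ExistingField("accessTime"),
          PreserveWatermarks.INSTANCE))
      }

      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
        // returns(rowType) declares the field types explicitly, since Row carries none of its own
        execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
      }

      override def getReturnType: TypeInformation[Row] = rowType

      override def getTableSchema: TableSchema = schema
    }
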
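Why does each record come with its own watermark? With PreserveWatermarks, the event-time window downstream fires only once a watermark reaches the window's last timestamp. The plain-Scala sketch below (no Flink; `WatermarkFiring` and its members are illustrative names) replays the same records and watermarks against 2-minute tumbling windows. It assumes zero allowed lateness and, like Flink when a bounded source finishes, a final watermark of `Long.MaxValue` at the end of input.

```scala
import scala.collection.mutable

// Plain-Scala simulation (not Flink code) of event-time tumbling windows driven by watermarks.
object WatermarkFiring {
  val windowSizeMs: Long = 2 * 60 * 1000L

  // Left((timestamp, region)) is a record, Right(watermark) is a watermark
  type Event = Either[(Long, String), Long]

  // Same order as the data in MyTableSource (userId omitted)
  val sample: Seq[Event] = Seq(
    Left((1510365660000L, "ShangHai")), Right(1510365660000L),
    Left((1510365660000L, "BeiJing")), Right(1510365660000L),
    Left((1510366200000L, "BeiJing")), Right(1510366200000L),
    Left((1510366260000L, "BeiJing")), Right(1510366260000L),
    Left((1510373400000L, "ShangHai")), Right(1510373400000L)
  )

  // Returns (windowStart, region, PV) in the order the windows would fire
  def fire(events: Seq[Event]): Seq[(Long, String, Int)] = {
    val open = mutable.Map.empty[(Long, String), Int]
    val fired = mutable.ArrayBuffer.empty[(Long, String, Int)]
    var currentWatermark = Long.MinValue

    // A window [start, start + size) fires once the watermark reaches its last timestamp
    def advance(w: Long): Unit = {
      currentWatermark = math.max(currentWatermark, w)
      val ready = open.keys
        .filter { case (start, _) => start + windowSizeMs - 1 <= currentWatermark }
        .toSeq
        .sorted
      ready.foreach { key =>
        fired += ((key._1, key._2, open(key)))
        open -= key
      }
    }

    events.foreach {
      case Left((ts, region)) =>
        val start = ts - ts % windowSizeMs
        // Records whose window already fired are late and dropped (no allowed lateness)
        if (start + windowSizeMs - 1 > currentWatermark) {
          open((start, region)) = open.getOrElse((start, region), 0) + 1
        }
      case Right(w) => advance(w)
    }
    advance(Long.MaxValue) // end of the bounded input fires all remaining windows
    fired.toSeq
  }
}
```

On the sample sequence, both windows starting at 1510365600000 fire when the watermark 1510366200000 arrives, the BeiJing window starting at 1510366200000 (PV 2) waits for the watermark 1510373400000, and the last ShangHai window is emitted only by the end-of-input watermark.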
(3) Sink Definition