加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列 - SQL概览

发布时间:2018-11-15 16:12:14 所属栏目:教程 来源:孙金城
导读:一、SQL简述 SQL是Structured Query Language的缩写,最初是由美国计较机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年月早期从 Early History of SQL 中相识相关模子后在IBM开拓的。该版本最初称为[SEQUEL: A Structured English Query Lang

自界说Apache Flink Stream Source必要实现StreamTableSource, StreamTableSource中通过StreamExecutionEnvironment 的addSource要领获取DataStream, 以是我们必要自界说一个 SourceFunction, 而且要支持发生WaterMark,也就是要实现DefinedRowtimeAttributes接口。

(1) Source Function界说

支持吸取携带EventTime的数据荟萃,Either的数据布局,Right暗示WaterMark和Left暗示数据:

  1. class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) 
  2. extends SourceFunction[T] { 
  3. override def run(ctx: SourceContext[T]): Unit = { 
  4. dataWithTimestampList.foreach { 
  5. case Left(t) => ctx.collectWithTimestamp(t._2, t._1) 
  6. case Right(w) => ctx.emitWatermark(new Watermark(w)) 
  7. override def cancel(): Unit = ???} 

(2) 界说 StreamTableSource

我们自界说的Source要携带我们测试的数据,以及对应WaterMark数据,详细如下:

  1. class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes { 
  2.  
  3. val fieldNames = Array("accessTime", "region", "userId") 
  4. val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING)) 
  5. val rowType = new RowTypeInfo( 
  6. Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], 
  7. fieldNames) 
  8.  
  9. // 页面会见表数据 rows with timestamps and watermarks 
  10. val data = Seq( 
  11. Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")), 
  12. Right(1510365660000L), 
  13. Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")), 
  14. Right(1510365660000L), 
  15. Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")), 
  16. Right(1510366200000L), 
  17. Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")), 
  18. Right(1510366260000L), 
  19. Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")), 
  20. Right(1510373400000L) 
  21.  
  22. override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = { 
  23. Collections.singletonList(new RowtimeAttributeDescriptor( 
  24. "accessTime", 
  25. new ExistingField("accessTime"), 
  26. PreserveWatermarks.INSTANCE)) 
  27.  
  28. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { 
  29. execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType) 
  30.  
  31. override def getReturnType: TypeInformation[Row] = rowType 
  32.  
  33. override def getTableSchema: TableSchema = schema 
  34.  

2. Sink 界说

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读