加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(13) - Table API 概述

发布时间:2019-01-18 02:39:04 所属栏目:教程 来源:孙金城
导读:一、什么是Table API 在《Apache Flink 漫谈系列(08) - SQL概览》中我们提纲的向各人先容了什么是好SQL,SQL和Table API是Apache Flink中的统一条理的API抽象,如下图所示: Apache Flink 针对差异的用户场景提供了三层用户API,最基层ProcessFunction API

我们简朴的将计较功效写入到Apache Flink内置支持的CSVSink中,界说Sink如下:

  1. def getCsvTableSink: TableSink[Row] = { 
  2. val tempFile = File.createTempFile("csv_sink_", "tem") 
  3. // 打印sink的文件路径,利便我们查察运行功效 
  4. println("Sink path : " + tempFile) 
  5. if (tempFile.exists()) { 
  6. tempFile.delete() 
  7. new CsvTableSink(tempFile.getAbsolutePath).configure( 
  8. Array[String]("region", "winStart", "winEnd", "pv"), 
  9. Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))} 

2. 构建主措施

主措施包罗执行情形的界说,Source/Sink的注册以及统计查SQL的执行,详细如下:

  1. def main(args: Array[String]): Unit = { 
  2. // Streaming 情形 
  3. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  4. val tEnv = TableEnvironment.getTableEnvironment(env) 
  5.  
  6. // 配置EventTime 
  7. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  8.  
  9. //利便我们查出输出数据 
  10. env.setParallelism(1) 
  11.  
  12. val sourceTableName = "mySource" 
  13. // 建设自界说source数据布局 
  14. val tableSource = new MyTableSource 
  15.  
  16. val sinkTableName = "csvSink" 
  17. // 建设CSV sink 数据布局 
  18. val tableSink = getCsvTableSink 
  19.  
  20. // 注册source 
  21. tEnv.registerTableSource(sourceTableName, tableSource) 
  22. // 注册sink 
  23. tEnv.registerTableSink(sinkTableName, tableSink) 
  24.  
  25. val result = tEnv.scan(sourceTableName) 
  26. .window(Tumble over 2.minute on 'accessTime as 'w) 
  27. .groupBy('w, 'region) 
  28. .select('region, 'w.start, 'w.end, 'region.count as 'pv) 
  29.  
  30. result.insertInto(sinkTableName) 
  31. env.execute() 

3. 执行并查察运行功效

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读