加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(13) - Table API 概述

发布时间:2019-01-18 02:39:04 所属栏目:教程 来源:孙金城
导读:一、什么是Table API 在《Apache Flink 漫谈系列(08) - SQL概览》中我们提纲的向各人先容了什么是好SQL,SQL和Table API是Apache Flink中的统一条理的API抽象,如下图所示: Apache Flink 针对差异的用户场景提供了三层用户API,最基层ProcessFunction API

WordCount焦点统计逻辑就是凭证单词分组,然后计较每个单词的数目,统计逻辑如下:

  1. // 单词统计焦点逻辑 
  2. val result = source 
  3. .groupBy('word) // 单词分组 
  4. .select('word, 'word.count) // 单词统计 

(4) 界说Sink

将WordCount的统计功效写入Sink中,代码如下:

  1. // 自界说Sink 
  2. val sink = new RetractSink // 自界说Sink(下面有完备代码) 
  3. // 计较功效写入sink 
  4. result.toRetractStream[(String, Long)].addSink(sink) 

(5) 完备的HelloWord代码

为了利便各人运行WordCount查询统计,将完备的代码分享各人(基于flink-1.7.0),如下:

  1. import org.apache.flink.api.scala._ 
  2. import org.apache.flink.configuration.Configuration 
  3. import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} 
  4. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
  5. import org.apache.flink.table.api.TableEnvironment 
  6. import org.apache.flink.table.api.scala._ 
  7.  
  8. import scala.collection.mutable 
  9.  
  10. object HelloWord { 
  11.  
  12. def main(args: Array[String]): Unit = { 
  13. // 测试数据 
  14. val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob") 
  15.  
  16. // Stream运行情形 
  17. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  18. val tEnv = TableEnvironment.getTableEnvironment(env) 
  19. // 最简朴的获取Source方法 
  20. val source = env.fromCollection(data).toTable(tEnv, 'word) 
  21.  
  22. // 单词统计焦点逻辑 
  23. val result = source 
  24. .groupBy('word) // 单词分组 
  25. .select('word, 'word.count) // 单词统计 
  26.  
  27. // 自界说Sink 
  28. val sink = new RetractSink 
  29. // 计较功效写入sink 
  30. result.toRetractStream[(String, Long)].addSink(sink) 
  31.  
  32. env.execute 
  33.  
  34. class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] { 
  35. private var resultSet: mutable.Set[(String, Long)] = _ 
  36.  
  37. override def open(parameters: Configuration): Unit = { 
  38. // 初始化内存存储布局 
  39. resultSet = new mutable.HashSet[(String, Long)] 
  40.  
  41. override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = { 
  42. if (v._1) { 
  43. // 计较数据 
  44. resultSet.add(v._2) 
  45. else { 
  46. // 撤回数据 
  47. resultSet.remove(v._2) 
  48.  
  49. override def close(): Unit = { 
  50. // 打印写入sink的功效数据 
  51. resultSet.foreach(println) 

运行功效如下:

Apache Flink 漫谈系列(13) - Table API 概述

固然上面用了较长的纸墨先容简朴的WordCount统计逻辑,但source和sink部门都是可以在进修后头算子中被复用的。本例焦点的统计逻辑只有一行代码:

  1. source.groupBy('word).select('word, 'word.count) 

以是Table API开拓技能使命很是的简捷高效。

四、Table API 算子

固然Table API与SQL的算子语义同等,但在表达方法上面SQL以文本的方法揭示,Table API是以java可能scala说话的方法举办开拓。为了各人利便阅读,即即是在《Apache Flink 漫谈系列(08) - SQL概览》中先容过的算子,在这里也会再次举办先容,虽然对付Table API和SQL差异的处所会举办细致先容。

1. 示例数据及测试类

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读