副问题[/!--empirenews.page--]
一、什么是Table API
在《Apache Flink 漫谈系列(08) - SQL概览》中我们提纲的向各人先容了什么是好SQL,SQL和Table API是Apache Flink中的统一条理的API抽象,如下图所示:

Apache Flink 针对差异的用户场景提供了三层用户API,最基层ProcessFunction API可以对State,Timer等伟大机制举办有用的节制,但用户行使的便捷性很弱,也就是说纵然很简朴统计逻辑,也要较多的代码开拓。第二层DataStream API对窗口,聚合等算子举办了封装,用户的便捷性有所加强。最上层是SQL/Table API,Table API是Apache Flink中的声明式,可被查询优化器优化的高级说明API。
二、Table API的特点
Table API和SQL都是Apache Flink中最高层的说明API,SQL所具备的特点Table API也都具有,如下:
- 声明式 - 用户只体谅做什么,不消体谅怎么做;
- 高机能 - 支持查询优化,可以获取最好的执行机能;
- 流批同一 - 沟通的统计逻辑,既可以流模式运行,也可以批模式运行;
- 尺度不变 - 语义遵循SQL尺度,语法语义明晰,不易变换。
虽然除了SQL的特征,由于Table API是在Flink中专门计划的,以是Table API还具有自身的特点:
- 表达方法的扩展性 - 在Flink中可觉得Table API开拓许多便捷性成果,如:Row.flatten(), map/flatMap 等
- 成果的扩展性 - 在Flink中可觉得Table API扩展更多的成果,如:Iteration,flatAggregate 等新成果
- 编译搜查 - Table API支持java和scala说话开拓,支持IDE中举办编译搜查。
声名:上面说的map/flatMap/flatAggregate都是Apache Flink 社区 FLIP-29 中筹划的新成果。
三、HelloWorld
在先容Table API全部算子之前我们先编写一个简朴的HelloWorld来直观相识怎样举办Table API的开拓。
1. Maven 依靠
在pom文件中增进如下设置,本篇以flink-1.7.0成果为准举办后续先容。
- <properties>
- <table.version>1.7.0</table.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table_2.11</artifactId>
- <version>${table.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_2.11</artifactId>
- <version>${table.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_2.11</artifactId>
- <version>${table.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- <version>${table.version}</version>
- </dependency>
-
- </dependencies>
2. 措施布局
在编写第一Flink Table API job之前我们先简朴相识一下Flink Table API job的布局,如下图所示:

- 外部数据源,好比Kafka, Rabbitmq, CSV 等等;
- 查询计较逻辑,好比最简朴的数据导入select,双流Join,Window Aggregate 等;
- 外部功效存储,好比Kafka,Cassandra,CSV等。
声名:1和3 在Apache Flink中统称为Connector。
3. 主措施
我们以一个统计单词数目的营业场景,编写第一个HelloWorld措施。
按照上面Flink job根基布局先容,要Table API完成WordCount的计较需求,我们必要完成三部门代码:
- TableSoruce Code - 用于建设数据源的代码
- Table API Query - 用于举办word count统计的Table API 查询逻辑
- TableSink Code - 用于生涯word count计较功效的功效表代码
(1) 运行模式选择
一个job我们要选择是Stream方法运行照旧Batch模式运行,以是任何统计job的第一步是举办运行模式选择,如下我们选择Stream方法运行。
- // Stream运行情形
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
(2) 构建测试Source
我们用最简朴的构建Source方法举办本次测试,代码如下:
- // 测试数据
- val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
- // 最简朴的获取Source方法
- val source = env.fromCollection(data).toTable(tEnv, 'word)
(3) WordCount 统计逻辑
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|