WordCount焦点统计逻辑就是凭证单词分组,然后计较每个单词的数目,统计逻辑如下:
- // 单词统计焦点逻辑
- val result = source
- .groupBy('word) // 单词分组
- .select('word, 'word.count) // 单词统计
(4) 界说Sink
将WordCount的统计功效写入Sink中,代码如下:
- // 自界说Sink
- val sink = new RetractSink // 自界说Sink(下面有完备代码)
- // 计较功效写入sink
- result.toRetractStream[(String, Long)].addSink(sink)
(5) 完备的HelloWord代码
为了利便各人运行WordCount查询统计,将完备的代码分享各人(基于flink-1.7.0),如下:
- import org.apache.flink.api.scala._
- import org.apache.flink.configuration.Configuration
- import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.table.api.TableEnvironment
- import org.apache.flink.table.api.scala._
-
- import scala.collection.mutable
-
- object HelloWord {
-
- def main(args: Array[String]): Unit = {
- // 测试数据
- val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
-
- // Stream运行情形
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- // 最简朴的获取Source方法
- val source = env.fromCollection(data).toTable(tEnv, 'word)
-
- // 单词统计焦点逻辑
- val result = source
- .groupBy('word) // 单词分组
- .select('word, 'word.count) // 单词统计
-
- // 自界说Sink
- val sink = new RetractSink
- // 计较功效写入sink
- result.toRetractStream[(String, Long)].addSink(sink)
-
- env.execute
- }
- }
-
- class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] {
- private var resultSet: mutable.Set[(String, Long)] = _
-
- override def open(parameters: Configuration): Unit = {
- // 初始化内存存储布局
- resultSet = new mutable.HashSet[(String, Long)]
- }
-
- override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = {
- if (v._1) {
- // 计较数据
- resultSet.add(v._2)
- }
- else {
- // 撤回数据
- resultSet.remove(v._2)
- }
- }
-
- override def close(): Unit = {
- // 打印写入sink的功效数据
- resultSet.foreach(println)
- }
- }
运行功效如下:

固然上面用了较长的纸墨先容简朴的WordCount统计逻辑,但source和sink部门都是可以在进修后头算子中被复用的。本例焦点的统计逻辑只有一行代码:
- source.groupBy('word).select('word, 'word.count)
以是Table API开拓技能使命很是的简捷高效。
四、Table API 算子
固然Table API与SQL的算子语义同等,但在表达方法上面SQL以文本的方法揭示,Table API是以java可能scala说话的方法举办开拓。为了各人利便阅读,即即是在《Apache Flink 漫谈系列(08) - SQL概览》中先容过的算子,在这里也会再次举办先容,虽然对付Table API和SQL差异的处所会举办细致先容。
1. 示例数据及测试类
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|