We simply write the computed results to the CsvTableSink that Apache Flink supports out of the box. The sink is defined as follows:
```scala
def getCsvTableSink: TableSink[Row] = {
  val tempFile = File.createTempFile("csv_sink_", "tem")
  // Print the sink's file path so we can inspect the results after the run
  println("Sink path : " + tempFile)
  if (tempFile.exists()) {
    tempFile.delete()
  }
  new CsvTableSink(tempFile.getAbsolutePath).configure(
    Array[String]("region", "winStart", "winEnd", "pv"),
    Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}
```
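The temp-file handling above deserves a note: `File.createTempFile` actually creates an empty file on disk, while the sink wants to create its output file itself, so the code deletes the file immediately and keeps only the unique path. A minimal self-contained sketch of that pattern (plain JVM, no Flink required; `reserveSinkPath` is a hypothetical helper name):

```scala
import java.io.File

object TempPathDemo {
  // Reserve a unique path the same way the sink definition does:
  // createTempFile creates a real empty file, so we delete it and
  // keep only its unique absolute path for the sink to write to.
  def reserveSinkPath(prefix: String, suffix: String): String = {
    val tempFile = File.createTempFile(prefix, suffix)
    if (tempFile.exists()) tempFile.delete()
    tempFile.getAbsolutePath
  }

  def main(args: Array[String]): Unit = {
    val path = reserveSinkPath("csv_sink_", "tem")
    println("Sink path : " + path)
    // Only the path is reserved; the file itself no longer exists
    assert(!new File(path).exists())
  }
}
```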
2. Build the main program
The main program covers defining the execution environment, registering the source and sink, and running the windowed aggregation query, as follows:
```scala
def main(args: Array[String]): Unit = {
  // Streaming environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)

  // Use EventTime
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Parallelism of 1 makes it easy to inspect the output
  env.setParallelism(1)

  val sourceTableName = "mySource"
  // Create the custom source
  val tableSource = new MyTableSource

  val sinkTableName = "csvSink"
  // Create the CSV sink
  val tableSink = getCsvTableSink

  // Register the source
  tEnv.registerTableSource(sourceTableName, tableSource)
  // Register the sink
  tEnv.registerTableSink(sinkTableName, tableSink)

  val result = tEnv.scan(sourceTableName)
    .window(Tumble over 2.minute on 'accessTime as 'w)
    .groupBy('w, 'region)
    .select('region, 'w.start, 'w.end, 'region.count as 'pv)

  result.insertInto(sinkTableName)
  env.execute()
}
```
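The `Tumble over 2.minute` clause assigns each record to a fixed, non-overlapping two-minute bucket based on its event time (`accessTime`). The bucketing arithmetic can be sketched without Flink; aligning windows to the epoch, as below, matches Flink's default behavior for tumbling windows with no offset:

```scala
object TumbleDemo {
  // Window size of the example query: 2 minutes, in milliseconds
  val windowSizeMs: Long = 2 * 60 * 1000L

  // Start of the tumbling window containing the given event timestamp
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // End of that window (exclusive)
  def windowEnd(ts: Long): Long = windowStart(ts) + windowSizeMs

  def main(args: Array[String]): Unit = {
    // Two events 50 seconds apart land in the same 2-minute window...
    println(windowStart(605000L)) // 600000
    println(windowStart(655000L)) // 600000
    // ...while an event 2 minutes later lands in the next one
    println(windowStart(725000L)) // 720000
  }
}
```

Records sharing a window start are grouped together by `groupBy('w, 'region)`, and `'region.count` then yields one `pv` value per (window, region) pair.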
3. Run and inspect the results
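After the job finishes, the file at the sink path printed earlier contains one CSV row per (window, region) pair, with fields in the order declared in `configure`: `region`, `winStart`, `winEnd`, `pv`. A small sketch of parsing such a row (the sample row is illustrative only, not real job output; the default field delimiter of `CsvTableSink` is assumed to be a comma):

```scala
object InspectCsvDemo {
  // Parse one sink row into (region, winStart, winEnd, pv),
  // matching the column order declared in the sink's configure call
  def parseRow(line: String): (String, String, String, Long) = {
    val Array(region, winStart, winEnd, pv) = line.split(",")
    (region, winStart, winEnd, pv.toLong)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative row mimicking the declared schema, not actual output
    val row = "RegionA,2018-10-16 10:10:00.0,2018-10-16 10:12:00.0,2"
    val (region, start, end, pv) = parseRow(row)
    println(s"$region : $pv page views in [$start, $end)")
  }
}
```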