|
在运行上面代码之前必要留意上面代码中对EventTime时刻提取的进程,也就是说Apache Flink的TimeCharacteristic.EventTime 模式,必要挪用assignTimestampsAndWatermarks要领配置EventTime的天生方法,这种方法也很是机动,用户可以节制营业数据的EventTime的值和WaterMark的发生,WaterMark相干内容可以查阅《Apache Flink 漫谈系列(03) - Watermark》。 在本示例中提取EventTime的完备代码如下:
- import java.SQL.Timestamp
-
- import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
- import org.apache.flink.streaming.api.windowing.time.Time
-
- class OrderTimestampExtractor[T1, T2]
- extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
- override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
- element._3.getTime
- }
- }
查察运行功效:

5. With CSVConnector 实当代码
在现实的出产开拓中,都必要现实的Connector的界说,下面我们以CSV名目标Connector界说来开拓Temporal Table JOIN Demo。
(1) genEventRatesHistorySource
- def genEventRatesHistorySource: CsvTableSource = {
-
- val csvRecords = Seq(
- "ts#currency#rate",
- "1#US Dollar#102",
- "1#Euro#114",
- "1#Yen#1",
- "3#Euro#116",
- "5#Euro#119",
- "7#Pounds#108"
- )
- // 测试数据写入姑且文件
- val tempFilePath =
- FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")
-
- // 建设Source connector
- new CsvTableSource(
- tempFilePath,
- Array("ts","currency","rate"),
- Array(
- Types.LONG,Types.STRING,Types.LONG
- ),
- fieldDelim = "#",
- rowDelim = CommonUtils.line,
- ignoreFirstLine = true,
- ignoreComments = "%"
- )}
(2) genRatesOrderSource
- def genRatesOrderSource: CsvTableSource = {
-
- val csvRecords = Seq(
- "ts#currency#amount",
- "2#Euro#10",
- "4#Euro#10"
- )
- // 测试数据写入姑且文件
- val tempFilePath =
- FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")
-
- // 建设Source connector
- new CsvTableSource(
- tempFilePath,
- Array("ts","currency", "amount"),
- Array(
- Types.LONG,Types.STRING,Types.LONG
- ),
- fieldDelim = "#",
- rowDelim = CommonUtils.line,
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
- }
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|