加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(15) - DataStream Connectors之Kafka

发布时间:2019-01-19 12:08:17 所属栏目:教程 来源:孙金城
导读:一、聊什么 为了满意本系列读者的需求,我先先容一下Kafka在Apache Flink中的行使。以是本篇以一个简朴的示例,向各人先容在Apache Flink中怎样行使Kafka。 二、Kafka 简介 Apache Kafka是一个漫衍式宣布-订阅动静转达体系。 它最初由LinkedIn公司开拓,Li

要行使内置的Schemas必要添加如下依靠:

  1. <dependency> 
  2. <groupId>org.apache.flink</groupId> 
  3. <artifactId>flink-avro</artifactId> 
  4. <version>1.7.0</version> 
  5. </dependency> 

(3) 读取位置设置

我们在斲丧Kafka数据时辰,也许必要指定斲丧的位置,Apache Flink 的FlinkKafkaConsumer提供许多便利的位置配置,如下:

  • consumer.setStartFromEarliest() - 从最早的记录开始;
  • consumer.setStartFromLatest() - 从最新记录开始;
  • consumer.setStartFromTimestamp(...); // 从指定的epoch时刻戳(毫秒)开始;
  • consumer.setStartFromGroupOffsets(); // 默认举动,以前次斲丧的偏移量举办继承斲丧。

上面的位置指定可以准确到每个分区,好比如下代码:

  1. Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); 
  2. specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始 
  3. specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始 
  4. specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始 
  5.  
  6. consumer.setStartFromSpecificOffsets(specificStartOffsets); 

对付没有指定的分区照旧默认的setStartFromGroupOffsets方法。

(4) Topic发明

Kafka支持Topic自动发明,也就是用正则的方法建设FlinkKafkaConsumer,好比:

  1. // 建设斲丧者 
  2. FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(            java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")), 
  3. new KafkaMsgSchema(), 
  4. p); 

在上面的示例中,看成业开始运行时,斲丧者将订阅名称与指定正则表达式匹配的全部Topic(以sourceTopic的值开头并以单个数字末了)。

3. 界说Watermark(Window)

对Kafka Connector的应用不只限于上面的简朴数据提取,我们更多时辰是祈望对Kafka数据举办Event-time的窗口操纵,那么就必要在Flink Kafka Source中界说Watermark。

要界说Event-time,起首是Kafka数据内里携带时刻属性,假设我们数据是String#Long的名目,如only for test#1000。那么我们将Long作为时刻列。

  • KafkaWithTsMsgSchema - 完备代码

要想理会上面的Kafka的数据名目,我们必要开拓一个自界说的Schema,好比叫KafkaWithTsMsgSchema,将String#Long理会为一个Java的Tuple2

  1. import org.apache.flink.api.common.serialization.DeserializationSchema; 
  2. import org.apache.flink.api.common.serialization.SerializationSchema; 
  3. import org.apache.flink.api.common.typeinfo.BasicTypeInfo; 
  4. import org.apache.flink.api.common.typeinfo.TypeInformation; 
  5. import org.apache.flink.api.java.tuple.Tuple2; 
  6. import org.apache.flink.api.java.typeutils.TupleTypeInfo; 
  7. import org.apache.flink.util.Preconditions; 
  8.  
  9. import java.io.IOException; 
  10. import java.io.ObjectInputStream; 
  11. import java.io.ObjectOutputStream; 
  12. import java.nio.charset.Charset; 
  13.  
  14. public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> { 
  15.     private static final long serialVersionUID = 1L; 
  16.     private transient Charset charset; 
  17.  
  18.     public KafkaWithTsMsgSchema() { 
  19.         this(Charset.forName("UTF-8")); 
  20.     } 
  21.  
  22.     public KafkaWithTsMsgSchema(Charset charset) { 
  23.         this.charset = Preconditions.checkNotNull(charset); 
  24.     } 
  25.  
  26.     public Charset getCharset() { 
  27.         return this.charset; 
  28.     } 
  29.  
  30.     public Tuple2<String, Long> deserialize(byte[] message) { 
  31.         String msg = new String(message, charset); 
  32.         String[] dataAndTs = msg.split("#"); 
  33.         if(dataAndTs.length == 2){ 
  34.             return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim())); 
  35.         }else{ 
  36.             // 现实出产上必要抛出runtime非常 
  37.             System.out.println("Fail due to invalid msg format.. ["+msg+"]"); 
  38.             return new Tuple2<String, Long>(msg, 0L); 
  39.         } 
  40.     } 
  41.  
  42.     @Override 
  43.     public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) { 
  44.         return false; 
  45.     } 
  46.  
  47.     public byte[] serialize(Tuple2<String, Long> element) { 
  48.         return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset); 
  49.     } 
  50.  
  51.     private void writeObject(ObjectOutputStream out) throws IOException { 
  52.         out.defaultWriteObject(); 
  53.         out.writeUTF(this.charset.name()); 
  54.     } 
  55.  
  56.     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { 
  57.         in.defaultReadObject(); 
  58.         String charsetName = in.readUTF(); 
  59.         this.charset = Charset.forName(charsetName); 
  60.     } 
  61.  
  62.     @Override 
  63.     public TypeInformation<Tuple2<String, Long>> getProducedType() { 
  64.         return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); 
  65.     }} 
  • Watermark天生

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读