要行使内置的Schemas必要添加如下依靠:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>1.7.0</version>
- </dependency>
(3) 读取位置设置
我们在斲丧Kafka数据时辰,也许必要指定斲丧的位置,Apache Flink 的FlinkKafkaConsumer提供许多便利的位置配置,如下:
- consumer.setStartFromEarliest() - 从最早的记录开始;
- consumer.setStartFromLatest() - 从最新记录开始;
- consumer.setStartFromTimestamp(...); // 从指定的epoch时刻戳(毫秒)开始;
- consumer.setStartFromGroupOffsets(); // 默认举动,以前次斲丧的偏移量举办继承斲丧。
上面的位置指定可以准确到每个分区,好比如下代码:
- Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
- specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始
- specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始
- specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始
-
- consumer.setStartFromSpecificOffsets(specificStartOffsets);
对付没有指定的分区照旧默认的setStartFromGroupOffsets方法。
(4) Topic发明
Kafka支持Topic自动发明,也就是用正则的方法建设FlinkKafkaConsumer,好比:
- // 建设斲丧者
- FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>( java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
- new KafkaMsgSchema(),
- p);
在上面的示例中,看成业开始运行时,斲丧者将订阅名称与指定正则表达式匹配的全部Topic(以sourceTopic的值开头并以单个数字末了)。
3. 界说Watermark(Window)
对Kafka Connector的应用不只限于上面的简朴数据提取,我们更多时辰是祈望对Kafka数据举办Event-time的窗口操纵,那么就必要在Flink Kafka Source中界说Watermark。
要界说Event-time,起首是Kafka数据内里携带时刻属性,假设我们数据是String#Long的名目,如only for test#1000。那么我们将Long作为时刻列。
- KafkaWithTsMsgSchema - 完备代码
要想理会上面的Kafka的数据名目,我们必要开拓一个自界说的Schema,好比叫KafkaWithTsMsgSchema,将String#Long理会为一个Java的Tuple2
- import org.apache.flink.api.common.serialization.DeserializationSchema;
- import org.apache.flink.api.common.serialization.SerializationSchema;
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.api.java.typeutils.TupleTypeInfo;
- import org.apache.flink.util.Preconditions;
-
- import java.io.IOException;
- import java.io.ObjectInputStream;
- import java.io.ObjectOutputStream;
- import java.nio.charset.Charset;
-
- public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {
- private static final long serialVersionUID = 1L;
- private transient Charset charset;
-
- public KafkaWithTsMsgSchema() {
- this(Charset.forName("UTF-8"));
- }
-
- public KafkaWithTsMsgSchema(Charset charset) {
- this.charset = Preconditions.checkNotNull(charset);
- }
-
- public Charset getCharset() {
- return this.charset;
- }
-
- public Tuple2<String, Long> deserialize(byte[] message) {
- String msg = new String(message, charset);
- String[] dataAndTs = msg.split("#");
- if(dataAndTs.length == 2){
- return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
- }else{
- // 现实出产上必要抛出runtime非常
- System.out.println("Fail due to invalid msg format.. ["+msg+"]");
- return new Tuple2<String, Long>(msg, 0L);
- }
- }
-
- @Override
- public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {
- return false;
- }
-
- public byte[] serialize(Tuple2<String, Long> element) {
- return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- out.writeUTF(this.charset.name());
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- String charsetName = in.readUTF();
- this.charset = Charset.forName(charsetName);
- }
-
- @Override
- public TypeInformation<Tuple2<String, Long>> getProducedType() {
- return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
- }}
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|