加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(15) - DataStream Connectors之Kafka

发布时间:2019-01-19 12:08:17 所属栏目:教程 来源:孙金城
导读:一、聊什么 为了满意本系列读者的需求,我先先容一下Kafka在Apache Flink中的行使。以是本篇以一个简朴的示例,向各人先容在Apache Flink中怎样行使Kafka。 二、Kafka 简介 Apache Kafka是一个漫衍式宣布-订阅动静转达体系。 它最初由LinkedIn公司开拓,Li

除了看日记,我们可以用呼吁表现的查询我们是否乐成的建设了flink-topic,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181 
  2.  
  3. flink-tipic 

假如输出flink-tipic那么声名我们的Topic乐成建设了。

那么Topic是生涯在那边?Kafka是奈何进动作静的宣布和订阅的呢?为了直观,我们看如下Kafka架构表示图简朴领略一下:

Apache Flink 漫谈系列(15) - DataStream Connectors之Kafka

简朴先容一下,Kafka操作ZooKeeper来存储集群信息,也就是上面我们启动的Kafka Server 实例,一个集群中可以有多个Kafka Server 实例,Kafka Server叫做Broker,我们建设的Topic可以在一个或多个Broker中。Kafka操作Push模式发送动静,操作Pull方法拉打动静。

3. 发送动静

怎样向已经存在的Topic中发送动静呢,虽然我们可以API的方法编写代码发送动静。同时,还可以操作呼吁方法来便捷的发送动静,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic 
  2. >Kafka test msg 
  3. >Kafka connector 

上面我们发送了两条动静Kafka test msg 和 Kafka connector 到 flink-topic Topic中。

4. 读打动静

假如读取指定Topic的动静呢?同样可以API和呼吁两种方法都可以完成,我们以呼吁方法读取flink-topic的动静,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning 
  2. Kafka test msg 
  3. Kafka connector 

个中--from-beginning 描写了我们从Topic开始位置读打动静。

三、Flink Kafka Connector

前面我们以最简朴的方法安装了Kafka情形,那么我们以上面的情形先容Flink Kafka Connector的行使。Flink Connector相干的基本常识会在《Apache Flink 漫谈系列(14) - Connectors》中先容,这里我们直接先容与Kafka Connector相干的内容。

Apache Flink 中提供了多个版本的Kafka Connector,本篇以flink-1.7.0版本为例举办先容。

1. mvn 依靠

要行使Kakfa Connector必要在我们的pom中增进对Kafka Connector的依靠,如下:

  1. <dependency> 
  2. <groupId>org.apache.flink</groupId> 
  3. <artifactId>flink-connector-kafka_2.11</artifactId> 
  4. <version>1.7.0</version> 
  5. </dependency> 

Flink Kafka Consumer必要知道怎样将Kafka中的二进制数据转换为Java / Scala工具。 DeserializationSchema应承用户指定这样的模式。 为每个Kafka动静挪用 T deserialize(byte [] message)要领,从Kafka转达值。

2. Examples

我们示例读取Kafka的数据,再将数据做简朴处理赏罚之后写入到Kafka中。我们必要再建设一个用于写入的Topic,如下:

  1. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output 

以是示例中我们Source操作flink-topic, Sink用slink-topic-output。

(1) Simple ETL

我们假设Kafka中存储的就是一个简朴的字符串,以是我们必要一个用于对字符串举办serialize和deserialize的实现,也就是我们要界说一个实现DeserializationSchema和SerializationSchema 的序列化和反序列化的类。由于我们示例中是字符串,以是我们自界说一个KafkaMsgSchema实现类,然后在编写Flink主措施。

  • KafkaMsgSchema - 完备代码
    1. import org.apache.flink.api.common.serialization.DeserializationSchema; 
    2. import org.apache.flink.api.common.serialization.SerializationSchema; 
    3. import org.apache.flink.api.common.typeinfo.BasicTypeInfo; 
    4. import org.apache.flink.api.common.typeinfo.TypeInformation; 
    5. import org.apache.flink.util.Preconditions; 
    6.  
    7. import java.io.IOException; 
    8. import java.io.ObjectInputStream; 
    9. import java.io.ObjectOutputStream; 
    10. import java.nio.charset.Charset; 
    11.  
    12. public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> { 
    13.     private static final long serialVersionUID = 1L; 
    14.     private transient Charset charset; 
    15.  
    16.     public KafkaMsgSchema() { 
    17. // 默认UTF-8编码 
    18.         this(Charset.forName("UTF-8")); 
    19.     } 
    20.  
    21.     public KafkaMsgSchema(Charset charset) { 
    22.         this.charset = Preconditions.checkNotNull(charset); 
    23.     } 
    24.  
    25.     public Charset getCharset() { 
    26.         return this.charset; 
    27.     } 
    28.  
    29.     public String deserialize(byte[] message) { 
    30. // 将Kafka的动静反序列化为java工具 
    31.         return new String(message, charset); 
    32.     } 
    33.  
    34.     public boolean isEndOfStream(String nextElement) { 
    35. // 流永久不竣事 
    36.         return false; 
    37.     } 
    38.  
    39.     public byte[] serialize(String element) { 
    40. // 将java工具序列化为Kafka的动静 
    41.         return element.getBytes(this.charset); 
    42.     } 
    43.  
    44.     public TypeInformation<String> getProducedType() { 
    45. // 界说发生的数据Typeinfo 
    46.         return BasicTypeInfo.STRING_TYPE_INFO; 
    47.     } 
    48.  
    49.     private void writeObject(ObjectOutputStream out) throws IOException { 
    50.         out.defaultWriteObject(); 
    51.         out.writeUTF(this.charset.name()); 
    52.     } 
    53.  
    54.     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { 
    55.         in.defaultReadObject(); 
    56.         String charsetName = in.readUTF(); 
    57.         this.charset = Charset.forName(charsetName); 
    58.     } 
  • 主措施 - 完备代码
    1. import org.apache.flink.api.common.functions.MapFunction; 
    2. import org.apache.flink.api.java.utils.ParameterTool; 
    3. import org.apache.flink.streaming.api.datastream.DataStream; 
    4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
    5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; 
    6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; 
    7. import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; 
    8.  
    9. import java.util.Properties; 
    10.  
    11. public class KafkaExample { 
    12.     public static void main(String[] args) throws Exception { 
    13.         // 用户参数获取 
    14.         final ParameterTool parameterTool = ParameterTool.fromArgs(args); 
    15.         // Stream 情形 
    16.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    17.  
    18.         // Source的topic 
    19.         String sourceTopic = "flink-topic"; 
    20.         // Sink的topic 
    21.         String sinkTopic = "flink-topic-output"; 
    22.         // broker 地点 
    23.         String broker = "localhost:9092"; 
    24.  
    25.         // 属性参数 - 现实投产可以在呼吁行传入 
    26.         Properties p = parameterTool.getProperties(); 
    27.         p.putAll(parameterTool.getProperties()); 
    28.         p.put("bootstrap.servers", broker); 
    29.  
    30.         env.getConfig().setGlobalJobParameters(parameterTool); 
    31.  
    32.         // 建设斲丧者 
    33.         FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>( 
    34.                 sourceTopic, 
    35.                 new KafkaMsgSchema(), 
    36.                 p); 
    37.         // 配置读取最早的数据 
    38. //        consumer.setStartFromEarliest(); 
    39.  
    40.         // 读取Kafka动静 
    41.         DataStream<String> input = env.addSource(consumer); 
    42.  
    43.  
    44.         // 数据处理赏罚 
    45.         DataStream<String> result = input.map(new MapFunction<String, String>() { 
    46.             public String map(String s) throws Exception { 
    47.                 String msg = "Flink study ".concat(s); 
    48.                 System.out.println(msg); 
    49.                 return msg; 
    50.             } 
    51.         }); 
    52.  
    53.         // 建设出产者 
    54.         FlinkKafkaProducer producer = new FlinkKafkaProducer<String>( 
    55.                 sinkTopic, 
    56.                 new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()), 
    57.                 p, 
    58.                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); 
    59.  
    60.         // 将数据写入Kafka指定Topic中 
    61.         result.addSink(producer); 
    62.  
    63.         // 执行job 
    64.         env.execute("Kafka Example"); 
    65.     } 

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读