除了看日记,我们可以用呼吁表现的查询我们是否乐成的建设了flink-topic,如下:
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181
-
- flink-tipic
假如输出flink-tipic那么声名我们的Topic乐成建设了。
那么Topic是生涯在那边?Kafka是奈何进动作静的宣布和订阅的呢?为了直观,我们看如下Kafka架构表示图简朴领略一下:

简朴先容一下,Kafka操作ZooKeeper来存储集群信息,也就是上面我们启动的Kafka Server 实例,一个集群中可以有多个Kafka Server 实例,Kafka Server叫做Broker,我们建设的Topic可以在一个或多个Broker中。Kafka操作Push模式发送动静,操作Pull方法拉打动静。
3. 发送动静
怎样向已经存在的Topic中发送动静呢,虽然我们可以API的方法编写代码发送动静。同时,还可以操作呼吁方法来便捷的发送动静,如下:
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
- >Kafka test msg
- >Kafka connector
上面我们发送了两条动静Kafka test msg 和 Kafka connector 到 flink-topic Topic中。
4. 读打动静
假如读取指定Topic的动静呢?同样可以API和呼吁两种方法都可以完成,我们以呼吁方法读取flink-topic的动静,如下:
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
- Kafka test msg
- Kafka connector
个中--from-beginning 描写了我们从Topic开始位置读打动静。
三、Flink Kafka Connector
前面我们以最简朴的方法安装了Kafka情形,那么我们以上面的情形先容Flink Kafka Connector的行使。Flink Connector相干的基本常识会在《Apache Flink 漫谈系列(14) - Connectors》中先容,这里我们直接先容与Kafka Connector相干的内容。
Apache Flink 中提供了多个版本的Kafka Connector,本篇以flink-1.7.0版本为例举办先容。
1. mvn 依靠
要行使Kakfa Connector必要在我们的pom中增进对Kafka Connector的依靠,如下:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_2.11</artifactId>
- <version>1.7.0</version>
- </dependency>
Flink Kafka Consumer必要知道怎样将Kafka中的二进制数据转换为Java / Scala工具。 DeserializationSchema应承用户指定这样的模式。 为每个Kafka动静挪用 T deserialize(byte [] message)要领,从Kafka转达值。
2. Examples
我们示例读取Kafka的数据,再将数据做简朴处理赏罚之后写入到Kafka中。我们必要再建设一个用于写入的Topic,如下:
- bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output
以是示例中我们Source操作flink-topic, Sink用slink-topic-output。
(1) Simple ETL
我们假设Kafka中存储的就是一个简朴的字符串,以是我们必要一个用于对字符串举办serialize和deserialize的实现,也就是我们要界说一个实现DeserializationSchema和SerializationSchema 的序列化和反序列化的类。由于我们示例中是字符串,以是我们自界说一个KafkaMsgSchema实现类,然后在编写Flink主措施。
- import org.apache.flink.api.common.serialization.DeserializationSchema;
- import org.apache.flink.api.common.serialization.SerializationSchema;
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.util.Preconditions;
-
- import java.io.IOException;
- import java.io.ObjectInputStream;
- import java.io.ObjectOutputStream;
- import java.nio.charset.Charset;
-
- public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {
- private static final long serialVersionUID = 1L;
- private transient Charset charset;
-
- public KafkaMsgSchema() {
- // 默认UTF-8编码
- this(Charset.forName("UTF-8"));
- }
-
- public KafkaMsgSchema(Charset charset) {
- this.charset = Preconditions.checkNotNull(charset);
- }
-
- public Charset getCharset() {
- return this.charset;
- }
-
- public String deserialize(byte[] message) {
- // 将Kafka的动静反序列化为java工具
- return new String(message, charset);
- }
-
- public boolean isEndOfStream(String nextElement) {
- // 流永久不竣事
- return false;
- }
-
- public byte[] serialize(String element) {
- // 将java工具序列化为Kafka的动静
- return element.getBytes(this.charset);
- }
-
- public TypeInformation<String> getProducedType() {
- // 界说发生的数据Typeinfo
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- out.writeUTF(this.charset.name());
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- String charsetName = in.readUTF();
- this.charset = Charset.forName(charsetName);
- }
- }
主措施 - 完备代码
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.java.utils.ParameterTool;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
- import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-
- import java.util.Properties;
-
- public class KafkaExample {
- public static void main(String[] args) throws Exception {
- // 用户参数获取
- final ParameterTool parameterTool = ParameterTool.fromArgs(args);
- // Stream 情形
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // Source的topic
- String sourceTopic = "flink-topic";
- // Sink的topic
- String sinkTopic = "flink-topic-output";
- // broker 地点
- String broker = "localhost:9092";
-
- // 属性参数 - 现实投产可以在呼吁行传入
- Properties p = parameterTool.getProperties();
- p.putAll(parameterTool.getProperties());
- p.put("bootstrap.servers", broker);
-
- env.getConfig().setGlobalJobParameters(parameterTool);
-
- // 建设斲丧者
- FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(
- sourceTopic,
- new KafkaMsgSchema(),
- p);
- // 配置读取最早的数据
- // consumer.setStartFromEarliest();
-
- // 读取Kafka动静
- DataStream<String> input = env.addSource(consumer);
-
-
- // 数据处理赏罚
- DataStream<String> result = input.map(new MapFunction<String, String>() {
- public String map(String s) throws Exception {
- String msg = "Flink study ".concat(s);
- System.out.println(msg);
- return msg;
- }
- });
-
- // 建设出产者
- FlinkKafkaProducer producer = new FlinkKafkaProducer<String>(
- sinkTopic,
- new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),
- p,
- FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-
- // 将数据写入Kafka指定Topic中
- result.addSink(producer);
-
- // 执行job
- env.execute("Kafka Example");
- }
- }
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|