加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(15) - DataStream Connectors之Kafka

发布时间:2019-01-19 12:08:17 所属栏目:教程 来源:孙金城
导读:一、聊什么 为了满意本系列读者的需求,我先先容一下Kafka在Apache Flink中的行使。以是本篇以一个简朴的示例,向各人先容在Apache Flink中怎样行使Kafka。 二、Kafka 简介 Apache Kafka是一个漫衍式宣布-订阅动静转达体系。 它最初由LinkedIn公司开拓,Li
副问题[/!--empirenews.page--]

一、聊什么

为了满意本系列读者的需求,我先先容一下Kafka在Apache Flink中的行使。以是本篇以一个简朴的示例,向各人先容在Apache Flink中怎样行使Kafka。

二、Kafka 简介

Apache Kafka是一个漫衍式宣布-订阅动静转达体系。 它最初由LinkedIn公司开拓,LinkedIn于2010年孝顺给了Apache基金会并成为顶级开源项目。Kafka用于构建及时数据管道和流式应用措施。它具有程度扩展性、容错性、极快的速率,今朝也获得了普及的应用。

Kafka不单是漫衍式动静体系并且也支持流式计较,以是在先容Kafka在Apache Flink中的应用之前,先以一个Kafka的简朴示例直观相识什么是Kafka。

1. 安装

本篇不是体系的,细致的先容Kafka,而是想让各人直观熟悉Kafka,以便在Apahe Flink中举办很好的应用,以是我们以最简朴的方法安装Kafka。

(1) 下载二进制包:

  1. curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz 

(2) 解压安装

Kafka安装只必要将下载的tgz解压即可,如下:

  1. jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz 
  2. jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0 
  3. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls 
  4. LICENSE        NOTICE        bin        config        libs        site-docs 

个中bin包括了全部Kafka的打点呼吁,如接下来我们要启动的Kafka的Server。

(3) 启动Kafka Server

Kafka是一个宣布订阅体系,动静订阅起主要有个处事存在。我们启动一个Kafka Server 实例。 Kafka必要行使ZooKeeper,要举办投产陈设我们必要安装ZooKeeper集群,这不在本篇的先容范畴内,以是我们操作Kafka提供的剧本,安装一个只有一个节点的ZooKeeper实例。如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties & 
  2.  
  3. [2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) 
  4. .... 
  5. .... 
  6. [2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory) 

启动之后,ZooKeeper会绑定2181端口(默认)。接下来我们启动Kafka Server,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties 
  2. [2019-01-13 09:09:16,937] INFO Registered kafkakafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) 
  3. [2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer) 
  4. [2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) 
  5. [2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient) 
  6. ... 
  7. ... 
  8. [2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer) 

假如上面统统顺遂,Kafka的安装就完成了。

2. 建设Topic

Kafka是动静订阅体系,起首建设可以被订阅的Topic,我们建设一个名为flink-tipic的Topic,在一个新的terminal中,执行如下呼吁:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic 
  2.  
  3. Created topic "flink-tipic". 

在Kafka Server的terminal中也会输出如下乐成建设信息:

  1. ... 
  2. [2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)... 

上面表现了flink-topic的根基属性设置,如动静压缩方法,动静名目,备份数目等等。

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读