加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列 - State

发布时间:2018-10-16 19:39:41 所属栏目:教程 来源:孙金城
导读:【51CTO技能沙龙】10月27日,让我们配合试探AI场景化应用实现之道 现实题目 在流计较场景中,数据会绵绵不断的流入Apache Flink体系,每条数据进入Apache Flink体系城市触发计较。假如我们想举办一个Count聚合计较,那么每次触发计较是将汗青上全部流入的数

我们发明 snapshotState要领的返回值是一个List,T是Tuple2

  1. public interface InputSplit extends Serializable { 
  2.     int getSplitNumber(); 

也就是说,InputSplit我们可以领略为是一个Partition索引,有了这个数据布局我们在看看上面图所示的case是怎样事变的?当Source的并行度是1的时辰,全部打partition数据都在统一个线程中读取,全部partition的state也在统一个state中维护,State存储信息名目如下:

Apache Flink 漫谈系列 - State

假如我们此刻将并发调解为2,那么我们5个分区的State将会在2个独立的使命(线程)中举办维护,在内部实现中我们有如下算法举办分派每个Task所处理赏罚和维护partition的State信息,如下:

  1. List<Integer> assignedPartitions = new LinkedList<>(); 
  2. for (int i = 0; i < partitions; i++) { 
  3.         if (i % consumerCount == consumerIndex) { 
  4.                 assignedPartitions.add(i); 
  5.         } 

这个求mod的算法,抉择了每个并发所处理赏罚和维护partition的State信息,针对我们当前的case详细的存储环境如下:

Apache Flink 漫谈系列 - State

那么到此刻我们发明上面扩容后State得以很好的分派得益于OperatorState回收了List的数据布局的计划。其它各人留意一个题目,信托各人已经发明上面分派partition的算法有一个限定,那就是Source的扩容(并发数)是否可以高出Source物理存储的partition数目呢?谜底是否认的,不能。今朝Apache Flink的做法是提前报错,纵然不报错也是资源的挥霍,由于高出partition数目的并发永久分派不到待打点的partition。

KeyedState对扩容的处理赏罚

对付KeyedState最轻易想到的是hash(key) mod parallelism(operator) 方法分派state,就和OperatorState一样,这种分派方法大大都环境是规复的state不是当地已有的state,必要一次收集拷贝,这种服从较量低,OperatorState回收这种简朴的方法举办处理赏罚是由于OperatorState的state一样平常都较量小,收集拉取的本钱很小,对付KeyedState每每很大,我们会有更好的选择,在Apache Flink中回收的是Key-Groups方法举办分派。

什么是Key-Groups

Key-Groups 是Apache Flink中对keyed state凭证key举办分组的方法,每个key-group中会包括N>0个key,一个key-group是State分派的原子单元。在Apache Flink中关于Key-Group的工具是 KeyGroupRange, 如下:

  1. public class KeyGroupRange implements KeyGroupsList, Serializable { 
  2.         ... 
  3.         ... 
  4.         private final int startKeyGroup; 
  5.         private final int endKeyGroup; 
  6.         ... 
  7.         ...} 

KeyGroupRange两个重要的属性就是 startKeyGroup和endKeyGroup,界说了startKeyGroup和endKeyGroup属性后Operator上面的Key-Group的个数也就确定了。

什么抉择Key-Groups的个数

key-group的数目在job启动前必需是确定的且运行中不能改变。因为key-group是state分派的原子单元,而每个operator并行实例至少包括一个key-group,因此operator的最大并行度不能高出设定的key-group的个数,那么在Apache Flink的内部实现上key-group的数目就是最大并行度的值。

GroupRange.of(0, maxParallelism)怎样抉择key属于哪个Key-Group

确定好GroupRange之后,怎样抉择每个Key属于哪个Key-Group呢?我们采纳的是取mod的方法,在KeyGroupRangeAssignment中的assignToKeyGroup要了解将key分别到指定的key-group中,如下:

  1. public static int assignToKeyGroup(Object key, int maxParallelism) { 
  2.       return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); 
  3.  
  4. public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { 
  5.       return HashPartitioner.INSTANCE.partition(keyHash, maxParallelism); 
  6.  
  7. @Override 
  8. public int partition(T key, int numPartitions) { 
  9.       return MathUtils.murmurHash(Objects.hashCode(key)) % numPartitions; 

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读