加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列 - State

发布时间:2018-10-16 19:39:41 所属栏目:教程 来源:孙金城
导读:【51CTO技能沙龙】10月27日,让我们配合试探AI场景化应用实现之道 现实题目 在流计较场景中,数据会绵绵不断的流入Apache Flink体系,每条数据进入Apache Flink体系城市触发计较。假如我们想举办一个Count聚合计较,那么每次触发计较是将汗青上全部流入的数

如上实现我们相识到分派Key到指定的key-group的逻辑是操作key的hashCode和maxParallelism举办取余操纵来分派的。如下图当parallelism=2,maxParallelism=10的环境下贱上key与key-group的对应相关如下图所示:

Apache Flink 漫谈系列 - State

如上图key(a)的hashCode是97,与最大并发10取余后是7,被分派到了KG-7中,流上每个event城市分派到KG-0至KG-9个中一个Key-Group中。

每个Operator实譬喻何获取Key-Groups

相识了Key-Groups观念和怎样分派每个Key到指定的Key-Groups之后,我们看看怎样计较每个Operator实例所处理赏罚的Key-Groups。 在KeyGroupRangeAssignment的computeKeyGroupRangeForOperatorIndex要领描写了分派算法:

  1. public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( 
  2.       int maxParallelism, 
  3.       int parallelism, 
  4.       int operatorIndex) { 
  5.         GroupRange splitRange = GroupRange.of(0, maxParallelism).getSplitRange(parallelism, operatorIndex); 
  6.         int startGroup = splitRange.getStartGroup(); 
  7.         int endGroup = splitRange.getEndGroup(); 
  8.    return new KeyGroupRange(startGroup, endGroup - 1); 
  9.  
  10. public GroupRange getSplitRange(int numSplits, int splitIndex) { 
  11.         ... 
  12.         final int numGroupsPerSplit = getNumGroups() / numSplits; 
  13.         final int numFatSplits = getNumGroups() % numSplits; 
  14.  
  15.         int startGroupForThisSplit; 
  16.         int endGroupForThisSplit; 
  17.         if (splitIndex < numFatSplits) { 
  18.             startGroupForThisSplit = getStartGroup() + splitIndex * (numGroupsPerSplit + 1); 
  19.             endGroupForThisSplit =   startGroupForThisSplit + numGroupsPerSplit + 1; 
  20.         } else { 
  21.             startGroupForThisSplit = getStartGroup() + splitIndex * numGroupsPerSplit + numFatSplits; 
  22.             endGroupForThisSplit =  startGroupForThisSplit + numGroupsPerSplit; 
  23.         } 
  24.         if (startGroupForThisSplit >= endGroupForThisSplit) { 
  25.                 return GroupRange.emptyGroupRange(); 
  26.         } else { 
  27.                 return new GroupRange(startGroupForThisSplit, endGroupForThisSplit); 
  28.         }} 

上面代码的焦点逻辑是先计较每个Operator实例至少分派的Key-Group个数,将不能整除的部门N个,均匀分给前N个实例。最终每个Operator实例打点的Key-Groups会在GroupRange中暗示,本质是一个区间值;下面我们就上图的case,声名一下怎样举办分派以及扩容后怎样从头分派。

假设上面的Stateful Operation节点的最大并行度maxParallelism的值是10,也就是我们一共有10个Key-Group,当我们并发是2的时辰和并发是3的时辰分派的环境如下图:

Apache Flink 漫谈系列 - State

如上算法我们发此刻举办扩容时辰,大部门state照旧落到当地的,如Task0只有KG-4被分出去,其他的照旧保持在当地。同时我们也发明,一个job假如修改了maxParallelism的值那么会直接影响到Key-Groups的数目和key的分派,也会打乱全部的Key-Group的分派,今朝在Apache Flink体系中同一将maxParallelism的默认值调解到4096,最洪流平的停止无法扩容的环境产生。

小结

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读