加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

为啥Spark 的Broadcast要用单例模式

发布时间:2019-06-11 16:54:59 所属栏目:教程 来源:浪院长
导读:许多用Spark Streaming 的伴侣应该行使过broadcast,大大都环境下广播变量都是以单例模式声明的有没有粉丝想过为什么?浪尖在这里帮各人说明一下,有以下几个缘故起因: 广播变量大大都环境下是不会改观的,行使单例模式可以镌汰spark streaming每次job天生执行

起首,一个根基观念就是Spark应用措施从开始提交到task执行分了许多层。

  1. 应用调治器。首要是资源打点器,好比standalone,yarn等认真Spark整个应用的调治和集群资源的打点。
  2. job调治器。spark 的算子分为首要两大类,transform和action,个中每一个action城市发生一个job。这个job必要在executor提供的资源池里调治执行,虽然并不少直接调治执行job。
  3. stage分别及调治。job详细会分别为多少stage,这个就有一个根基的观念就是宽依靠和窄依靠,宽依靠就会分别stage。stage也必要调治执行,从后往前分别,以前去后调治执行。
  4. task切割及调治。stage往下继承细化就是会按照不太的并行度分别出task荟萃,这个就是在executor上调治执行的根基单位,今朝的调治默认是一个task一个cpu。
  5. Spark Streaming 的job天生是周期性的。当前job的执行时刻高出天生周期就会发生job 累加。累加必然数量标job后有也许会导致应用措施失败。这个首要缘故起因是因为FIFO的调治模式和Spark Streaming的默认单线程的job执行机制

3.Spark Streaming job天生

这个源码首要进口是StreamingContext#JobScheduler#JobGenerator工具,内部有个RecurringTimer,首要认真凭证批处理赏罚时刻周期发生GenrateJobs变乱,虽然在存在windows的环境下,该周期有也许不会天生job,要取决于滑动隔断,有乐趣本身去揭秘,浪尖星球里分享的视频教程里讲到了。详细代码块如下

  1. private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, 
  2.    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator") 

我们直接看着实当代码块:

  1. eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { 
  2.       override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) 
  3.  
  4.       override protected def onError(e: Throwable): Unit = { 
  5.         jobScheduler.reportError("Error in job generator", e) 
  6.       } 
  7.     } 
  8.     eventLoop.start() 

event处理赏罚函数是processEvent要领

  1. /** Processes all events */ 
  2.   private def processEvent(event: JobGeneratorEvent) { 
  3.     logDebug("Got event " + event) 
  4.     event match { 
  5.       case GenerateJobs(time) => generateJobs(time) 
  6.       case ClearMetadata(time) => clearMetadata(time) 
  7.       case DoCheckpoint(time, clearCheckpointDataLater) => 
  8.         doCheckpoint(time, clearCheckpointDataLater) 
  9.       case ClearCheckpointData(time) => clearCheckpointData(time) 
  10.     } 
  11.   } 

在接管到GenerateJob变乱的时辰,会执行generateJobs代码,就是在该代码内部发生和调治job的。

  1. /** Generate jobs and perform checkpointing for the given `time`.  */ 
  2.   private def generateJobs(time: Time) { 
  3.     // Checkpoint all RDDs marked for checkpointing to ensure their lineages are 
  4.     // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). 
  5.     ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") 
  6.     Try { 
  7.       jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch 
  8.       graph.generateJobs(time) // generate jobs using allocated block 
  9.     } match { 
  10.       case Success(jobs) => 
  11.         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) 
  12.         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
  13.       case Failure(e) => 
  14.         jobScheduler.reportError("Error generating jobs for time " + time, e) 
  15.         PythonDStream.stopStreamingContextIfPythonProcessIsDead(e) 
  16.     } 
  17.     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 
  18.   } 

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读