起首,一个根基观念就是Spark应用措施从开始提交到task执行分了许多层。
- 应用调治器。首要是资源打点器,好比standalone,yarn等认真Spark整个应用的调治和集群资源的打点。
- job调治器。spark 的算子分为首要两大类,transform和action,个中每一个action城市发生一个job。这个job必要在executor提供的资源池里调治执行,虽然并不少直接调治执行job。
- stage分别及调治。job详细会分别为多少stage,这个就有一个根基的观念就是宽依靠和窄依靠,宽依靠就会分别stage。stage也必要调治执行,从后往前分别,以前去后调治执行。
- task切割及调治。stage往下继承细化就是会按照不太的并行度分别出task荟萃,这个就是在executor上调治执行的根基单位,今朝的调治默认是一个task一个cpu。
- Spark Streaming 的job天生是周期性的。当前job的执行时刻高出天生周期就会发生job 累加。累加必然数量标job后有也许会导致应用措施失败。这个首要缘故起因是因为FIFO的调治模式和Spark Streaming的默认单线程的job执行机制
3.Spark Streaming job天生
这个源码首要进口是StreamingContext#JobScheduler#JobGenerator工具,内部有个RecurringTimer,首要认真凭证批处理赏罚时刻周期发生GenrateJobs变乱,虽然在存在windows的环境下,该周期有也许不会天生job,要取决于滑动隔断,有乐趣本身去揭秘,浪尖星球里分享的视频教程里讲到了。详细代码块如下
- private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
我们直接看着实当代码块:
- eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
- override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
-
- override protected def onError(e: Throwable): Unit = {
- jobScheduler.reportError("Error in job generator", e)
- }
- }
- eventLoop.start()
event处理赏罚函数是processEvent要领
- /** Processes all events */
- private def processEvent(event: JobGeneratorEvent) {
- logDebug("Got event " + event)
- event match {
- case GenerateJobs(time) => generateJobs(time)
- case ClearMetadata(time) => clearMetadata(time)
- case DoCheckpoint(time, clearCheckpointDataLater) =>
- doCheckpoint(time, clearCheckpointDataLater)
- case ClearCheckpointData(time) => clearCheckpointData(time)
- }
- }
在接管到GenerateJob变乱的时辰,会执行generateJobs代码,就是在该代码内部发生和调治job的。
- /** Generate jobs and perform checkpointing for the given `time`. */
- private def generateJobs(time: Time) {
- // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
- // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
- ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
- Try {
- jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
- graph.generateJobs(time) // generate jobs using allocated block
- } match {
- case Success(jobs) =>
- val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
- jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
- case Failure(e) =>
- jobScheduler.reportError("Error generating jobs for time " + time, e)
- PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
- }
- eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
- }
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|