可以看到代码里起首会执行job天生代码
- graph.generateJobs(time)
-
- 详细代码块儿
-
- def generateJobs(time: Time): Seq[Job] = {
- logDebug("Generating jobs for time " + time)
- val jobs = this.synchronized {
- outputStreams.flatMap { outputStream =>
- val jobOption = outputStream.generateJob(time)
- jobOption.foreach(_.setCallSite(outputStream.creationSite))
- jobOption
- }
- }
- logDebug("Generated " + jobs.length + " jobs for time " + time)
- jobs
- }
每个输出流城市天生一个job,输出流就相同于foreachrdd,print这些。着实内部都是ForEachDStream。以是天生的是一个job荟萃。
然后就会将job荟萃提交到线程池里去执行,这些都是在driver端完成的哦。
- jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
-
- 详细h函数内容
- def submitJobSet(jobSet: JobSet) {
- if (jobSet.jobs.isEmpty) {
- logInfo("No jobs added for time " + jobSet.time)
- } else {
- listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
- jobSets.put(jobSet.time, jobSet)
- jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
- logInfo("Added jobs for time " + jobSet.time)
- }
- }
着实就是遍历天生的job荟萃,然后提交到线程池jobExecutor内部执行。这个也是在driver端的哦。
jobExecutor就是一个牢靠线程数的线程池,默认是1个线程。
- private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
- private val jobExecutor =
- ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
必要的话可以设置spark.streaming.concurrentJobs来同时提交执行多个job。
那么这种环境下,job就可以并行执行了吗?
显然不是的!
还要修改一下调治模式为Fair,具体的设置可以参考:
http://spark.apache.org/docs/2.3.3/job-scheduling.html#scheduling-within-an-application
简朴的均分的话只必要
- conf.set("spark.scheduler.mode", "FAIR")
然后,同时运行的job就会均分全部executor提供的资源。
这就是整个job天生的整个进程了哦。
由于Spark Streaming的使命存在Fair模式下并发的环境,以是必要在行使单例模式天生broadcast的时辰要留意声明同步。 (编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|