接着我们必要再封装一个LocalNonOpSerializer,
- class LocalNonOpSerializer(conf: SparkConf) extends Serializer with Externalizable {
- val javaS = new JavaSerializer(conf)
-
- override def newInstance(): SerializerInstance = {
- new LocalNonOpSerializerInstance(javaS.newInstance())
- }
-
- override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
- javaS.writeExternal(out)
- }
-
- override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
- javaS.readExternal(in)
- }
- }
此刻,万事俱备,只欠春风了,我们怎么才气把这些代码让Spark运行起来。详细做法很是魔幻,实现一个enhance类:
- def enhanceSparkEnvForAPIService(session: SparkSession) = {
- val env = SparkEnv.get
- //建设一个新的WowSparkEnv工具,然后将内里的Serializer替代成我们本身的LocalNonOpSerializer
- val wowEnv = new WowSparkEnv(
- .....
- new LocalNonOpSerializer(env.conf): Serializer,
- ....)
- // 将SparkEnv object里的实例替代成我们的
- //WowSparkEnv
- SparkEnv.set(wowEnv)
- //可是许多处地址SparkContext启动后都已经在行使之前就已经天生的SparkEnv,我们必要做些调解
- //我们先把之前已经启动的LocalSchedulerBackend里的scheduer停掉
- val localScheduler = session.sparkContext.schedulerBackend.asInstanceOf[LocalSchedulerBackend]
-
- val scheduler = ReflectHelper.field(localScheduler, "scheduler")
-
- val totalCores = localScheduler.totalCores
- localScheduler.stop()
-
- //建设一个新的LocalSchedulerBackend
- val wowLocalSchedulerBackend = new WowLocalSchedulerBackend(session.sparkContext.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], totalCores)
- wowLocalSchedulerBackend.start()
- //把SparkContext里的_schedulerBackend替代成我们的实现
- ReflectHelper.field(session.sparkContext, "_schedulerBackend", wowLocalSchedulerBackend)
- }
落成。
着实尚有许多
好比在Spark里,Python Worker默认一分钟没有被行使是会被杀死的,可是在StreamingPro里,这些python worker由于都要加载模子,以是启动本钱长短常高的,杀了之后再启动就没步伐忍受了,通过相同的方法举办魔改,从而使得空闲时刻是可设置的。假如各人感乐趣,可以翻看StreamingPro相干代码。 【编辑保举】 - 一文理清Apache Spark内存打点脉络
- 深度:Hadoop对Spark五大维度正面比拼陈诉!
- 呆板进修实践:怎样将Spark与Python团结?
- Spark机能优化:开拓调优篇
- 大数据处理赏罚引擎Spark与Flink大比拼
【责任编辑:未丽燕 TEL:(010)68476606】
点赞 0 (编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|