副问题[/!--empirenews.page--]
技能沙龙 | 邀您于8月25日与国美/AWS/转转三位专家配合切磋小措施电商拭魅战
媒介
这两年做 streamingpro 时,不行停止的必要对Spark做大量的加强。就犹如我之前吐槽的,Spark大量行使了new举办工具的建设,导致内里的实现根基没有步伐举办替代。

好比SparkEnv里有个属性叫closureSerializer,是专门做使命的序列化反序列化的,虽然也认真对函数闭包的序列化反序列化。我们看看内部是怎么实现的:
- val serializer = instantiateClassFromConf[Serializer](
- "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
- logDebug(s"Using serializer: ${serializer.getClass}")
-
- val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
-
- val closureSerializer = new JavaSerializer(conf)
-
- val envInstance = new SparkEnv(
- .....
- closureSerializer, ....
这里直接new了一个JavaSerializer,并不能做设置。假如不改源码,你没有任何步伐可以替代掉掉这个实现。同理,假如我想替代掉Executor的实现,根基也是不行能的。
本年有两个大处所涉及到了对Spark的【魔改】,也就是不通过改源码,行使原有发型包,通过添加新代码的方法来对Spark举办加强。
二层RPC的支持
我们知道,在Spark里,我们只能通过Task才气touch到Executor。现有的API你是没步伐直接操纵到全部可能指定部门的Executor。好比,我但愿全部Executor都加载一个资源文件,此刻是没步伐做到的。为了可以或许对Executor举办直接的操纵,那就必要成立一个新的通信层。那详细怎么做呢?
起首,在Driver端成立一个Backend,这个较量简朴,
- class PSDriverBackend(sc: SparkContext) extends Logging {
-
- val conf = sc.conf
- var psDriverRpcEndpointRef: RpcEndpointRef = null
-
- def createRpcEnv = {
- val isDriver = sc.env.executorId == SparkContext.DRIVER_IDENTIFIER
- val bindAddress = sc.conf.get(DRIVER_BIND_ADDRESS)
- val advertiseAddress = sc.conf.get(DRIVER_HOST_ADDRESS)
- var port = sc.conf.getOption("spark.ps.driver.port").getOrElse("7777").toInt
- val ioEncryptionKey = if (sc.conf.get(IO_ENCRYPTION_ENABLED)) {
- Some(CryptoStreamUtils.createKey(sc.conf))
- } else {
- None
- }
- logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}")
- var createSucess = false
- var count = 0
- val env = new AtomicReference[RpcEnv]()
- while (!createSucess && count < 10) {
- try {
- env.set(RpcEnv.create("PSDriverEndpoint", bindAddress, port, sc.conf,
- sc.env.securityManager, clientMode = !isDriver))
- createSucess = true
- } catch {
- case e: Exception =>
- logInfo("fail to create rpcenv", e)
- count += 1
- port += 1
- }
- }
- if (env.get() == null) {
- logError(s"fail to create rpcenv finally with attemp ${count} ")
- }
- env.get()
- }
-
- def start() = {
- val env = createRpcEnv
- val pSDriverBackend = new PSDriverEndpoint(sc, env)
- psDriverRpcEndpointRef = env.setupEndpoint("ps-driver-endpoint", pSDriverBackend)
- }
-
- }
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|