What gets invoked is the ClosureCleaner.clean method, and inside it serialization is triggered like this:
try {
  if (SparkEnv.get != null) {
    SparkEnv.get.closureSerializer.newInstance().serialize(func)
  }
} catch {
  case ex: Exception => throw new SparkException("Task not serializable", ex)
}
SparkEnv is created when the SparkContext is initialized, and that object holds the closureSerializer, which is built with new JavaSerializer. Since this serialization is slow, and since we are actually running in local mode where serialization is not really needed, we want to find a way to swap out the closureSerializer implementation. As we complained earlier, it is hard-coded in the Spark source and no customization hook is exposed, so once again we have to hack it.
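For context, in Spark 2.x the wiring inside SparkEnv.create looks roughly like the sketch below (paraphrased, not the literal source, and the helper object name is ours; older Spark versions differ): the closure serializer is unconditionally a JavaSerializer, so there is no configuration knob for substituting another implementation.

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Paraphrased sketch of the relevant wiring in SparkEnv.create: the closure
// serializer is always a JavaSerializer built from the SparkConf.
object ClosureSerializerWiring {
  def closureSerializer(conf: SparkConf): JavaSerializer = new JavaSerializer(conf)
}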
First, we create a subclass of SparkEnv:
class WowSparkEnv(
  ....) extends SparkEnv(
Next, we implement a custom Serializer:
import java.io.{InputStream, OutputStream}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

import scala.reflect.ClassTag

import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

class LocalNonOpSerializerInstance(javaD: SerializerInstance) extends SerializerInstance {

  private def isClosure(cls: Class[_]): Boolean = {
    cls.getName.contains("$anonfun$")
  }

  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    if (isClosure(t.getClass)) {
      // For closures, skip real serialization: park the object in a shared map
      // and hand back a UUID key as the "serialized" bytes.
      val uuid = UUID.randomUUID().toString
      LocalNonOpSerializerInstance.maps.put(uuid, t.asInstanceOf[AnyRef])
      ByteBuffer.wrap(uuid.getBytes())
    } else {
      javaD.serialize(t)
    }
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    // If the bytes are a UUID we issued, hand back the original object; otherwise
    // restore the buffer position and fall back to real Java deserialization.
    val s = StandardCharsets.UTF_8.decode(bytes).toString()
    if (LocalNonOpSerializerInstance.maps.containsKey(s)) {
      LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T]
    } else {
      bytes.flip()
      javaD.deserialize(bytes)
    }
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
    val s = StandardCharsets.UTF_8.decode(bytes).toString()
    if (LocalNonOpSerializerInstance.maps.containsKey(s)) {
      LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T]
    } else {
      bytes.flip()
      javaD.deserialize(bytes, loader)
    }
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
    javaD.serializeStream(s)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    javaD.deserializeStream(s)
  }
}

// The excerpt references this companion object but does not show it; a plain
// ConcurrentHashMap (assumed here) is enough to back the UUID-to-object lookup.
object LocalNonOpSerializerInstance {
  val maps = new ConcurrentHashMap[String, AnyRef]()
}
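To actually hand this instance to Spark, it still has to be wrapped in a Serializer that SparkEnv can hold, so that its newInstance() returns our LocalNonOpSerializerInstance. Below is a minimal sketch of such a wrapper; the class name LocalNonOpSerializer and its wiring are our own assumptions, not taken from the excerpt above.

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerInstance}

// Assumed wrapper: delegates to a plain JavaSerializer, but newInstance() hands out
// the LocalNonOpSerializerInstance defined above so closures bypass serialization.
class LocalNonOpSerializer(conf: SparkConf) extends Serializer {
  private val javaSerializer = new JavaSerializer(conf)

  override def newInstance(): SerializerInstance = {
    new LocalNonOpSerializerInstance(javaSerializer.newInstance())
  }
}

Presumably this wrapper is what the WowSparkEnv above carries as its closureSerializer; once such an env is built (copying the remaining constructor arguments from the original SparkEnv), installing it with SparkEnv.set makes ClosureCleaner's SparkEnv.get pick up the new serializer.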