
Spark Task not serializable / class not defined for a third-party Jar

I have been searching Google and Stack Overflow for a week and still can't find a good answer.

I have a dataset of chemical compounds, and I need a third-party Jar to read these compounds from SDF (a JSON-like data format). Then I have to compute the similarity between different compounds. The reading and the computation involve very complex chemistry details, so I can't reimplement this functionality myself. In other words, I have to call the third-party Jar inside a map function running on Spark. The Jar is called JCompoundMapper. It reads atomic bonds iteratively with a DFS algorithm and looks very complex. In any case, this thread is not about reading compounds, but about how to map over a third-party jar on Spark. When I try to do that, I run into a Task not serializable problem:
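For context, a third-party jar such as JCompoundMapper is usually shipped to the driver and every executor via spark-submit's `--jars` flag. A minimal sketch (the jar path, class name, and application jar below are illustrative, not from the original post):

```shell
# Ship the third-party jar alongside the application jar.
# Paths and the main class are assumptions -- substitute your own.
spark-submit \
  --class com.example.SimilarityJob \
  --master yarn \
  --jars /path/to/jcompoundmapper.jar \
  my-application.jar
```

This only makes the classes available on the executors; it does not by itself make instances of those classes serializable.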

社区小助手 2018-12-06 15:37:34
1 answer
  • 社区小助手 is the admin of the Spark China community. I regularly post livestream recaps and technical articles, and I also collect the Spark questions and answers raised in the DingTalk group.

    import de.zbit.jcmapper.distance.DistanceTanimoto
    import de.zbit.jcmapper.distance.IDistanceMeasure
    import de.zbit.jcmapper.fingerprinters.EncodingFingerprint
    import de.zbit.jcmapper.fingerprinters.features.FeatureMap
    import de.zbit.jcmapper.fingerprinters.features.IFeature
    import de.zbit.jcmapper.fingerprinters.topological.Encoding2DAllShortestPath
    import de.zbit.jcmapper.fingerprinters.topological.Encoding2DCATS
    import de.zbit.jcmapper.fingerprinters.topological.Encoding2DECFP
    import de.zbit.jcmapper.io.reader.RandomAccessMDLReader
    import de.zbit.jcmapper.io.writer.ExporterFullFingerprintCSV
    import de.zbit.jcmapper.io.writer.ExporterFullFingerprintTABUnfolded
    import de.zbit.jcmapper.io.writer.ExporterLinear
    import de.zbit.jcmapper.io.writer.ExporterSDFProperty
    import java.io.FileWriter
    import java.util.ArrayList
    import java.util.List
    import java.io.File

    val reader: RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
    val similarity: IDistanceMeasure = new DistanceTanimoto()
    val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
    val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
    val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))
    def getSimilarity( id1:Int, id2:Int ) : Double = {

    val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
    featureMaps.add(new FeatureMap(rawFeatures))
    featureMaps.add(new FeatureMap(rawFeatures2))
    val temp: Double = similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
    return temp
    }

    val func = combinations.map(x => {

    getSimilarity(0, 1)
    }).take(5)
    

    Name: org.apache.spark.SparkException
    Message: Task not serializable
    StackTrace: at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.map(RDD.scala:370)
    ... 48 elided
    Caused by: java.io.NotSerializableException: de.zbit.jcmapper.io.reader.RandomAccessMDLReader
    I read other threads and understood that I have to put the variables and the function inside an object to make them serializable. However, when I do that, I get a NullPointerException instead:

    object Holder{
    val reader:RandomAccessMDLReader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
    val similarity: IDistanceMeasure = new DistanceTanimoto()
    val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()
    val rawFeatures2: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(0))
    val rawFeatures: List[IFeature] = fingerprinter.getFingerprint(reader.getMol(1))
    def getSimilarity( id1:Int, id2:Int ) : Double = {

    val featureMaps: List[FeatureMap] = new ArrayList[FeatureMap]()
    featureMaps.add(new FeatureMap(rawFeatures))
    featureMaps.add(new FeatureMap(rawFeatures2))
    val temp: Double = similarity.getSimilarity(featureMaps.get(0), featureMaps.get(1))
    return temp

    }
    }

    val func = combinations.map(x => {
    Holder.getSimilarity(0, 1)
    }).take(5)

    Name: org.apache.spark.SparkException
    Message: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-10-245-2-223.ec2.internal, executor 1): java.lang.NullPointerException

    at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.setRanges(Unknown Source)
    at de.zbit.jcmapper.io.reader.RandomAccessMDLReader.<init>(Unknown Source)
    at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Holder$.<init>(<console>:78)
    at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder$lzycompute(<console>:77)
    at $line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.Holder(<console>:77)
    at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:85)
    at $line57.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:84)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)

    For the reading part, I could use a huge LinkedHashMap and store all the compounds in it. However, I still have to call the third-party jar's getSimilarity() function to compute the similarities. So even if I only use getSimilarity(), I get the NullPointerException when I put it inside an object, and the Task not serializable problem when I put it outside one. I therefore have a few questions that I couldn't find good answers to:
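    One common workaround for non-serializable helpers like RandomAccessMDLReader is to construct them on the executor, once per partition, with mapPartitions instead of capturing them in a map closure. A hedged sketch, assuming the SDF file is present on every executor node and that `combinations` is an RDD of compound index pairs (both assumptions come from the snippets above, not from a verified setup):

    ```scala
    // Sketch only: nothing non-serializable is captured by the closure,
    // because the reader is built inside mapPartitions on the executor.
    val results = combinations.mapPartitions { iter =>
      val reader = new RandomAccessMDLReader(new File("datasets/internal.sdf"))
      val fingerprinter = new Encoding2DAllShortestPath()
      val similarity: IDistanceMeasure = new DistanceTanimoto()
      iter.map { case (id1, id2) =>
        val f1 = new FeatureMap(fingerprinter.getFingerprint(reader.getMol(id1)))
        val f2 = new FeatureMap(fingerprinter.getFingerprint(reader.getMol(id2)))
        similarity.getSimilarity(f1, f2)
      }
    }
    results.take(5)
    ```

    The cost is one reader construction per partition rather than per record, which is usually acceptable.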

    Does Spark support shipping a third-party Jar to every executor? For the reader, does Spark distribute the reader class to every executor so that each one reads the file separately, or does it read the file as a whole and then distribute it to the executors in small chunks?
    Why does it throw a NullPointerException? It seems the object did solve the serialization problem, but not the NullPointerException.
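    On the NullPointerException: the stack trace shows Holder being initialized inside the task (`Holder$lzycompute`), i.e. the object is re-initialized lazily on each executor JVM. So `new RandomAccessMDLReader(new File("datasets/internal.sdf"))` runs again on the executor, where that relative path typically does not exist, and setRanges fails on the missing file. A hedged sketch of one possible fix is to ship the file with addFile on the driver and resolve the local copy through SparkFiles on the executor (`sc` is the assumed SparkContext):

    ```scala
    import org.apache.spark.SparkFiles

    // Driver side: copy the SDF file into every executor's working directory.
    sc.addFile("datasets/internal.sdf")

    object Holder extends Serializable {
      // lazy vals run on first access in each executor JVM, and
      // SparkFiles.get resolves the local copy that addFile placed there.
      lazy val reader: RandomAccessMDLReader =
        new RandomAccessMDLReader(new File(SparkFiles.get("internal.sdf")))
      lazy val similarity: IDistanceMeasure = new DistanceTanimoto()
      lazy val fingerprinter: Encoding2DAllShortestPath = new Encoding2DAllShortestPath()

      def getSimilarity(id1: Int, id2: Int): Double = {
        val f1 = new FeatureMap(fingerprinter.getFingerprint(reader.getMol(id1)))
        val f2 = new FeatureMap(fingerprinter.getFingerprint(reader.getMol(id2)))
        similarity.getSimilarity(f1, f2)
      }
    }
    ```

    This keeps the object-based design from the question while making sure the file the executor opens actually exists on its local filesystem.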

    2019-07-17 23:18:35