开发者社区> 问答> 正文

使用Spark collectionAccumulator时出现ConcurrentModificationException

我正在尝试在Azure HDInsight按需群集上运行基于Spark的应用程序,并且看到许多SparkExceptions(由ConcurrentModificationExceptions引起)被记录。启动本地Spark实例时,应用程序运行时没有这些错误。

我在使用累加器时看到了类似错误的报告,而我的代码确实使用了CollectionAccumulator,但是我已经在我使用它的地方放置了同步块,并且它没有任何区别。与累加器相关的代码如下所示:

class MySparkClass(sc : SparkContext) {

val myAccumulator = sc.collectionAccumulator[MyRecord]

override def add(record: MyRecord) = {
    synchronized {
        myAccumulator.add(record)
    }
}

override def endOfBatch() = {
    synchronized {
        myAccumulator.value.asScala.foreach((record: MyRecord) => {
            processIt(record)
        })
    }
}

}
异常不会导致应用程序失败,但是当endOfBatch调用并且代码尝试从累加器中读取值时,它是空的并且processIt永远不会被调用。

我们使用HDInsight版本3.6和Spark版本2.3.0

18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:

at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor

$$ reportHeartBeat(Executor.scala:785) at org.apache.spark.executor.Executor $$

anon$2

$$ anonfun$run$1.apply$mcV$sp(Executor.scala:814) at org.apache.spark.executor.Executor $$

anon$2

$$ anonfun$run$1.apply(Executor.scala:814) at org.apache.spark.executor.Executor $$

anon$2

$$ anonfun$run$1.apply(Executor.scala:814) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988) at org.apache.spark.executor.Executor $$

anon$2.run(Executor.scala:814)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.ConcurrentModificationException

at java.util.ArrayList.writeObject(ArrayList.java:770)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565)
at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:231)
at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
... 13 more

展开
收起
社区小助手 2018-12-06 13:57:38 4820 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    是否同步块确实有帮助。CustomeAccumulators或所有其他累加器不是线程安全的。它们实际上并不是必须的,因为在任务完成(成功或失败)之后,spark驱动程序用于更新累加器值的DAGScheduler.updateAccumulators方法仅在运行调度循环的单个线程上执行。除此之外,它们是具有自己的本地累加器引用的工作者的只写数据结构,而只有驱动程序才允许访问累加器的值。当你说它在本地模式下工作,因为它是单个JVM但在集群模式下,它们是不同的JVM和java实例,正在触发PRC调用以启用通信。

    你的MyRecord对象是如何形状的,如果你只是用.value结束你的行,而不是在它上面有一个迭代器就会有所帮助。

    2019-07-17 23:18:31
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载