
I am trying to run a simple Spark RDD transformation, mapPartitions(), on some sample data. In the process, however, I get a java.io.NotSerializableException: org.apache.spark.InterruptibleIterator exception.

Here is my exception:

java.io.NotSerializableException: org.apache.spark.InterruptibleIterator 
Serialization stack: 
    - object not serializable (class: org.apache.spark.InterruptibleIterator, value: non-empty iterator) 
    - field (class: scala.collection.convert.Wrappers$IteratorWrapper, name: underlying, type: interface scala.collection.Iterator) 
    - object (class scala.collection.convert.Wrappers$IteratorWrapper, IteratorWrapper(non-empty iterator)) 
    - element of array (index: 0) 
    - array (class [Ljava.lang.Object;, size 2) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:265) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/12/06 19:36:24 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.spark.InterruptibleIterator 
Serialization stack: 
    - object not serializable (class: org.apache.spark.InterruptibleIterator, value: non-empty iterator) 
    - field (class: scala.collection.convert.Wrappers$IteratorWrapper, name: underlying, type: interface scala.collection.Iterator) 
    - object (class scala.collection.convert.Wrappers$IteratorWrapper, IteratorWrapper(non-empty iterator)) 
    - element of array (index: 0) 
    - array (class [Ljava.lang.Object;, size 2); not retrying 
16/12/06 19:36:24 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/12/06 19:36:24 INFO TaskSchedulerImpl: Cancelling stage 0 
16/12/06 19:36:24 INFO DAGScheduler: ResultStage 0 (collect at MapPartition.java:18) failed in 0.168 s 
16/12/06 19:36:24 INFO DAGScheduler: Job 0 failed: collect at MapPartition.java:18, took 0.529927 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.spark.InterruptibleIterator 
Serialization stack: 
    - object not serializable (class: org.apache.spark.InterruptibleIterator, value: non-empty iterator) 
    - field (class: scala.collection.convert.Wrappers$IteratorWrapper, name: underlying, type: interface scala.collection.Iterator) 
    - object (class scala.collection.convert.Wrappers$IteratorWrapper, IteratorWrapper(non-empty iterator)) 
    - element of array (index: 0) 
    - array (class [Ljava.lang.Object;, size 2) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926) 
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:339) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:46) 
    at in.inndata.sparkbasics.MapPartition.main(MapPartition.java:18) 

And here is my code:

SparkConf conf = new SparkConf().setAppName("MapPartition").setMaster("local"); 
JavaSparkContext sc = new JavaSparkContext(conf); 
List<Integer> list = new ArrayList<Integer>(Arrays.asList(10,20,30,40,50,60,70,80,90,100)); 

JavaRDD<Integer> lines = sc.parallelize(list,1); 
JavaRDD<Object> mappartitions = lines.mapPartitions(f -> Arrays.asList(f,f)); 
System.out.println(mappartitions.collect()); 

If I run mappartitions.foreach(f -> System.out.println(f)); instead, I only get

IteratorWrapper(non-empty iterator) 

Answer

The problem is here:

JavaRDD<Object> mappartitions = lines.mapPartitions(f -> Arrays.asList(f,f)); 

This creates a list of two elements, both of which are the same iterator. An iterator is not serializable and cannot be shipped back to the driver, which is why you get this error.

Workaround: if you want to build a list from the iterator (a list containing all of the iterator's elements), use something like the following (written without being able to test it):
lines.mapPartitions(f -> { 
    List<Integer> list = new LinkedList<>(); 

    while (f.hasNext()) { 
        list.add(f.next()); 
    } 
    return Arrays.asList(list).iterator(); // this creates an iterator with only one element: our list of all elements in the partition 
}); 

// Please let me know if I made a mistake.
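
For reference, here is a minimal self-contained sketch of the whole program with this workaround applied. It assumes the Spark 2.x Java API, where mapPartitions takes a function returning an Iterator; on Spark 1.6 (which the line numbers in the stack trace suggest) the function must return an Iterable instead, i.e. return Arrays.asList(partitionElements) without the trailing .iterator().

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 

public class MapPartition { 
    public static void main(String[] args) { 
        SparkConf conf = new SparkConf().setAppName("MapPartition").setMaster("local"); 
        JavaSparkContext sc = new JavaSparkContext(conf); 

        List<Integer> list = Arrays.asList(10, 20, 30, 40, 50, 60, 70, 80, 90, 100); 
        JavaRDD<Integer> lines = sc.parallelize(list, 1); 

        // Materialize each partition's elements into a serializable List 
        // instead of returning the (non-serializable) iterator itself. 
        JavaRDD<List<Integer>> mappartitions = lines.mapPartitions(f -> { 
            List<Integer> partitionElements = new ArrayList<>(); 
            while (f.hasNext()) { 
                partitionElements.add(f.next()); 
            } 
            // One output element per partition: the list of all its elements. 
            return Arrays.asList(partitionElements).iterator(); 
        }); 

        System.out.println(mappartitions.collect()); // prints [[10, 20, 30, 40, 50, 60, 70, 80, 90, 100]] 
        sc.stop(); 
    } 
} 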
