2013-10-17 7 views
5

私は、各ユーザーのメッセージを数え、トップ10を印刷するメッセージのリスト(JSON形式)を受け取る単純なストリーム処理Sparkジョブを作成しようとしています。ユーザー。SparkでソートするときにNotSerializableExceptionが発生する

しかし、コンパレータ>を定義して縮小カウントをソートすると、java.io.NotSerializableExceptionがスローされ、すべてが失敗します。スパークのため

私のMavenの依存関係:

<groupId>org.apache.spark</groupId> 
<artifactId>spark-core_2.9.3</artifactId> 
<version>0.8.0-incubating</version> 

私が使用しているJavaコード:

public static void main(String[] args) { 

    JavaSparkContext sc = new JavaSparkContext("local", "spark"); 

    JavaRDD<String> lines = sc.textFile("stream.sample.txt").cache(); 

    JavaPairRDD<String, Long> words = lines 
     .map(new Function<String, JsonElement>() { 
      // parse line into JSON 
      @Override 
      public JsonElement call(String t) throws Exception { 
       return (new JsonParser()).parse(t); 
      } 

     }).map(new Function<JsonElement, String>() { 
      // read User ID from JSON 
      @Override 
      public String call(JsonElement json) throws Exception { 
       return json.getAsJsonObject().get("userId").toString(); 
      } 

     }).map(new PairFunction<String, String, Long>() { 
      // count each line 
      @Override 
      public Tuple2<String, Long> call(String arg0) throws Exception { 
       return new Tuple2(arg0, 1L); 
      } 

     }).reduceByKey(new Function2<Long, Long, Long>() { 
      // count messages for every user 
      @Override 
      public Long call(Long arg0, Long arg1) throws Exception { 
       return arg0 + arg1; 
      } 

     }); 

    // sort result in a descending order and take 10 users with highest message count 
    // This causes the exception 
    List<Tuple2<String, Long>> sorted = words.takeOrdered(10, new Comparator<Tuple2<String, Long>>(){ 

     @Override 
     public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) { 
      return -1 * o1._2().compareTo(o2._2()); 
     } 

    }); 

    // print result 
    for (Tuple2<String, Long> tuple : sorted) { 
     System.out.println(tuple._1() + ": " + tuple._2()); 
    } 

} 

たスタックトレース:私はスパークAPIを介して行ってきました

java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:601) 
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:297) 
    at java.lang.Thread.run(Thread.java:722) 
Caused by: org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: net.imagini.spark.test.App$5 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:670) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:668) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:668) 
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441) 
    at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149) 

私は正しい方向を指し示すものを見つけることができませんでした。何か間違っているのですか、これはSparkのバグですか? ご協力いただければ幸いです。

+0

UPDATE:どうやら、それをすべて* takeOrdered()*の第2引数として渡されているComparatorオブジェクトに繋がります。コンパレータのインターフェースを使用すると、「直列化可能」コンパレータを作成する必要があり、この作業を行うためにSerializableを拡張しないよう: 'パブリックインターフェイスSerializableComparator は、コンパレータを拡張し、Serializableを{}その後' は、このインタフェースを実装するオブジェクトを渡しますコンパレータが元の例外を防止するためです。 これはおそらくこの問題の最も洗練された解決策ではありません。私は間違いなくいくつかの提案を歓迎します:) –

答えて

2

@ vanco.antonで触れたように、あなたは、Java 8の機能のインターフェイス使用して、次のような何かを行うことができます:あなたのコード内

public interface SerializableComparator<T> extends Comparator<T>, Serializable { 

    static <T> SerializableComparator<T> serialize(SerializableComparator<T> comparator) { 
    return comparator; 
    } 

} 

そして:

import static SerializableComparator.serialize; 
... 
rdd.top(10, serialize((a, b) -> -a._2.compareTo(b._2))); 
関連する問題