2016-11-10 6 views
2

免責を生成RDDに呼び出された後には戻りません:コールは()combineByKey機能によって

[(T,[Tina, Thomas]), (T,[Tolis]), (C,[Cory, Christine]), (J,[Joseph, Jimmy, James, Jackeline, Juan]), (J,[Jimbo, Jina])]

と私:私は

スパークに新しいです、私は次のようになりますRDDを持っていますcombineByKeyを呼び出して結果としてJavaPairRDDを返します。<文字、整数>

この呼び出しは正しく動作しているようです(この時点から制御フローが渡され、デバッガfooに何らかの値があるようです)

JavaPairRDD<Character, Integer> foo = rdd.combineByKey(createAcc, addAndCount, combine); 
System.out.println(foo.collect()); 

私の問題は、プログラムがfoo.collect()の呼び出し後に戻ってこないということです。 アイデアはありますか?それはcombineByKeyによって呼び出される関数のコードは以下の通りです(:私はスパークバージョン2.0.0およびJava 8

EDITを使用しています私は、Eclipseのデバッガでデバッグしようとしましたが、私はすべての

で機会がなかったです

  Function<Iterable<String>, Integer> createAcc = 

      new Function<Iterable<String>, Integer>() { 

        public Integer call(Iterable<String> x) { 
          int counter = 0; 
          Iterator<String> it = x.iterator(); 
          while (it.hasNext()) { 
            counter++; 
          } 
          return counter; 
        } 
      }; 

      Function2<Integer, Iterable<String>, Integer> addAndCount = 

      new Function2<Integer,Iterable<String>, Integer>() { 

        public Integer call(Integer acc , Iterable<String> x) { 
          int counter = 0; 
          Iterator<String> it = x.iterator(); 
          while (it.hasNext()) { 
            counter++; 
          } 
          return counter + acc; 
        } 
      }; 

      Function2<Integer,Integer,Integer> combine = 

      new Function2<Integer,Integer, Integer>() { 

        public Integer call(Integer x, Integer y) { 
          return x+y; 
        } 
      }; 

アップデート2:要求されたログは、明らかに私はスパークに新しいですダミーコード原因は、 combineByKeyを呼び出して私の目標は、各キーにbeloning文字列のリスト)の合計の長さを見つけることです続く

16/11/11 17:21:32 INFO SparkContext: Starting job: count at Foo.java:265 16/11/11 17:21:32 INFO DAGScheduler: Got job 9 (count at Foo.java:265) with 3 output partitions 16/11/11 17:21:32 INFO DAGScheduler: Final stage: ResultStage 20 (count at Foo.java:265) 16/11/11 17:21:32 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 19, ShuffleMapStage 18) 16/11/11 17:21:32 INFO DAGScheduler: Missing parents: List() 16/11/11 17:21:32 INFO DAGScheduler: Submitting ResultStage 20 (MapPartitionsRDD[24] at combineByKey at Foo.java:264), which has no missing parents 16/11/11 17:21:32 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 6.7 KB, free 1946.0 MB) 16/11/11 17:21:32 INFO MemoryStore: Block broadcast_12_piece0 stored as bytes in memory (estimated size 3.4 KB, free 1946.0 MB) 16/11/11 17:21:32 INFO BlockManagerInfo: Added broadcast_12_piece0 in memory on xxx.xxx.xx.xx:55712 (size: 3.4 KB, free: 1946.1 MB) 16/11/11 17:21:32 INFO SparkContext: Created broadcast 12 from broadcast at DAGScheduler.scala:1012 16/11/11 17:21:32 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 20 (MapPartitionsRDD[24] at combineByKey at Foo.java:264) 16/11/11 17:21:32 INFO TaskSchedulerImpl: Adding task set 20.0 with 3 tasks 16/11/11 17:21:32 INFO TaskSetManager: Starting task 0.0 in stage 20.0 (TID 30, localhost, partition 0, ANY, 5288 bytes) 16/11/11 17:21:32 INFO Executor: Running task 0.0 in stage 20.0 (TID 30) 16/11/11 17:21:32 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 3 blocks 16/11/11 17:21:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

+0

に変更し、それら? "foo.count()"の結果は何ですか? – Yaron

+0

データのサイズはかなり小さいです(期限切れに使用しているrddは質問に投稿されたものです)。 foo.count()への呼び出しはどちらも返されないようです(私はそれが同じ理由であると仮定します)。 – XII

+0

combineByKey変換は、アクションの呼び出し時にのみ実行されます(count ou collectなど)。この問題はおそらくcombineByKey関数にあります。詳細を教えてください。 – Marie

答えて

1

これは単純なJavaの問題です。「while」ループは決してit.nextを呼び出すことはなく、決して終了しません。データのサイズが何であるかを

while (it.hasNext()) { 
     it.next(); 
     counter++; 
    } 
+0

私は最大のnoobです – XII