2016-09-15

To increase parallelism as recommended in the Spark Streaming Programming Guide, I'm setting up multiple Flume receivers and trying to union them into a single stream. This code works as expected:

private JavaDStream<SparkFlumeEvent> getEventsWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) { 

     List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>(); 

     for (String host : hosts) { 
      for (String port : ports) { 
       receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port))); 
      } 
     } 

     JavaDStream<SparkFlumeEvent> unionStreams = receivers.get(0) 
       .union(receivers.get(1)) 
       .union(receivers.get(2)) 
       .union(receivers.get(3)) 
       .union(receivers.get(4)) 
       .union(receivers.get(5)); 

     return unionStreams; 
    } 

However, I don't know how many receivers my cluster will have until runtime. When I try to do the union in a loop, I get an NPE:

private JavaDStream<SparkFlumeEvent> getEventsNotWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) { 

     List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>(); 

     for (String host : hosts) { 
      for (String port : ports) { 
       receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port))); 
      } 
     } 

     JavaDStream<SparkFlumeEvent> unionStreams = null; 
     for (JavaReceiverInputDStream<SparkFlumeEvent> receiver : receivers) { 
      if (unionStreams == null) { 
       unionStreams = receiver; 
      } else { 
       unionStreams.union(receiver); 
      } 
     } 

     return unionStreams; 
    } 

ERROR:

16/09/15 17:05:25 ERROR JobScheduler: Error in job generator
java.lang.NullPointerException
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
 at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
 at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
 at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
 at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
 at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
 at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:270)
 at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/09/15 17:05:25 INFO MemoryStore: ensureFreeSpace(15128) called with curMem=520144, maxMem=555755765
16/09/15 17:05:25 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 14.8 KB, free 529.5 MB)
Exception in thread "main" java.lang.NullPointerException
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
 at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
 at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
 at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
 at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
 at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
 at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
 at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:270)
 at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
 at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

What is the correct way to do this?


I also tried JavaDStream unionStreams = jssc.union(receivers.get(0), receivers.subList(1, receivers.size())); but it doesn't compile. Error:(118, 57) java: no suitable method found. ... – Kevin

Answer


You can try the code below; it should solve your problem:

private JavaDStream<SparkFlumeEvent> getEventsNotWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) { 

    List<JavaDStream<SparkFlumeEvent>> receivers = new ArrayList<JavaDStream<SparkFlumeEvent>>(); 

    for (String host : hosts) { 
     for (String port : ports) { 
      receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port))); 
     } 
    } 

    return jssc.union(receivers.get(0), receivers.subList(1, receivers.size()));; 
} 

When compiling I get "Error:(133, 85) java: unreachable statement". IntelliJ also flags "incompatible equality constraint: SparkFlumeEvent and T". – Kevin


Ignore the unreachable-statement error — it was caused by the ;; typo in your code, which I hadn't noticed. IntelliJ is still showing the other error, though. – Kevin


It was an IntelliJ cache problem :-/ Very frustrating! Resolved it following the comments here: http://stackoverflow.com/questions/15052772/intellij-show-errors-in-scala-source-files-but-the-project-compiles-successfully. Thanks for the help. – Kevin
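For what it's worth, the loop in the question fails because DStream.union returns a new stream rather than mutating the receiver, so unionStreams.union(receiver); silently discards its result; writing unionStreams = unionStreams.union(receiver); avoids leaving streams detached from the graph. The reassignment pattern can be sketched without any Spark dependency (UnionAll and unionAll are illustrative names, not Spark API):

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class UnionAll {
    // Fold a non-empty list into one value with the given combiner.
    // This mirrors chaining DStream.union: each union() call RETURNS a
    // new object, so the accumulator must be reassigned on every step.
    static <T> T unionAll(List<T> streams, BinaryOperator<T> union) {
        T acc = streams.get(0);
        for (T s : streams.subList(1, streams.size())) {
            acc = union.apply(acc, s); // reassign -- the step the broken loop was missing
        }
        return acc;
    }

    public static void main(String[] args) {
        // Stand-in for DStreams: immutable strings combined by concatenation.
        System.out.println(unionAll(List.of("a", "b", "c"), String::concat));
    }
}
```

With real streams the combiner would be (a, b) -> a.union(b), giving the same result as the accepted jssc.union answer.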