2016-10-17 10 views
0

私はDRIの最初のRDDの各パーティションの内容が何であるかを確認するのがとても簡単です。 これは私が今やっていることです:DStream - Sparkストリーミング用のmapPartitionsWithIndexを使用

SparkConf sparkConfiguration= new SparkConf().setAppName("DataAnalysis").setMaster("local[*]"); 
    JavaStreamingContext sparkStrContext=new JavaStreamingContext(sparkConfiguration, Durations.seconds(1)); 
    JavaReceiverInputDStream<String> receiveParkingData=sparkStrContext.socketTextStream("localhost",5554); 


Time time=new Time(1000); 

JavaRDD<String>dataRDD= receiveParkingData.compute(time); 

//I get an error in this RDD 

    JavaRDD<String>indexDataRDD=dataRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { 
     @Override 

     public Iterator<String> call(Integer integer, Iterator<String> stringIterator) throws Exception { 
      return null; 
     } 
    }); 

indexDataRDD.collect(); 

だから私は、各パーティションとそのIDの内容を印刷したいです。しかし、indexDataRDD私IntelliJ IDEでこのメッセージが表示されます:mapPartitionsWithIndex (Function2<Integer, Iterator<String>, Iterator<String>>, boolean) in AbstractJavaRDDLike cannot be applied to (Function2<Integer, Iterator<String>, Iterator<String>>)

誰かがこの問題を私に助けてくれますか?各パーティションでコンテンツを取得する別の簡単な方法はありますか?私は本当に各パーティションの特定の内容を知りたいです。 ありがとうございます。

+0

「計算(時間)」の呼び出し理由は何ですか?あなたは一般的にそれをしてはいけません。 'DStream'は、与えられたバッチ間隔のパイプラインに一つの' RDD'を持っています。 –

答えて

0

参照用のmapPartitionsWithIndex用のサンプルプログラムです。

public class SparkDemo { 
public static void main(String[] args) { 
    SparkConf conf = new SparkConf().setAppName("SparkDemo").setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    List<String> data = Arrays.asList("one","two","three","four","five"); 
    JavaRDD<String> javaRDD = sc.parallelize(data, 2); 
    JavaRDD<String> mapPartitionsWithIndexRDD = javaRDD 
      .mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { 
       @Override 
       public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception { 
        LinkedList<String> linkedList = new LinkedList<String>(); 
        while (iterator.hasNext()){ 
          linkedList.add(Integer.toString(index) + "-" + iterator.next()); 
         } 
        return linkedList.iterator(); 
       } 
      }, false); 
    System.out.println("mapPartitionsWithIndexRDD " + mapPartitionsWithIndexRDD.collect()); 
    sc.stop(); 
    sc.close(); 
    } 
} 
関連する問題