2016-11-21 3 views
1

私はSpark Streamingの新機能を使用しています。この問題を処理する方法を理解しようとしていますが、さらに多くの例がシングル(K、V)ペアです。 JavaでSparkの変換を使用する最良の方法を見つけるために、私はいくつかの助けに感謝します。 複数のキーでスパークストリーミングを減らすJava

目標は、時間ウィンドウ内の要素の集合の誤差率を得ることで、私は簡単にシナリオを記述してみましょう。次の入力を考えると

(A, Error) 
(B, Success) 
(B, Error) 
(B, Success) 
(C, Success) 
(C, Error) 

要素で集計するために起こっていると、ステータス(Element, (Number of Success, Number of Error))。 > I1 /(I1 + I2) - この場合には変換の結果は、

(A, (0,1)) 
(B, (2,1)) 
(C, (1,1)) 

そして、このような(I1、I2)のような関数を使用して、最終的比率計算であろう。

(A, 100%) 
(B, 33.3%) 
(C, 50%) 

は、私の知る限り理解し、その結果が、例えば、

JavaPairDStream<String, Double> res = 
pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(1)); 

は、アプリケーションの逆流後、reduceByKeyAndWindow()関数によって与えられるであろう、私の質問は、

あります

複数の値またはキー(たとえばJavaPairDStream<String, Tuple2<Integer,Integer>>のようなもの)を使用してJavaPairDStreamでペアを定義する方法はありますか?

reduceFuncは複数のキーを持つペアのための最良のアプローチはどれですか?

最初のDStream(おそらくJavaDStream<Tuple2<String, String>> line = input.map(func)のようなもの)をマップする最善の方法はどれですか?

ご協力いただきありがとうございます。

答えて

2

私はすでに解決策を見つけました。関数クラスとタプルで作業することで、Scalaで構築する任意の組み合わせを見つけることができます。問題は、Javaに関連するドキュメンテーションや例が見つかりませんでした。あなたが将来誰にでも役立つ場合に備えて、私の解決策を見つけることができます。

JavaPairDStream<String,String> samples = lines.flatMapToPair(new PairFlatMapFunction<String,String, String>() { 
      public Iterator<Tuple2<String,String>> call(String s) throws Exception { 
       return Arrays.asList(new Tuple2<String, String>(//Some logic on my data//).iterator(); 
      } 
     }); 


JavaPairDStream<Tuple2<String,String>, Integer> samplePairs = samples.mapToPair(
       new PairFunction<Tuple2<String,String>, Tuple2<String,String>, Integer>() { 
        public Tuple2<Tuple2<String,String>, Integer> call(Tuple2<String,String> t) { 
         return new Tuple2<Tuple2<String,String>, Integer>(t, 1); 
        } 
       }); 

     JavaPairDStream<String, Integer> countErrors = samplePairs.filter(new Function<Tuple2<Tuple2<String,String>,Integer>,Boolean>() { 
      public Boolean call(Tuple2<Tuple2<String,String>, Integer> t) 
      { 
       return (t._1._2.equals("Error")); 
      }}).mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>, String, Integer>() { 
      public Tuple2<String,Integer> call(Tuple2<Tuple2<String,String>,Integer> t) { 
       return new Tuple2(t._1._1,t._2); 
      } 
     }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { 
      public Integer call(Integer i1, Integer i2) { 
       return i1 + i2; 
      }}, Durations.seconds(30), Durations.seconds(1)); 

     JavaPairDStream<String, Integer> countSuccess= samplePairs.filter(new Function<Tuple2<Tuple2<String,String>,Integer>,Boolean>() { 
      public Boolean call(Tuple2<Tuple2<String,String>, Integer> t) 
      { 
       return (t._1._2.equals("Success")); 
      }}).mapToPair(new PairFunction<Tuple2<Tuple2<String,String>,Integer>, String, Integer>() { 
      public Tuple2<String,Integer> call(Tuple2<Tuple2<String,String>,Integer> t) { 
       return new Tuple2(t._1._1,t._2); 
      } 
     }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() { 
      public Integer call(Integer i1, Integer i2) { 
       return i1 + i2; 
      }}, Durations.seconds(30), Durations.seconds(1)); 

     JavaPairDStream<String,Tuple2<Optional<Integer>,Optional<Integer>>> countPairs = countSuccess.fullOuterJoin(countErrors); 

     JavaPairDStream<String, Double> mappedRDD = countPairs 
       .mapToPair(new PairFunction<Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>>, String, Double>() { 
        public Tuple2<String, Double> call(Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>> stringTuple2Tuple2) throws Exception { 
         if ((stringTuple2Tuple2._2()._2().isPresent()) && (stringTuple2Tuple2._2()._1().isPresent())) { 
          return new Tuple2<String, Double>(stringTuple2Tuple2._1(), ((double)stringTuple2Tuple2._2()._1().get()/
            ((double)stringTuple2Tuple2._2()._2().get()+(double)stringTuple2Tuple2._2()._1().get()))); 
         } else if (stringTuple2Tuple2._2()._2().isPresent()){ 
          return new Tuple2<String, Double>(stringTuple2Tuple2._1(), 1.0); 
         } else { 
          return new Tuple2<String, Double>(stringTuple2Tuple2._1(), 0.0); 
         } 
        } 
       }); 
関連する問題