2016-03-20 21 views
0

私のコードでは、3つの異なる列を別々に実行して分散/標準偏差/平均などを計算する計算がいくつかあります.....問題は、値を再マッピングしてから各列の分散を計算する必要があるためです。分散/標準/平均の非同期スパークRDD計算

これらの3つのステートメントを同時に非同期で実行し、以下の例で指定する3つの変数の最終値を取得することはできますか?以下は

コード:

 final Double varSHOUR    = dataset.mapToDouble(new DoubleFunction<modelEhealth>() { 
     @Override 
     public double call(modelEhealth modelEhealth) throws Exception { 
      return modelEhealth.getSHOUR(); 
     } 
    }).variance(); 
    final Double varHOURLYFRAMESIN  = dataset.mapToDouble(new DoubleFunction<modelEhealth>() { 
     @Override 
     public double call(modelEhealth modelEhealth) throws Exception { 
      return modelEhealth.getHOURLYFRAMESIN(); 
     } 
    }).variance(); 
    final Double varHOURLYFRAMESOUT  = dataset.mapToDouble(new DoubleFunction<modelEhealth>() { 
     @Override 
     public double call(modelEhealth modelEhealth) throws Exception { 
      return modelEhealth.getHOURLYFRAMESOUT(); 
     } 
    }).variance(); 

答えて

1

あなたが代わりにダブルのごModelHealthクラスを使用してJavaDoubleRDD.variance()のスパークの実装を模倣する必要があります。これはSparkのStatCounterを使用して実際の計算を行うことができるので難しくありません。そのうち3つが必要になります。例えば

、私は3つのDoubleフィールドV1、V2、V3との簡単なModelHealthを使用します:次に

static class ModelHealth { 
    final Double v1; 
    final Double v2; 
    final Double v3; 
} 

JavaRDD<ModelHealth> dataset = // your data 

// zero value - three empty StatCounters: 
final Tuple3<StatCounter, StatCounter, StatCounter> zeroValue = new Tuple3<>(new StatCounter(), new StatCounter(), new StatCounter()); 

// using `aggregate` to aggregate ModelHealth records into three StatCounters: 
final Tuple3<StatCounter, StatCounter, StatCounter> stats = dataset.aggregate(zeroValue, new Function2<Tuple3<StatCounter, StatCounter, StatCounter>, ModelHealth, Tuple3<StatCounter, StatCounter, StatCounter>>() { 
    @Override 
    public Tuple3<StatCounter, StatCounter, StatCounter> call(Tuple3<StatCounter, StatCounter, StatCounter> stats, ModelHealth record) throws Exception { 
     // merging record into tuple of StatCounters - each value merged with corresponding counter 
     stats._1().merge(record.v1); 
     stats._2().merge(record.v2); 
     stats._3().merge(record.v3); 
     return stats; 
    } 
}, new Function2<Tuple3<StatCounter, StatCounter, StatCounter>, Tuple3<StatCounter, StatCounter, StatCounter>, Tuple3<StatCounter, StatCounter, StatCounter>>() { 
    @Override 
    public Tuple3<StatCounter, StatCounter, StatCounter> call(Tuple3<StatCounter, StatCounter, StatCounter> v1, Tuple3<StatCounter, StatCounter, StatCounter> v2) throws Exception { 
     // merging tuples of StatCounters - each counter merged with corresponding one 
     v1._1().merge(v2._1()); 
     v1._2().merge(v2._2()); 
     v1._3().merge(v2._3()); 
     return v1; 
    } 
}); 

Double v1_variance = stats._1().variance(); 
Double v2_variance = stats._2().variance(); 
Double v3_variance = stats._3().variance(); 

これはあなたが持っていた同じ結果が得られますが、データセット上で単一の集計を使用します。

+0

集計用にFunction2を実装しようとすると、コードの2番目の部分でコンパイラからエラーが発生し、抽象宣言または抽象メソッドを実装する必要があります。 – user2100493

+0

私のために正常にコンパイルされ、実行されました - "コール"メソッドのシグネチャを変更しましたか?スーパーインプットと一致することを確認し、戻り値の型は型定義と一致する必要があります。 –

関連する問題