2016-12-21 6 views
0

私は異なる列( 'features')を含むデータフレームを持っています。Spark - filter()の代わりにgroupBy()を使用して、データフレームの計算時間を最適化します。

私の目標は、列X統計的尺度を計算することです: 平均、スタンダール・偏差、分散

をしかし、列Y.への依存 例えばとともに、それらのすべてを計算しますY = 1のすべての行を取得し、mean、stddev、varを計算すると はY = 2のすべての行に対して同じ処理を行います。

私の現在の実装は次のようになります。

print "For CONGESTION_FLAG = 0:" 
log_df.filter(log_df[flag_col] == 0).select([mean(size_col), stddev(size_col), 
             pow(stddev(size_col), 2)]).show(20, False) 

print "For CONGESTION_FLAG = 1:" 
log_df.filter(log_df[flag_col] == 1).select([mean(size_col), stddev(size_col), 
             pow(stddev(size_col), 2)]).show(20, False) 
print "For CONGESTION_FLAG = 2:" 
log_df.filter(log_df[flag_col] == 2).select([mean(size_col), stddev(size_col), 
             pow(stddev(size_col), 2)]).show(20, False) 

私はより速く、それらの計算の実行を行うために(私は1GBのデータでこれを使用していることをfilter()方法は、計算時間の面で無駄で語った、とアドバイスを受けました。ファイル)の場合は、groupBy()メソッドを使用する方が良いでしょう。

誰かが、代わりにgroupByを使用して、同じ計算を行うためにそれらの行を変換するのを手伝ってもらえますか? シンタックスが混乱してしまい、正しく処理できませんでした。

ありがとうございました。

答えて

1

フィルターは無駄ではありません。問題は、データを3回スキャンしていることを意味する複数回(値ごとに1回)ということです。記述している操作は、グループ化された列の値ごとに基本的にデータを集めるgroupbyによって最も効果的です。あなたはこのような何か行うことができます

agg_df = log_df.groupBy(flag_col).agg(mean(size_col).alias("mean"), stddev(size_col).alias("stddev"), pow(stddev(size_col),2).alias("pow")) 

をまた(あなたがあなたのデータでそれを試してみてください)集約後STDDEV^2を計算することで、より良いパフォーマンスを得る可能性があります:用

agg_df = log_df.groupBy(flag_col).agg(mean(size_col).alias("mean"), stddev(size_col).alias("stddev")) 
agg_df2 = agg_df.withColumn("pow", agg_df["stddev"] * agg_df["stddev"]) 
+0

感謝を説明的な答え。 aggregated_dfを作成する前に 'log_df.cache()'を使用するとパフォーマンスが向上しますか? – Adiel

+0

log_dfの生成方法と使用方法によって異なります。ファイルを作成するだけの場合(たとえば、ファイルから読み込む場合)、何のメリットもありません。あなたが他のもののためにそれを再使用し、十分なメモリを持っているならおそらくはい。 –

+0

私はテキストファイル(1GB)を読んでいて、いくつかの列の値を操作していて、他の列のいくつかの条件でいくつかの行を削除してから計算に行きます。私はgroupBy \ aggregateメソッドによる計算の直前にdf.cache()を追加したい – Adiel

1

次のことが可能です。それだけで

log_df.groupBy(log_df[flag_col]).agg(
    mean(size_col), stddev(size_col), pow(stddev(size_col), 2) 
) 
関連する問題