-1

私は3つのPySpark DataFramesに基づいて計算を行っています。Pysparkの新しいDataFrameに繰り返し保存する

このスクリプトは、計算を実行するという意味では機能しますが、私は計算の結果を正しく処理するのに苦労しています。列のそれぞれについて、およびデータの行ごとに

import sys 
import numpy as np 
from pyspark import SparkConf, SparkContext, SQLContext 
sc = SparkContext("local") 
sqlContext = SQLContext(sc) 

# Dummy Data 
df = sqlContext.createDataFrame([[0,1,0,0,0],[1,1,0,0,1],[0,0,1,0,1],[1,0,1,1,0],[1,1,0,0,0]], ['p1', 'p2', 'p3', 'p4', 'p5']) 
df.show() 
+---+---+---+---+---+ 
| p1| p2| p3| p4| p5| 
+---+---+---+---+---+ 
| 0| 1| 0| 0| 0| 
| 1| 1| 0| 0| 1| 
| 0| 0| 1| 0| 1| 
| 1| 0| 1| 1| 0| 
| 1| 1| 0| 0| 0| 
+---+---+---+---+---+ 

# Values 
values = sqlContext.createDataFrame([(0,1,'p1'),(None,1,'p2'),(0,0,'p3'),(None,0, 'p4'),(1,None,'p5')], ('f1', 'f2','index')) 
values.show() 
+----+----+-----+ 
| f1| f2|index| 
+----+----+-----+ 
| 0| 1| p1| 
|null| 1| p2| 
| 0| 0| p3| 
|null| 0| p4| 
| 1|null| p5| 
+----+----+-----+ 

# Weights 
weights = sqlContext.createDataFrame([(4,3,'p1'),(None,1,'p2'),(2,2,'p3'),(None, 3, 'p4'),(3,None,'p5')], ('f1', 'f2','index')) 
weights.show() 
+----+----+-----+ 
| f1| f2|index| 
+----+----+-----+ 
| 4| 3| p1| 
|null| 1| p2| 
| 2| 2| p3| 
|null| 3| p4| 
| 3|null| p5| 
+----+----+-----+ 

# Function: it sums the vector W for the values of Row equal to the value of V and then divide by the length of V. 
# If there a no similarities between Row and V outputs 0 
def W_sum(row,v,w): 
    if len(w[row==v])>0: 
     return float(np.sum(w[row==v])/len(w)) 
    else: 
     return 0.0 

、上記機能が適用されます。

# We iterate over the columns of Values (except the last one called index) 
for val in values.columns[:-1]: 
    # we filter the data to work only with the columns that are defined for the selected Value 
    defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()] 
    # we select only the useful columns 
    df_select= df.select(defined_col) 
    # we retrieve the reference value and weights 
    V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten() 
    W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten() 
    W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType()) 
    df_select.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in df_select.columns)))) 

これは与える:私はそれを尋ねたとして

+---+---+---+---+---+---+ 
| p1| p2| p3| p4| p5| f1| 
+---+---+---+---+---+---+ 
| 0| 1| 0| 0| 0|2.0| 
| 1| 1| 0| 0| 1|1.0| 
| 0| 0| 1| 0| 1|2.0| 
| 1| 0| 1| 1| 0|0.0| 
| 1| 1| 0| 0| 0|0.0| 
+---+---+---+---+---+---+ 

それはスライスされたデータフレームに列を追加しました。問題は、むしろ、結果を調べるために最後にアクセスできる新しいデータにデータを収集することです。
pandasと同じように、PySparkのDataFrameを(いくらか効率的に)拡張することは可能ですか?

編集私の目標をより明確にするために:理想的には私はこのような単なる計算カラムとデータフレームになるだろう

+---+---+ 
    | f1| f2| 
    +---+---+ 
    |2.0|1.0| 
    |1.0|2.0| 
    |2.0|0.0| 
    |0.0|0.0| 
    |0.0|2.0| 
    +---+---+ 
+1

火花のデータフレームは不変です。各行に新しいデータフレームを作成し、以前のデータフレームにユニオンを適用して、行が「追加」された新しいフレームワークを作成する必要があります。 –

+1

あなたの質問は何ですか(できれば望ましい出力の例があります)? – desertnaut

+0

@desertnaut:私は編集しました、うまくいけば私の目標は今より明確です。 – Haboryme

答えて

2

あなたの質問といくつかの問題があります...

まず、最後の行のdf_selectがどこにも定義されていないので、ループのforループでエラーが発生します。最後には割り当てもありません(何が生成されますか?)。 df_selectが実際にあなたのsubsubsampleデータフレームであることをと仮定すると

は、前にいくつかの行を定義し、あなたの最後の行が

new_df = subsubsample.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in subsubsample.columns)))) 

のようなものであることを、あなたの問題がより明確になって起動します。 (f1との結果は単に上書きされるので、天然の)

values.columns[:-1] 
# ['f1', 'f2'] 

ので、ループ全体の結果は、ちょうど

+---+---+---+---+---+ 
| p1| p2| p3| p4| f2| 
+---+---+---+---+---+ 
| 0| 1| 0| 0|1.0| 
| 1| 1| 0| 0|2.0| 
| 0| 0| 1| 0|0.0| 
| 1| 0| 1| 1|0.0| 
| 1| 1| 0| 0|2.0| 
+---+---+---+---+---+ 

即ち含まのみカラムf2であろう。今

、私はは状況がこのようなものです、そして、あなたの問題ではなく別のデータフレームで一緒に両方の列f1 & f2を持つことがどのように実際にあることを、あなただけのsubsubsampleを忘れて、あなたに列を追加することができを想定して、言ったようにdf初期、おそらくその後、不要なものを削除する:結果のnew_dfはなり

init_cols = df.columns 
init_cols 
# ['p1', 'p2', 'p3', 'p4', 'p5'] 

new_df = df 

for val in values.columns[:-1]: 
    # we filter the data to work only with the columns that are defined for the selected Value 
    defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()] 
    # we retrieve the reference value and weights 
    V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten() 
    W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten() 
    W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType()) 
    new_df = new_df.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in defined_col)))) # change here 

# drop initial columns: 
for i in init_cols: 
    new_df = new_df.drop(i) 

を:

+---+---+ 
| f1| f2| 
+---+---+ 
|2.0|1.0| 
|1.0|2.0| 
|2.0|0.0| 
|0.0|0.0| 
|0.0|2.0| 
+---+---+ 

(コメントの後)UPDATE:フロート、使用するあなたのW_sum機能で除算を強制するには:

from __future__ import division 

new_df今のようになります

+---------+----+ 
|  f1| f2| 
+---------+----+ 
|  2.0| 1.5| 
|1.6666666|2.25| 
|2.3333333|0.75| 
|  0.0|0.75| 
|0.6666667|2.25| 
+---------+----+ 

f2にそれがあるべきとおりにあなたのコメントによると。

+0

あなたの返事をありがとう、これは私を助けてくれました。最後の1つは、結果は切り捨てられます(f2は1.5,2.25,0.75,0.75,2.25であるべきです)。 – Haboryme

+0

@HaborymeはあなたのUDFの詳細を一切通らなかった。私は、質問がついていた部分を追加する列に専念していましたが、私は間違いなくあなたの希望する出力を正確に複製しました。にもかかわらず、更新を参照してください... – desertnaut

+1

素晴らしい!ありがとう。 – Haboryme

関連する問題