2017-02-17 11 views
1

私がしようとしていることを簡単な例で説明しましょう。私たちは以下のように2つの非常に単純なデータフレームを持っているとしましょう:DF1からpysparkデータフレームの複数の列のデカルト積に基づいて新しい列を作成する方法

Df1 
+---+---+---+ 
| a1| a2| a3| 
+---+---+---+ 
| 2| 3| 7| 
| 1| 9| 6| 
+---+---+---+ 

Df2 
+---+---+ 
| b1| b2| 
+---+---+ 
| 10| 2| 
| 9| 3| 
+---+---+ 

、DF2、我々はDF1から元の列のデカルト積、DF2ある列を持つ新しいDFを作成する必要があります。特に、新しいdfは 'a1b1'、 'a1b2'、 'a2b1'、 'a2b2'、 'a3b1'、 'a3b2'を持ち、行はdf1、df2からの対応する列の乗算になります。結果DFは次のようになります。

Df3 
+----+----+----+----+----+----+ 
|a1b1|a1b2|a2b1|a2b2|a3b1|a3b2| 
+----+----+----+----+----+----+ 
| 20| 4| 30| 6| 70| 14| 
| 9| 3| 81| 27| 54| 18| 
+----+----+----+----+----+----+ 

私は、スパーク、オンラインドキュメントを検索しましただけでなく、質問はここに掲載が、彼らがすべての行ではなく、列のデカルト積についてあるようです。例えば、rdd.cartesian()次のコードのように、列の値の異なる組み合わせのデカルト積を提供する:

r = sc.parallelize([1, 2]) 
r.cartesian(r).toDF().show() 

+---+---+ 
| _1| _2| 
+---+---+ 
| 1| 1| 
| 1| 2| 
| 2| 1| 
| 2| 2| 
+---+---+ 

しかし、これは私が必要なものではありません。ここでも、行の代わりに新しい列を作成する必要があります。私の問題では、行数は同じままです。私はudfが最終的に問題を解決できることを理解しています。しかし私の実際のアプリケーションでは、巨大なデータセットがあり、すべての列を作成するには時間がかかります(可能なすべての列の組み合わせとして約500個の新しい列)。効率を上げる可能性のある何らかのベクトル演算を行うことを好みます。私は間違っているかもしれませんが、スパークudfは行の操作に基づいているように思えます。

提案/フィードバック/コメントありがとうございました。あなたの便宜のために

は、私が上に示したデータフレームの例を作成するために、ここで簡単なコードを添付:

df1 = sqlContext.createDataFrame([[2,3,7],[1,9,6]],['a1','a2','a3']) 
df1.show() 

df2 = sqlContext.createDataFrame([[10,2],[9,3]],['b1','b2']) 
df2.show() 
+0

どのように行をリンクしますか?秩序はあなたが一般的に頼ることのできるものではありません。 – zero323

+0

こんにちはZero323、あなたのメッセージのおかげで。行をリンクする主キーがあります。ここでは、行が整数インデックスで一致し、すべてのデータフレームの行数が同じであると仮定します。 – spectrum

+0

OK、プロのヒント:明示的なキーを持つことは良いです。 df1.join(df2、['id']).df1.columns内のxに対して、df2.columns内のyに対してxf(x)* df2 [y]を選択します。 = 'id'とy!= 'id']) 'id'がリンク列であるとき。 – zero323

答えて

-1

そのない簡単な私の知る限り。ここではevalを使用してのショットは次のとおりです。あなたの助けになるかもしれない他の

# function to add rownumbers in a dataframe 
def addrownum(df): 
    dff = df.rdd.zipWithIndex().toDF(['features','rownum']) 
    odf = dff.map(lambda x : tuple(x.features)+tuple([x.rownum])).toDF(df.columns+['rownum']) 
    return odf 

df1_ = addrownum(df1) 
df2_ = addrownum(df2) 
# Join based on rownumbers 
outputdf = df1_.rownum.join(df2_,df1_.rownum==df2_.rownum).drop(df1_.rownum).drop(df2_.rownum) 

n1 = ['a1','a2','a3'] # columns in set1 
n2 = ['b1','b2']  # columns in set2 

# I create a string of expression that I want to execute 
eval_list = ['x.'+l1+'*'+'x.'+l2 for l1 in n1 for l2 in n2] 
eval_str = '('+','.join(eval_list)+')' 
col_list = [l1+l2 for l1 in n1 for l2 in n2] 

dfcartesian = outputdf.map(lambda x:eval(eval_str)).toDF(col_list) 

何かがspark.ml.featureで要素単位の製品ですが、それは何もそれほど複雑になることはありません。 1つのリストの複数の要素から別のリストに要素を取り出し、その特徴ベクトルをデータフレームに展開し直します。

+0

こんにちは、お返事ありがとうございます。繰り返しますが、使用しているメソッドは行操作であり、巨大なデータセットに対しては非常に遅いです。また、別のウェイトベクトルを使用して配列内のセルに行を掛けているため、mllibのElementwise製品は機能しません。 – spectrum

関連する問題