2016-10-03 5 views
0

私はSparkデータフレームで作業しています。私は多くのレベルで私のデータフレームにカテゴリ変数を持っています。私はこの変数の単純な変換を試みています - n個の観測値(例えば1000)より大きい上位のいくつかのレベルだけを選んでください。クラブを他のすべてのレベルで「その他」カテゴリに分類します。Sparkデータフレーム列を変換する

私はSparkにはかなり新しいので、これを実装するのに苦労しています。これは私がこれまで達成してきたことです:

# Extract all levels having > 1000 observations (df is the dataframe name) 
val levels_count = df.groupBy("Col_name").count.filter("count >10000").sort(desc("count")) 

# Extract the level names 
val level_names = level_count.select("Col_name").rdd.map(x => x(0)).collect 

これは私に保持したいレベルの名前を持つ配列を与えます。次に、列に適用できる変換関数を定義する必要があります。これは私が立ち往生しているところです。私は、ユーザー定義関数を作成する必要があると信じています。これは私が試したものです。しかし

# Define UDF 
val var_transform = udf((x: String) => { 
    if (level_names contains x) x 
    else "others" 
}) 

# Apply UDF to the column 
val df_new = df.withColumn("Var_new", var_transform($"Col_name")) 

、私はそれが「タスク直列化可能ではない」例外をスローdf_new.showをしようとします。私は間違って何をしていますか?また、これを行うには良い方法がありますか?

ありがとうございます!ここで

+0

rdd.map(x => x(0))。collect –

+0

@ArunakiranNulu保持したいレベルの値の配列 – Dataminer

答えて

1

は、私の意見では、このような単純な変換のために良いだろうなソリューションです:(放送に参加することなど)データフレームのAPIと信頼触媒に固執し、タングステンが最適化されるように:

val levels_count = df 
    .groupBy($"Col_name".as("new_col_name")) 
    .count 
    .filter("count >10000") 

val df_new = df 
    .join(levels_count,$"Col_name"===$"new_col_name", joinType="leftOuter") 
    .drop("Col_name") 
    .withColumn("new_col_name",coalesce($"new_col_name", lit("other"))) 
+0

これは私に与えられます型ミスマッチエラー.... found:String( "other") 必須:org.apache.spark.sql.Column – Dataminer

+0

Ha、申し訳ありませんが、私は暗闇の中でそれを書きました。この場合、リテラルから列を作成するorg.apache.spark.sql.functions.lit関数があります。 – Wilmerton

+0

joinTypeの2番目の編集: - | – Wilmerton

関連する問題