2017-01-05 13 views
0

私は次のようなデータフレームを持っているのオフベース値:重複行の別の列

ID NumRecords 
123 2 
456 1 
789 3 

私は2つの列を連結し、に基づいて行を複製し、新たなデータフレームを作成したいですNumRecordsの値は

ので、出力は

ID_New 123-1 
ID_New 123-2 
ID_New 456-1 
ID_New 789-1 
ID_New 789-2 
ID_New 789-3 

私は「爆発する」機能に探していたしなければならないが、EXAに基づいてのみ定数を取るように見えました私は見た。私は同様の問題があった

from pyspark.sql.functions import udf, explode, concat_ws 
from pyspark.sql.types import * 

range_ = udf(lambda x: [str(y) for y in range(1, x + 1)], ArrayType(StringType())) 

df.withColumn("records", range_("NumRecords") \ 
    .withColumn("record", explode("records")) \ 
    .withColumn("ID_New", concat_ws("-", "id", "record")) 

答えて

-1

は、あなたが使用することができ、NumRecords列の値に基づいて行重複します。このコード:

from pyspark.sql import Row 


def duplicate_function(row): 
    data = [] # list of rows to return 
    to_duplicate = float(row["NumRecords"]) 

    i = 0 
    while i < to_duplicate: 
     row_dict = row.asDict() # convert a Spark Row object to a Python dictionary 
     row_dict["SERIAL_NO"] = str(i) 
     new_row = Row(**row_dict) # create a Spark Row object based on a Python dictionary 
     to_return.append(new_row) # adds this Row to the list 
     i += 1 

    return data # returns the final list 


# create final dataset based on value in NumRecords column 
df_flatmap = df_input.rdd.flatMap(duplicate_function).toDF(df_input.schema) 
+0

私が得たが、「タプルオブジェクトを呼び出すことはできません"on df.withColumn(" records "、range _(" NumRecords ")) – Dan