2016-08-03 14 views
2

DataFrameのチャンクで任意の(SQLではない)ロジックを呼び出すSpark DataFrameで分散計算を進める必要があります。 私がやった:私が代わりに私が「itertools.chain」しまった、各マップ呼び出し内スパークデータフレームのオブジェクトを持つことが期待Spark DataFrame mapPartitions

AttributeError: 'itertools.chain' object has no attribute 'toPandas'

def some_func(df_chunk): 
    pan_df = df_chunk.toPandas() 
    #whatever logic here 

df = sqlContext.read.parquet(...) 
result = df.mapPartitions(some_func) 

は生憎それはにつながります。どうして?そしてこれを克服する方法は?

+2

PySparkはmapPartitionにデータを渡すためにitertools.chainを使用しているので、あなたはそれが認識されない関数にこのオブジェクトを渡しています。 –

答えて

2

はこれを試してみてください:

>>> columns = df.columns 
>>> df.rdd.mapPartitions(lambda iter: [pd.DataFrame(list(iter), columns=columns)])