2017-01-30 12 views
0

dataframeを繰り返し実行する必要があります。これは、forループを使用して値のセットを反復処理できるようにpySparkを使用することです。以下は私が書いたコードです。このコードの問題は、私は私が私が持っていたら、私はループを壊すことができない機能にfuncRowIter pySparkを使用してデータフレームの各行を反復する

  • をDATAFRAMEから任意の値を印刷することはできませんよ、並列
  • を破った収集使用する必要が

    1. です一致が見つかりました。

    私はpySparkでそれをしなければならないし、このためにパンダを使用することはできません。

    from pyspark.sql.functions import * 
    from pyspark.sql import HiveContext 
    from pyspark.sql import functions 
    from pyspark.sql import DataFrameWriter 
    from pyspark.sql.readwriter import DataFrameWriter 
    from pyspark import SparkContext 
    
    sc = SparkContext() 
    hive_context = HiveContext(sc) 
    
    tab = hive_context.sql("select * from update_poc.test_table_a") 
    
    tab.registerTempTable("tab") 
    print type(tab) 
    
    df = tab.rdd 
    
    def funcRowIter(rows): 
        print type(rows) 
         if(rows.id == "1"): 
          return 1 
    
    df_1 = df.map(funcRowIter).collect() 
    print df_1 
    
  • 答えて

    1

    は、あなたの目標は、特定の行を表示することであると思われます。 .filter、次に.collectを使用できます。例えば

    row_1 = rdd.filter(lambda x: x.id==1).collect() 
    

    しかし、あなたのデータフレームの上にこの方法を反復処理しようとするのは効率的ではありません。

    +0

    の何百万人をすなわち)だって収集並列処理を中断します –

    0

    df_1 = df.map(funcRowIter).collect()を使用する代わりに、UDFを試してください。これが役立つことを願っています。

    from pyspark.sql.functions import struct 
    from pyspark.sql.functions import * 
    def funcRowIter(rows): 
        print type(rows) 
        if(row is nor None and row.id is not None) 
         if(rows.id == "1"): 
          return 1 
    A = udf(funcRowIter, ArrayType(StringType())) 
    z = df.withColumn(data_id, A(struct([df[x] for x in df.columns]))) 
    z.show() 
    

    collect()非常にビッグデータのための良いオプションになることはありません(私は収集せずにこれを実行しようとしていたレコード

    関連する問題