1

私はマスターと2つのスレーブ(Spark Standaloneを使用しています)でスパーククラスタをセットアップしました。クラスターはいくつかの例ではうまくいきますが、私のアプリケーションではうまくいきません。私のアプリケーションのワークフローは、csv - > csvの各行をヘッダー - > JSONに変換 - > S3に保存して読み込みます。ここに私のコードは次のとおりです。PySpark - SparkクラスタEC2 - S3に保存できません

def upload_func(row): 
    f = row.toJSON() 
    f.saveAsTextFile("s3n://spark_data/"+ row.name +".json") 
    print(f) 
    print(row.name) 

if __name__ == "__main__": 
    spark = SparkSession \ 
     .builder \ 
     .appName("Python Spark SQL data source example") \ 
     .getOrCreate() 
    df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED") 
    df.rdd.map(upload_func) 

私もEC2環境にAWS_Key_IDAWS_Secret_Keyをエクスポートしています。しかし、上記のコードでは、私のアプリケーションは動作しません。下記の問題されています

  1. JSONファイルをS3に保存されていないが、私は、アプリケーションの数回の実行を試してみましたが、また、S3のページが、データなしをリロードしてきました。アプリケーションはログにエラーなしで完了しました。また、print(f)print(row.name)はログに出力されません。 S3上でJSONを保存するために修正する必要があるのですが、とにかく私がデバッグ目的でログに出力するにはどうしたらいいですか?

  2. 現在、私はアプリケーションがcsvファイルを読むことができるように、csvファイルをワーカーノードに置く必要があります。どのようにしてファイルを別の場所に置くことができますか?マスターノードとアプリケーションの実行時に、csvファイルをすべてのワーカーノードに分割して、分散システムとして並列にアップロードできるようにしますか?

ヘルプは本当に感謝しています。あなたの助けを前もってありがとう。

は、デバッグにロガーを入れた後

を更新し、私はマップ機能upload_func()が呼び出されていないか、アプリケーションが(関数呼び出しの前と後のロガー出力されるメッセージを)この機能の中に入ることができなかった問題を特定しました。その理由を知っていれば助けてください?

答えて

0

地図を強制的に評価する必要があります。 sparkはオンデマンドで作業を実行します。

df.rdd.map(upload_func).count() do

関連する問題