私はマスターと2つのスレーブ(Spark Standaloneを使用しています)でスパーククラスタをセットアップしました。クラスターはいくつかの例ではうまくいきますが、私のアプリケーションではうまくいきません。私のアプリケーションのワークフローは、csv - > csvの各行をヘッダー - > JSONに変換 - > S3に保存して読み込みます。ここに私のコードは次のとおりです。PySpark - SparkクラスタEC2 - S3に保存できません
def upload_func(row):
f = row.toJSON()
f.saveAsTextFile("s3n://spark_data/"+ row.name +".json")
print(f)
print(row.name)
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.getOrCreate()
df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED")
df.rdd.map(upload_func)
私もEC2環境にAWS_Key_ID
とAWS_Secret_Key
をエクスポートしています。しかし、上記のコードでは、私のアプリケーションは動作しません。下記の問題されています
JSONファイルをS3に保存されていないが、私は、アプリケーションの数回の実行を試してみましたが、また、S3のページが、データなしをリロードしてきました。アプリケーションはログにエラーなしで完了しました。また、
print(f)
とprint(row.name)
はログに出力されません。 S3上でJSONを保存するために修正する必要があるのですが、とにかく私がデバッグ目的でログに出力するにはどうしたらいいですか?現在、私はアプリケーションがcsvファイルを読むことができるように、csvファイルをワーカーノードに置く必要があります。どのようにしてファイルを別の場所に置くことができますか?マスターノードとアプリケーションの実行時に、csvファイルをすべてのワーカーノードに分割して、分散システムとして並列にアップロードできるようにしますか?
ヘルプは本当に感謝しています。あなたの助けを前もってありがとう。
は、デバッグにロガーを入れた後
を更新し、私はマップ機能upload_func()
が呼び出されていないか、アプリケーションが(関数呼び出しの前と後のロガー出力されるメッセージを)この機能の中に入ることができなかった問題を特定しました。その理由を知っていれば助けてください?