2016-12-02 9 views
0

ここに、私はJSONにデータフレームを書き込むために使用しているコードを示します。私はツェッペリンから、このコードを実行しています:jsonファイルにsparkデータフレームを書き込むことができません。

val df = Seq((2012, 8, "Batman", 9.8), (2012, 8, "Hero", 8.7), (2012, 7, "Robot", 5.5), (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating") 
df.write.json("/tmp/out.json") 

私は何を期待することは/tmp/out.jsonファイルに記述されたデータフレームのデータです。しかしそれは「/tmp/out.json」名前のディレクトリを作成し、その内部で私は2つのファイルが次見つける:これらのファイルのいずれもが、JSONデータを持たないさ

_SUCCESS 
._SUCCESS.crc 

を。私はここで何が欠けていますか?

+1

ですあなたはクラスタを実行しているのか、それともローカルで実行していますドライバのマシンではなく、エグゼキュータの出力ディレクトリをクラスタでチェックしたことがありますか? – ImDarrenG

+0

@ImDarrenG executorのjsonデータを見ることができます。それはエグゼキュータで分割されます。 1つのjsonファイルですべてのjsonデータを取得する方法はありますか? –

+0

はい、可能です。http://stackoverflow.com/a/40594798/7098262 – Mariusz

答えて

0

あなたはいくつかのオプションがあります:共有の場所へ

  • 書き込みをし、ドライバーに
  • df.rdd.collect()データを(マージを行うためにスパークを使用していない)ファイルをマージしてファイルに書き込みます。標準的なscalaライブラリを使用して、パーティショニングは行われません。これには、エグゼキュータからすべてのデータをドライバにプルする必要があるという欠点があります。データとドライバのリソースの量によっては、速度が遅く、実行不能になる可能性があります。データセット全体を集めるよりも
  • より良いアプローチは、順番に各パーティションを収集し、ドライバ

例えば上の単一のファイルにデータをストリーミングするために、次のようになります。

val rdd = df.rdd 
for (p <- rdd.partitions) { 
    val idx = p.index 
    val partRdd = rdd.mapPartitionsWithIndex(a => if (a._1 == idx) a._2 else Iterator(), true) 
    //The second argument is true to avoid rdd reshuffling 
    val data = partRdd.collect //data contains all values from a single partition 
           //in the form of array 
    //Now you can do with the data whatever you want: iterate, save to a file, etc. 
} 

https://stackoverflow.com/a/21801828/4697497

関連する問題