2017-10-13 5 views
0

私はテキストファイルを読み、日付に基づいて平均を計算し、その概要をMysqlデータベースに保存する小さなシナリオを持っています。続いデータセットのデータがMysqlデータベースに挿入された後に更新されます

は、次repo_sumデータフレーム内の平均値を算出した後、コード

val repo_sum = joined_data.map(SensorReport.generateReport) 
      repo_sum.show() --- STEP 1 
      repo_sum.write.mode(SaveMode.Overwrite).jdbc(url, "sensor_report", prop) 
      repo_sum.show() --- STEP 2 

ステップ1

+----------+------------------+-----+-----+ 
|  date|    flo| hz|count| 
+----------+------------------+-----+-----+ 
|2017-10-05|52.887049194476745|10.27| 5.0| 
|2017-10-04| 55.4188048943416|10.27| 5.0| 
|2017-10-03| 54.1529270444092|10.27| 10.0| 
+----------+------------------+-----+-----+ 

の結果である[保存コマンドが実行され、ステップ2でのデータセットの値が

であります
+----------+-----------------+------------------+-----+ 
|  date|    flo|    hz|count| 
+----------+-----------------+------------------+-----+ 
|2017-10-05|52.88704919447673|31.578524597238367| 10.0| 
|2017-10-04| 55.4188048943416| 32.84440244717079| 10.0| 
+----------+-----------------+------------------+-----+ 

以下は完全コード

sparcでsaveコマンドを実行すると、プロセス全体が再び実行されることが分かります。私の理解は正しいと思いますか?

+0

試してみてください。2つのアクションは、この予期しない動作を引き起こしているデータフレーム上で実行されるため、データフレームをキャッシュしても問題が解決するだろう。この場合


、 'val repo_sum = joined_data.map(SensorReport.generateReport).cache()'が発生します。 – Shaido

+0

はいval repo_sum = joined_data.map(SensorReport.generateReport).cache()はうまく動作します –

+0

質問に完全な回答を追加しました。 – Shaido

答えて

1

すべての変換は怠惰ですが、actionが呼び出されるまで何も起こりません。同時に、同じRDDまたはデータフレームで複数のアクションが呼び出された場合、すべての計算が複数回実行されることを意味します。これには、データとすべての変換のロードが含まれます。

これを避けるには、cache()またはpersist()を使用します(cache()は異なる種類のストレージを指定できますが、デフォルトはRAMメモリのみ)。 cache()は、アクションが最初に使用された後にRDD /データフレームをメモリに保持します。したがって、同じ変換を複数回実行することは避けてください。 `RDD`をキャッシュし、それかどうかを確認するために

val repo_sum = joined_data.map(SensorReport.generateReport).cache() 
関連する問題