0
開始変換と終了変換の中間のRDDに行数を記録します。私のコードは、現在、次のようになります。中間カウントの前に常にRDDをキャッシュする必要がありますか?
val transformation1 = firstTransformation(inputdata).cache // Is this cache recommended or can I remove it?
log("Transformation1 count: " + tranformation1.count)
val tranformation2 = secondTransformation(transformation1).cache
val finalX = transformation2.filter(row => row.contains("x"))
val finalY = tranformation2.filter(row => row.contains("y"))
私の問題は、transformation1
は(それがメモリに収まるが、後にメモリの問題が発生する)巨大RDDであり、メモリの多くを占めていることです。しかし、私はtranformation1(.count()
とsecondTransformation()
)で2つの異なる操作を実行しているので、通常はキャッシュすることをお勧めします。
このタイプのシナリオはおそらく非常に一般的なものですから、それを処理する推奨方法は何ですか?中間カウントの前に常にRDDをキャッシュするか、または変換1で.cache()
を削除することはできますか?
にキャッシュする代わりに続く場合unpersistingた場合、それは良いだろう起こった前にunpersist使用することができた場合、クラッシュをスパークとorg.apacheスレッドで '例外「メイン」と言います.spark.rpc.RpcTimeoutException:先物は[120秒]後にタイムアウトしました。このタイムアウトは、spark.rpc.askTimeout' –
によって制御されます。これは別の問題です。https://stackoverflow.com/questions/41123846/scala-spark-dataframes-join-java-util-concurrent-timeoutexception-futures -t – Mikel