2017-11-21 3 views
0

開始変換と終了変換の中間のRDDに行数を記録します。私のコードは、現在、次のようになります。中間カウントの前に常にRDDをキャッシュする必要がありますか?

val transformation1 = firstTransformation(inputdata).cache // Is this cache recommended or can I remove it? 
log("Transformation1 count: " + tranformation1.count) 
val tranformation2 = secondTransformation(transformation1).cache 
val finalX = transformation2.filter(row => row.contains("x")) 
val finalY = tranformation2.filter(row => row.contains("y")) 

私の問題は、transformation1は(それがメモリに収まるが、後にメモリの問題が発生する)巨大RDDであり、メモリの多くを占めていることです。しかし、私はtranformation1(.count()secondTransformation())で2つの異なる操作を実行しているので、通常はキャッシュすることをお勧めします。

このタイプのシナリオはおそらく非常に一般的なものですから、それを処理する推奨方法は何ですか?中間カウントの前に常にRDDをキャッシュするか、または変換1で.cache()を削除することはできますか?

答えて

1

メモリに問題がある場合は、できるだけ早く固定解除してください。また、ディスクに保存することもできます。

val transformation1 = firstTransformation(inputdata).persist(StorageLevel.DISK_ONLY) // Is this cache recommended or can I remove it? 
log("Transformation1 count: " + tranformation1.count) 
val tranformation2 = secondTransformation(transformation1).persist(StorageLevel.DISK_ONLY) 
val finalX = transformation2.filter(row => row.contains("x")) 
val finalY = tranformation2.filter(row => row.contains("y")) 
// All the actions are done 
transformation1.unpersist() 
transformation2.unpersist() 

メモリの問題は、あなたがディスク

+0

にキャッシュする代わりに続く場合unpersistingた場合、それは良いだろう起こった前にunpersist使用することができた場合、クラッシュをスパークとorg.apacheスレッドで '例外「メイン」と言います.spark.rpc.RpcTimeoutException:先物は[120秒]後にタイムアウトしました。このタイムアウトは、spark.rpc.askTimeout' –

+0

によって制御されます。これは別の問題です。https://stackoverflow.com/questions/41123846/scala-spark-dataframes-join-java-util-concurrent-timeoutexception-futures -t – Mikel

関連する問題