2015-09-14 8 views
10

最近、スタンドアロンのPython MLコードをsparkに移行する予定です。 spark.mlのMLパイプラインは、アルゴリズムステージの連鎖とハイパーパラメータグリッド検索のための合理化されたAPIにより、非常に便利です。Spark MLパイプラインで中間結果をキャッシュする

でも、既存のドキュメントではわかりにくい1つの重要な機能がサポートされています。中間結果のキャッシング。この特徴の重要性は、パイプラインが計算集約的な段階を含む場合に生じる。

例えば、私の場合は、入力フィーチャを形成するために巨大な疎行列を使用して時系列データに対して複数の移動平均を実行します。マトリックスの構造は、いくつかのハイパーパラメータによって決定される。このステップは、実行時に行列を構築する必要があるため、パイプライン全体のボトルネックとなります。

私は通常、この「構造パラメーター」以外のパラメーターを調べます。ですから、 "構造パラメータ"が変更されていないときに巨大な行列を再利用できれば、時間を大幅に節約できます。このため、これらの中間結果をキャッシュして再利用するように意図的にコードを作成しました。

私の質問は:スパークのMLパイプラインが中間キャッシングを自動的に処理する?それとも手動でコードを作成しなければならないのですか?もしそうなら、そこから学ぶべきベストプラクティスはありますか?

P.S.私は公式文書とその他の資料を調べましたが、そのうちのどれもこのトピックについて議論していないようです。

+0

を私は[関連する質問](http://stackoverflow.com/questions/33161320/distributed-batch-computationを持っています長期間の持続性とチェックポイント設定)、残念ながら応答もありません。 –

答えて

4

私は同じ問題に遭遇しました。私が解決したのは、私自身のPipelineStageを実装していて、入力データセットをキャッシュしてそのまま返すことでした。

import org.apache.spark.ml.Transformer 
import org.apache.spark.ml.param.ParamMap 
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable} 
import org.apache.spark.sql.{DataFrame, Dataset} 
import org.apache.spark.sql.types.StructType 

class Cacher(val uid: String) extends Transformer with DefaultParamsWritable { 
    override def transform(dataset: Dataset[_]): DataFrame = dataset.toDF.cache() 

    override def copy(extra: ParamMap): Transformer = defaultCopy(extra) 

    override def transformSchema(schema: StructType): StructType = schema 

    def this() = this(Identifiable.randomUID("CacherTransformer")) 
} 

それを使用するには、あなたはこのようなものだろう:

new Pipeline().setStages(Array(stage1, new Cacher(), stage2)) 
+1

私はこの問題の唯一の問題は解決策だと思っています(私は実際にupvoted!)、あなたはprevisouslyにキャッシュされたデータフレームをunpersistし​​ないということです。 SparkはGC時に自動的にunpersistするので問題はないと主張するかもしれませんが、UIからかなり混乱する可能性があります。非常に多くのキャッシュデータが見られます。 –

関連する問題