2016-04-04 1 views
0

私はSparkを使って新しいです。私は巨大なtimeseriesに対処しなければなりません。ベンチマークのために、私はロールメーンのいくつかの実装を比較する必要があります。 numpyの反復モードでは、実際に高速です(1,000,000ポイントで0.055970s、window = 3)。私は、ロール・マンの新しいcomer-in-pysparkバージョンを書いて、結果は恐ろしいものです(同じベクトルでは数秒です)。 は例えば、私はスパークではどのようにロールマンを書いていますか?

ts_list = ["key1", "key2",...,"keyN"] 
seq = sc.parallelize(ts_list) 
d = {"key1": [1, 2, 3, ...], "key2": [1, 2, 3, ...]} 

マイ処理を持っている:

s = seq.map(lambda s: (s, d[s]))\ 
.flatMap(lambda s: [(s[0], sum(elem)/k) for elem in rolling_window(np.array(s[1]), k)])\ 
.groupByKey().mapValues(lambda x: list(x))\ 
.collect() 

私は、k個のポイントのために労働者を使用することは価値がないと思います。私はマスターと労働者の間のコミュニケーションは時間がかかると思っています。また、私は並列化または内部の労働者とドライバでデータを読み込む必要がありますか?

ベクターに何兆点ものポイントがある場合はどうなりますか?

+0

されますさて、あなたは 'groupBy'を使用し、その方法は、大量に消費しますすべてのノード間でデータを移動する必要があるためです。 –

答えて

1

問題の主観的なリスト:mapValues続いgroupByKey続いflatMapを使用して

  • は全く理にかなっていません。それは必要以上に高価であり、一般的に(ここではない)注文に関する保証はありません。だけではなく、必要なロジックを適用するためにmapValuesを使用しますnumpyのアレイを作成

    def rolling_mean(xs): 
        ... 
    
    rdd.mapValues(rolling_mean) 
    
  • は高価です。これらが小さい場合、オーバーヘッドはかなり大きくなる可能性があります。あなたがrolling_windowの実装を提供しなかったので、意味のあるテストを行うことも可能ですが、あなたは、プレーンlistかけ始めsliding_windowからNumPy配列を使用したり、内蔵のarrayない場合は、一般的に速く

  • データロードする必要があります:

    パラレル化または内部のワーカーでドライバにデータをロードする必要がありますか?

    可能であれば、ドライバから渡されないワーカーには常にデータをロードする必要があります。後者のオプションは、主にテストとプロトタイプを作成するのに便利であり、プログラムに深刻なIOボトルネックが導入されます。さらに、データが単一のマシンのメモリに収まる場合、些細な計算を分散することは、むしろ利益をもたらす可能性は低い。

    あなたがparallelizeと決めるとそれはスマートになると言われています。

    sc.parallelize(d) 
    

    これは、各エグゼキュータにdの完全なコピーを渡す必要はありません。

  • 最後に、現実的な期待があります。タスクはここのような比較的安価である場合、総コストは駆動が、ネットワークIO、ローカルソケット通信のような他の要因、シリアライズ、デシリアライゼーションのと一般的な簿記

+0

この建設的な答えをありがとう。私はスパークの知識が不足しているのを知っています。私はそれに取り組んでいます!私は終わりました: 'result = seq.map(lambda s:(s、load_vector(s)))mapValues(lambda x:rollmean_calc(x、3))。 – GwydionFR

関連する問題