2016-02-22 14 views
6

私は、単純なグラフの問題で動作するようにSparkを使用しようとしました。私はSparkのソースフォルダーにtransitive_closure.pyというサンプルプログラムを見つけました。transitive_closure.pyは、200以上のエッジと頂点を持たないグラフ内の推移閉包を計算します。しかし、自分のラップトップでは、10分以上実行され、終了しません。私が使用するコマンドラインは:spark-submit transitive_closure.pyです。Sparkサンプルプログラムが非常に遅く実行されます

このような小さな推移閉包結果を計算しても、なぜスパークが遅いのだろうと思いますか?それは一般的なケースですか?私が見逃す構成はありますか?

プログラムは以下のとおりで、ウェブサイトのspark installフォルダにあります。

from __future__ import print_function 

import sys 
from random import Random 

from pyspark import SparkContext 

numEdges = 200 
numVertices = 100 
rand = Random(42) 


def generateGraph(): 
    edges = set() 
    while len(edges) < numEdges: 
     src = rand.randrange(0, numEdges) 
     dst = rand.randrange(0, numEdges) 
     if src != dst: 
      edges.add((src, dst)) 
    return edges 


if __name__ == "__main__": 
    """ 
    Usage: transitive_closure [partitions] 
    """ 
    sc = SparkContext(appName="PythonTransitiveClosure") 
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 
    tc = sc.parallelize(generateGraph(), partitions).cache() 

    # Linear transitive closure: each round grows paths by one edge, 
    # by joining the graph's edges with the already-discovered paths. 
    # e.g. join the path (y, z) from the TC with the edge (x, y) from 
    # the graph to obtain the path (x, z). 

    # Because join() joins on keys, the edges are stored in reversed order. 
    edges = tc.map(lambda x_y: (x_y[1], x_y[0])) 

    oldCount = 0 
    nextCount = tc.count() 
    while True: 
     oldCount = nextCount 
     # Perform the join, obtaining an RDD of (y, (z, x)) pairs, 
     # then project the result to obtain the new (x, z) paths. 
     new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0])) 
     tc = tc.union(new_edges).distinct().cache() 
     nextCount = tc.count() 
     if nextCount == oldCount: 
      break 

    print("TC has %i edges" % tc.count()) 

    sc.stop() 

答えて

4

このコードは、あなたのマシン上で特にうまく実行されませんが、ほとんどの場合、これはSpark iteration time increasing exponentially when using joinで説明した問題のちょうど別の変種である多くの理由があることができます。それは確かにそうであるかどうかを確認するための最も簡単な方法は、提出にspark.default.parallelismパラメータを提供することです:

bin/spark-submit --conf spark.default.parallelism=2 \ 
    examples/src/main/python/transitive_closure.py 

そうでない場合は制限されない場合は、SparkContext.unionRDD.joinRDD.unionはパーティションの合計数に子のパーティションの数を設定します両親に通常は望ましい動作ですが、反復して適用すると非常に非効率になります。

+1

ありがとうございます。本当に役に立ちました。もう一つ質問があります。お手伝いできれば、とても感謝しています。結合ポイント、選択、結合、更新などのような多くの関係演算をループ内で使用するプログラムがあるとします。合計タプル数が50以下であっても、2回目の繰り返しとJavaヒープサイズの例外が発生しました。私はすべてのデータフレーム操作でcache()とcoalesce(1)を使用しました。あなたは何が問題だと思いますか? – c21

0

useageは、コマンドラインは、各パーティション、仕事のないinital配布に参加して

transitive_closure [partitions] 

の設定、デフォルトの並列度だけ助けると言います。

私は、より多くのパーティションを使用すべきだと主張します。既定の並列処理を設定しても役立つかもしれませんが、投稿したコードによって数値が明示的に設定されます(引数が渡されるか2のいずれか大きい方)。絶対最小値はSparkで使用可能なコアでなければなりません。そうでなければ常に100%未満で作業しています。

+0

ここで並列度を上げることには価値がありません。実際に与えられた量のデータは、それを1に減らすことでより多くを得ることができます:) Sparkを落とすことはもちろんです。 – zero323

関連する問題