2016-11-16 5 views
4

を切断Iスパークシェルで次のジョブを実行した:スパークUI DAGステージ

val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist 
d.join(d.reduceByKey(_ + _)).collect 

をスパークUIを3つの段階を示しています。ステージ4および5は、dの計算に対応し、ステージ6は、collectアクションの計算に対応する。 dが保持されているので、私は2つのステージしか期待しません。しかしながら、ステージ5は、他のステージに接続されていない状態で存在する。

Spark UI DAG

だから、持続使用せずに、同じ計算を実行しようとした、とDAGは、RDDが永続化されたことを示す緑のドットなしを除いて、全く同じように見えます。

Spark UI DAG without persist

Iは、ステージ11の出力は、ステージ12の入力に接続することが期待されるが、それはありません。

ステージの説明を見ると、ステージ5には入力があるため、dが保持されているように見えますが、ステージ5が存在する理由はまだ分かりません。

Spark UI stages

Spark UI stages without persist

答えて

1
  1. 入力RDDがキャッシュされ、キャッシュされた部分が再計算されていません。

    import org.apache.spark.SparkContext 
    
    def f(sc: SparkContext) = { 
        val counter = sc.longAccumulator("counter") 
        val rdd = sc.parallelize(0 until 100).map(i => { 
        counter.add(1L) 
        (i%10, i) 
        }).persist 
        rdd.join(rdd.reduceByKey(_ + _)).foreach(_ =>()) 
        counter.value 
    } 
    
    assert(f(spark.sparkContext) == 100) 
    
  2. キャッシュはDAGからステージを削除しません:

    これは簡単なテストで検証することができます。

    データが対応するステージcan be marked as skippedにキャッシュされていても、DAGの一部です。リネージュはチェックポイントを使用して切り捨てられますが、同じことではなく、ステージをビジュアライゼーションから削除しません。

  3. 入力ステージには、キャッシュされた演算以上のものが含まれています。

    スパークステージは、シャッフルを実行せずにチェーンできる操作をグループ化します。

    入力ステージの一部がキャッシュされていますが、シャッフルファイルの準備に必要なすべての操作をカバーしているわけではありません。このため、スキップされたタスクは表示されません。

  4. 残り(デタッチメント)は、グラフの視覚化の単なる制限です。

  5. あなたが最初のデータを再分割した場合:

    import org.apache.spark.HashPartitioner 
    
    val d = sc.parallelize(0 until 1000000) 
        .map(i => (i%100000, i)) 
        .partitionBy(new HashPartitioner(20)) 
    
    d.join(d.reduceByKey(_ + _)).collect 
    

    あなたがDAGを得るでしょう、あなたが最も可能性を探しています:

    enter image description here

0

がuser6910411の詳細な回答、RDDに追加します最初のアクションが実行されるまでメモリに保持されず、RDDの遅延評価のためにDAG全体が計算されます。したがって、初めてcollect()を実行すると、RDD "d"は初めてメモリに保持されますが、メモリからは何も読み込まれません。 collect()を2回実行すると、キャッシュされたRDDが読み込まれます。

あなたが最終RDDにtoDebugStringを行う場合も、それは出力の下示す:

scala> d.join(d.reduceByKey(_ + _)).toDebugString 
res5: String = 
(4) MapPartitionsRDD[19] at join at <console>:27 [] 
| MapPartitionsRDD[18] at join at <console>:27 [] 
| CoGroupedRDD[17] at join at <console>:27 [] 
+-(4) MapPartitionsRDD[15] at map at <console>:24 [] 
| | ParallelCollectionRDD[14] at parallelize at <console>:24 [] 
| ShuffledRDD[16] at reduceByKey at <console>:27 [] 
+-(4) MapPartitionsRDD[15] at map at <console>:24 [] 
    | ParallelCollectionRDD[14] at parallelize at <console>:24 [] 

上記の粗いグラフィック表現として示すことができる:RDD Stages