2016-11-03 10 views
0

で関数を呼び出す:次のように私はScalaの機能を持っているmapPartitionスパーク

def variance (reg: Int, samRecords: Array[SAMRecord]) : 
    Array[(Int, (Int, String))] = 
{ 
    // Body of the function 
} 

私は次のようにmapPartition方法でこの関数を呼び出ししようとしています:

//SortedOut is RDD[(Int,(Int,Int,SAMRecord))] 
val Out = SortedOut.mapPartitions(iter=> {val inArr = iter.map(x=>x._2._3).toArray 
             val inReg = iter.map(x=> x._1).toArray 
             if (inArr.length != 0) 
             { 
             println("Calling function") 
             variantCall(inReg(0),inArr).iterator 
             } 
             else 
             iter}).cache 

私はSortedOutを持っていることを確認しています非空のパーティションでも、機能呼び出しは行われません。このコードはなぜ機能しないのですか?私は各パーティションに対してこの機能を呼びたいのですが、どうすればいいですか?

答えて

2

アクションを実行した場合にのみ、計算を開始できます。

評価

お知らせをトリガするために最後にcollectまたはcountまたはforeachなどを行うには、エンド

val Out = SortedOut.mapPartitions(iter=> {val inArr = iter.map(x=>x._2._3).toArray 
             val inReg = iter.map(x=> x._1).toArray 
             if (inArr.length != 0) 
             { 
             println("Calling function") 
             variantCall(inReg(0),inArr).iterator 
             } 
             else 
             iter}).cache.collect 
に集まります
関連する問題